From 3f60bdadac1a02e1109148bbbe5a8a3545f13849 Mon Sep 17 00:00:00 2001 From: mat <27899617+mat-1@users.noreply.github.com> Date: Thu, 17 Apr 2025 16:16:51 -0500 Subject: [PATCH] Move login state to the ECS (#213) * use packet handlers code for login custom_query * initial broken implementation for ecs-only login * fixes * run Update schedule 60 times per second and delete code related to run_schedule_sender * fix tests * fix online-mode * reply to query packets in a separate system and make it easier for plugins to disable individual replies * remove unused imports --- Cargo.lock | 14 + Cargo.toml | 1 + azalea-auth/Cargo.toml | 5 +- azalea-auth/src/game_profile.rs | 1 + azalea-buf/src/write.rs | 2 +- azalea-client/Cargo.toml | 1 + azalea-client/src/account.rs | 2 +- azalea-client/src/client.rs | 498 +++++------------- azalea-client/src/lib.rs | 5 +- azalea-client/src/plugins/chat/mod.rs | 3 - azalea-client/src/plugins/connection.rs | 369 +++++++++++++ azalea-client/src/plugins/disconnect.rs | 4 +- azalea-client/src/plugins/events.rs | 6 +- azalea-client/src/plugins/login.rs | 152 ++++++ azalea-client/src/plugins/mod.rs | 43 ++ .../src/plugins/packet/config/events.rs | 76 +-- .../src/plugins/packet/config/mod.rs | 180 +++---- .../src/plugins/packet/game/events.rs | 87 +-- azalea-client/src/plugins/packet/game/mod.rs | 313 ++++++----- azalea-client/src/plugins/packet/login.rs | 114 ---- .../src/plugins/packet/login/events.rs | 86 +++ azalea-client/src/plugins/packet/login/mod.rs | 145 +++++ azalea-client/src/plugins/packet/mod.rs | 84 ++- azalea-client/src/raw_connection.rs | 208 -------- azalea-client/src/test_simulation.rs | 95 +--- .../change_dimension_to_nether_and_back.rs | 2 + azalea-protocol/src/connect.rs | 5 +- .../src/packets/game/s_client_information.rs | 2 +- .../src/packets/login/s_custom_query.rs | 9 - azalea-protocol/src/read.rs | 2 + azalea-protocol/src/write.rs | 11 +- azalea/examples/echo.rs | 2 +- azalea/examples/steal.rs | 2 +- azalea/examples/testbot/commands/debug.rs | 8 +- azalea/examples/testbot/main.rs | 2 +- .../examples/todo/craft_dig_straight_down.rs | 2 +- azalea/src/container.rs | 7 +- azalea/src/swarm/mod.rs | 16 +- 38 files changed, 1322 insertions(+), 1242 deletions(-) create mode 100644 azalea-client/src/plugins/connection.rs create mode 100644 azalea-client/src/plugins/login.rs delete mode 100644 azalea-client/src/plugins/packet/login.rs create mode 100644 azalea-client/src/plugins/packet/login/events.rs create mode 100644 azalea-client/src/plugins/packet/login/mod.rs delete mode 100644 azalea-client/src/raw_connection.rs delete mode 100644 azalea-protocol/src/packets/login/s_custom_query.rs diff --git a/Cargo.lock b/Cargo.lock index dfebdd1a..4d48f4d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -160,6 +160,19 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-compat" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bab94bde396a3f7b4962e396fdad640e241ed797d4d8d77fc8c237d14c58fc0" +dependencies = [ + "futures-core", + "futures-io", + "once_cell", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-executor" version = "1.13.1" @@ -336,6 +349,7 @@ name = "azalea-client" version = "0.12.0+mc1.21.5" dependencies = [ "anyhow", + "async-compat", "azalea-auth", "azalea-block", "azalea-buf", diff --git a/Cargo.toml b/Cargo.toml index 4ea64850..18a97422 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,7 @@ indexmap = "2.9.0" paste = "1.0.15" compact_str = "0.9.0" crc32fast = "1.4.2" +async-compat = "0.2.4" # --- Profile Settings --- diff --git a/azalea-auth/Cargo.toml b/azalea-auth/Cargo.toml index 8418616f..fb1b37c9 100644 --- a/azalea-auth/Cargo.toml +++ b/azalea-auth/Cargo.toml @@ -12,7 +12,10 @@ azalea-crypto = { path = "../azalea-crypto", version = "0.12.0" } base64.workspace = true chrono = { workspace = true, features = ["serde"] } md-5.workspace = true -reqwest = { workspace = true, features = ["json", "rustls-tls"] } +reqwest = { workspace = true, default-features = false, features = [ + "json", + "rustls-tls", +] } rsa.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true diff --git a/azalea-auth/src/game_profile.rs b/azalea-auth/src/game_profile.rs index ebff4fce..c2561a9d 100644 --- a/azalea-auth/src/game_profile.rs +++ b/azalea-auth/src/game_profile.rs @@ -10,6 +10,7 @@ pub struct GameProfile { pub uuid: Uuid, /// The username of the player. pub name: String, + // this is an arc to make GameProfile cheaper to clone when the properties are big pub properties: Arc>, } diff --git a/azalea-buf/src/write.rs b/azalea-buf/src/write.rs index c56b0062..0f35dba8 100644 --- a/azalea-buf/src/write.rs +++ b/azalea-buf/src/write.rs @@ -39,7 +39,7 @@ impl AzaleaWriteVar for i32 { let mut buffer = [0]; let mut value = *self; if value == 0 { - buf.write_all(&buffer).unwrap(); + buf.write_all(&buffer)?; } while value != 0 { buffer[0] = (value & 0b0111_1111) as u8; diff --git a/azalea-client/Cargo.toml b/azalea-client/Cargo.toml index ae00d1cf..6ca17a94 100644 --- a/azalea-client/Cargo.toml +++ b/azalea-client/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true repository.workspace = true [dependencies] +async-compat.workspace = true azalea-auth = { path = "../azalea-auth", version = "0.12.0" } azalea-block = { path = "../azalea-block", version = "0.12.0" } azalea-buf = { path = "../azalea-buf", version = "0.12.0" } diff --git a/azalea-client/src/account.rs b/azalea-client/src/account.rs index 5e2fafa7..a4b35e81 100644 --- a/azalea-client/src/account.rs +++ b/azalea-client/src/account.rs @@ -15,7 +15,7 @@ use uuid::Uuid; /// To join a server using this account, use [`Client::join`] or /// [`azalea::ClientBuilder`]. /// -/// Note that this is also a component that our clients have. +/// This is also an ECS component that is present on our client entities. /// /// # Examples /// diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs index b8688b40..bef05a14 100644 --- a/azalea-client/src/client.rs +++ b/azalea-client/src/client.rs @@ -8,84 +8,69 @@ use std::{ time::{Duration, Instant}, }; -use azalea_auth::{game_profile::GameProfile, sessionserver::ClientSessionServerError}; +use azalea_auth::game_profile::GameProfile; use azalea_chat::FormattedText; use azalea_core::{ data_registry::ResolvableDataRegistry, position::Vec3, resource_location::ResourceLocation, tick::GameTick, }; use azalea_entity::{ - EntityPlugin, EntityUpdateSet, EyeHeight, LocalEntity, Position, + EntityUpdateSet, EyeHeight, LocalEntity, Position, indexing::{EntityIdIndex, EntityUuidIndex}, metadata::Health, }; -use azalea_physics::PhysicsPlugin; use azalea_protocol::{ ServerAddress, common::client_information::ClientInformation, connect::{Connection, ConnectionError, Proxy}, packets::{ self, ClientIntention, ConnectionProtocol, PROTOCOL_VERSION, Packet, - config::{ClientboundConfigPacket, ServerboundConfigPacket}, - game::ServerboundGamePacket, - handshake::{ - ClientboundHandshakePacket, ServerboundHandshakePacket, - s_intention::ServerboundIntention, - }, - login::{ - ClientboundLoginPacket, s_hello::ServerboundHello, s_key::ServerboundKey, - s_login_acknowledged::ServerboundLoginAcknowledged, - }, + game::{self, ServerboundGamePacket}, + handshake::s_intention::ServerboundIntention, + login::s_hello::ServerboundHello, }, resolver, }; use azalea_world::{Instance, InstanceContainer, InstanceName, MinecraftEntityId, PartialInstance}; -use bevy_app::{App, Plugin, PluginGroup, PluginGroupBuilder, PluginsState, Update}; +use bevy_app::{App, Plugin, PluginsState, Update}; use bevy_ecs::{ bundle::Bundle, component::Component, entity::Entity, schedule::{InternedScheduleLabel, IntoSystemConfigs, LogLevel, ScheduleBuildSettings}, - system::Resource, + system::{Commands, Resource}, world::World, }; -use bevy_time::TimePlugin; use parking_lot::{Mutex, RwLock}; use simdnbt::owned::NbtCompound; use thiserror::Error; use tokio::{ - sync::mpsc::{self, error::TrySendError}, + sync::mpsc::{self}, time, }; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use uuid::Uuid; use crate::{ - Account, PlayerInfo, - attack::{self, AttackPlugin}, - brand::BrandPlugin, - chat::ChatPlugin, - chunks::{ChunkBatchInfo, ChunksPlugin}, - disconnect::{DisconnectEvent, DisconnectPlugin}, - events::{Event, EventsPlugin, LocalPlayerEvents}, - interact::{CurrentSequenceNumber, InteractPlugin}, - inventory::{Inventory, InventoryPlugin}, + Account, DefaultPlugins, PlayerInfo, + attack::{self}, + chunks::ChunkBatchInfo, + connection::RawConnection, + disconnect::DisconnectEvent, + events::{Event, LocalPlayerEvents}, + interact::CurrentSequenceNumber, + inventory::Inventory, local_player::{ GameProfileComponent, Hunger, InstanceHolder, PermissionLevel, PlayerAbilities, TabList, }, - mining::{self, MiningPlugin}, - movement::{LastSentLookDirection, MovementPlugin, PhysicsState}, + mining::{self}, + movement::{LastSentLookDirection, PhysicsState}, packet::{ - PacketPlugin, - login::{self, InLoginState, LoginSendPacketQueue}, + as_system, + game::SendPacketEvent, + login::{InLoginState, SendLoginPacketEvent}, }, player::retroactively_add_game_profile_component, - pong::PongPlugin, - raw_connection::RawConnection, - respawn::RespawnPlugin, - task_pool::TaskPoolPlugin, - tick_broadcast::TickBroadcastPlugin, - tick_end::TickEndPlugin, }; /// `Client` has the things that a user interacting with the library will want. @@ -99,15 +84,6 @@ use crate::{ /// [`azalea::ClientBuilder`]: https://docs.rs/azalea/latest/azalea/struct.ClientBuilder.html #[derive(Clone)] pub struct Client { - /// The [`GameProfile`] for our client. This contains your username, UUID, - /// and skin data. - /// - /// This is immutable; the server cannot change it. To get the username and - /// skin the server chose for you, get your player from the [`TabList`] - /// component. - /// - /// This as also available from the ECS as [`GameProfileComponent`]. - pub profile: GameProfile, /// The entity for this client in the ECS. pub entity: Entity, @@ -115,9 +91,6 @@ pub struct Client { /// directly. Note that if you're using a shared world (i.e. a swarm), this /// will contain all entities in all worlds. pub ecs: Arc>, - - /// Use this to force the client to run the schedule outside of a tick. - pub run_schedule_sender: mpsc::Sender<()>, } /// An error that happened while joining the server. @@ -131,6 +104,8 @@ pub enum JoinError { ReadPacket(#[from] Box), #[error("{0}")] Io(#[from] io::Error), + #[error("Failed to encrypt the challenge from the server for {0:?}")] + EncryptionError(packets::login::ClientboundHello), #[error("{0}")] SessionServer(#[from] azalea_auth::sessionserver::ClientSessionServerError), #[error("The given address could not be parsed into a ServerAddress")] @@ -147,7 +122,6 @@ pub struct StartClientOpts<'a> { pub address: &'a ServerAddress, pub resolved_address: &'a SocketAddr, pub proxy: Option, - pub run_schedule_sender: mpsc::Sender<()>, pub event_sender: Option>, } @@ -158,13 +132,10 @@ impl<'a> StartClientOpts<'a> { resolved_address: &'a SocketAddr, event_sender: Option>, ) -> StartClientOpts<'a> { - // An event that causes the schedule to run. This is only used internally. - let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1); - let mut app = App::new(); app.add_plugins(DefaultPlugins); - let ecs_lock = start_ecs_runner(app, run_schedule_receiver, run_schedule_sender.clone()); + let ecs_lock = start_ecs_runner(app); Self { ecs_lock, @@ -172,7 +143,6 @@ impl<'a> StartClientOpts<'a> { address, resolved_address, proxy: None, - run_schedule_sender, event_sender, } } @@ -188,20 +158,12 @@ impl Client { /// World, and schedule runner function. /// You should only use this if you want to change these fields from the /// defaults, otherwise use [`Client::join`]. - pub fn new( - profile: GameProfile, - entity: Entity, - ecs: Arc>, - run_schedule_sender: mpsc::Sender<()>, - ) -> Self { + pub fn new(entity: Entity, ecs: Arc>) -> Self { Self { - profile, // default our id to 0, it'll be set later entity, ecs, - - run_schedule_sender, } } @@ -268,7 +230,6 @@ impl Client { address, resolved_address, proxy, - run_schedule_sender, event_sender, }: StartClientOpts<'_>, ) -> Result { @@ -291,92 +252,31 @@ impl Client { entity }; - // add the Account to the entity now so plugins can access it earlier - ecs.entity_mut(entity).insert(account.to_owned()); + let mut entity_mut = ecs.entity_mut(entity); + entity_mut.insert(( + InLoginState, + // add the Account to the entity now so plugins can access it earlier + account.to_owned(), + // localentity is always present for our clients, even if we're not actually logged + // in + LocalEntity, + )); + if let Some(event_sender) = event_sender { + // this is optional so we don't leak memory in case the user doesn't want to + // handle receiving packets + entity_mut.insert(LocalPlayerEvents(event_sender)); + } entity }; - let conn = if let Some(proxy) = proxy { + let mut conn = if let Some(proxy) = proxy { Connection::new_with_proxy(resolved_address, proxy).await? } else { Connection::new(resolved_address).await? }; - let (conn, game_profile) = - Self::handshake(ecs_lock.clone(), entity, conn, account, address).await?; + debug!("Created connection to {resolved_address:?}"); - // note that we send the proper packets in - // crate::configuration::handle_in_configuration_state - - let (read_conn, write_conn) = conn.into_split(); - let (read_conn, write_conn) = (read_conn.raw, write_conn.raw); - - // we did the handshake, so now we're connected to the server - - let mut ecs = ecs_lock.lock(); - - // we got the ConfigurationConnection, so the client is now connected :) - let client = Client::new( - game_profile.clone(), - entity, - ecs_lock.clone(), - run_schedule_sender.clone(), - ); - - let instance = Instance::default(); - let instance_holder = crate::local_player::InstanceHolder::new( - entity, - // default to an empty world, it'll be set correctly later when we - // get the login packet - Arc::new(RwLock::new(instance)), - ); - - let mut entity = ecs.entity_mut(entity); - entity.insert(( - // these stay when we switch to the game state - LocalPlayerBundle { - raw_connection: RawConnection::new( - run_schedule_sender, - ConnectionProtocol::Configuration, - read_conn, - write_conn, - ), - game_profile: GameProfileComponent(game_profile), - client_information: crate::ClientInformation::default(), - instance_holder, - metadata: azalea_entity::metadata::PlayerMetadataBundle::default(), - }, - InConfigState, - // this component is never removed - LocalEntity, - )); - if let Some(event_sender) = event_sender { - // this is optional so we don't leak memory in case the user - entity.insert(LocalPlayerEvents(event_sender)); - } - - Ok(client) - } - - /// Do a handshake with the server and get to the game state from the - /// initial handshake state. - /// - /// This will also automatically refresh the account's access token if - /// it's expired. - pub async fn handshake( - ecs_lock: Arc>, - entity: Entity, - mut conn: Connection, - account: &Account, - address: &ServerAddress, - ) -> Result< - ( - Connection, - GameProfile, - ), - JoinError, - > { - // handshake conn.write(ServerboundIntention { protocol_version: PROTOCOL_VERSION, hostname: address.host.clone(), @@ -384,147 +284,63 @@ impl Client { intention: ClientIntention::Login, }) .await?; - let mut conn = conn.login(); + let conn = conn.login(); - // this makes it so plugins can send an `SendLoginPacketEvent` event to the ecs - // and we'll send it to the server - let (ecs_packets_tx, mut ecs_packets_rx) = mpsc::unbounded_channel(); - ecs_lock.lock().entity_mut(entity).insert(( - LoginSendPacketQueue { tx: ecs_packets_tx }, - crate::packet::login::IgnoreQueryIds::default(), - InLoginState, - )); + let (read_conn, write_conn) = conn.into_split(); + let (read_conn, write_conn) = (read_conn.raw, write_conn.raw); - // login - conn.write(ServerboundHello { - name: account.username.clone(), - // TODO: pretty sure this should generate an offline-mode uuid instead of just - // Uuid::default() - profile_id: account.uuid.unwrap_or_default(), - }) - .await?; + // insert the client into the ecs so it finishes logging in + { + let mut ecs = ecs_lock.lock(); - let (conn, profile) = loop { - let packet = tokio::select! { - packet = conn.read() => packet?, - Some(packet) = ecs_packets_rx.recv() => { - // write this packet to the server - conn.write(packet).await?; - continue; - } - }; - - ecs_lock.lock().send_event(login::LoginPacketEvent { + let instance = Instance::default(); + let instance_holder = crate::local_player::InstanceHolder::new( entity, - packet: Arc::new(packet.clone()), - }); + // default to an empty world, it'll be set correctly later when we + // get the login packet + Arc::new(RwLock::new(instance)), + ); - match packet { - ClientboundLoginPacket::Hello(p) => { - debug!("Got encryption request"); - let Ok(e) = azalea_crypto::encrypt(&p.public_key, &p.challenge) else { - error!("Failed to encrypt the challenge from the server for {p:?}"); - continue; - }; + let mut entity = ecs.entity_mut(entity); + entity.insert(( + // these stay when we switch to the game state + LocalPlayerBundle { + raw_connection: RawConnection::new( + read_conn, + write_conn, + ConnectionProtocol::Login, + ), + client_information: crate::ClientInformation::default(), + instance_holder, + metadata: azalea_entity::metadata::PlayerMetadataBundle::default(), + }, + )); + } - if let Some(access_token) = &account.access_token { - // keep track of the number of times we tried - // authenticating so we can give up after too many - let mut attempts: usize = 1; + as_system::(&mut ecs_lock.lock(), |mut commands| { + commands.entity(entity).insert((InLoginState,)); + commands.trigger(SendLoginPacketEvent::new( + entity, + ServerboundHello { + name: account.username.clone(), + // TODO: pretty sure this should generate an offline-mode uuid instead of just + // Uuid::default() + profile_id: account.uuid.unwrap_or_default(), + }, + )) + }); - while let Err(e) = { - let access_token = access_token.lock().clone(); - conn.authenticate( - &access_token, - &account - .uuid - .expect("Uuid must be present if access token is present."), - e.secret_key, - &p, - ) - .await - } { - if attempts >= 2 { - // if this is the second attempt and we failed - // both times, give up - return Err(e.into()); - } - if matches!( - e, - ClientSessionServerError::InvalidSession - | ClientSessionServerError::ForbiddenOperation - ) { - // uh oh, we got an invalid session and have - // to reauthenticate now - account.refresh().await?; - } else { - return Err(e.into()); - } - attempts += 1; - } - } - - conn.write(ServerboundKey { - key_bytes: e.encrypted_public_key, - encrypted_challenge: e.encrypted_challenge, - }) - .await?; - - conn.set_encryption_key(e.secret_key); - } - ClientboundLoginPacket::LoginCompression(p) => { - debug!("Got compression request {:?}", p.compression_threshold); - conn.set_compression_threshold(p.compression_threshold); - } - ClientboundLoginPacket::LoginFinished(p) => { - debug!( - "Got profile {:?}. handshake is finished and we're now switching to the configuration state", - p.game_profile - ); - conn.write(ServerboundLoginAcknowledged {}).await?; - - break (conn.config(), p.game_profile); - } - ClientboundLoginPacket::LoginDisconnect(p) => { - debug!("Got disconnect {:?}", p); - return Err(JoinError::Disconnect { reason: p.reason }); - } - ClientboundLoginPacket::CustomQuery(p) => { - debug!("Got custom query {:?}", p); - // replying to custom query is done in - // packet::login::process_packet_events - } - ClientboundLoginPacket::CookieRequest(p) => { - debug!("Got cookie request {:?}", p); - - conn.write(packets::login::ServerboundCookieResponse { - key: p.key, - // cookies aren't implemented - payload: None, - }) - .await?; - } - } - }; - - ecs_lock - .lock() - .entity_mut(entity) - .remove::() - .remove::() - .remove::(); - - Ok((conn, profile)) + let client = Client::new(entity, ecs_lock.clone()); + Ok(client) } /// Write a packet directly to the server. - pub fn write_packet( - &self, - packet: impl Packet, - ) -> Result<(), crate::raw_connection::WritePacketError> { + pub fn write_packet(&self, packet: impl Packet) { let packet = packet.into_variant(); - self.raw_connection_mut(&mut self.ecs.lock()) - .write_packet(packet) + self.ecs + .lock() + .commands() + .trigger(SendPacketEvent::new(self.entity, packet)); } /// Disconnect this client from the server by ending all tasks. @@ -687,14 +503,11 @@ impl Client { /// view_distance: 2, /// ..Default::default() /// }) - /// .await?; + /// .await; /// # Ok(()) /// # } /// ``` - pub async fn set_client_information( - &self, - client_information: ClientInformation, - ) -> Result<(), crate::raw_connection::WritePacketError> { + pub async fn set_client_information(&self, client_information: ClientInformation) { { let mut ecs = self.ecs.lock(); let mut client_information_mut = self.query::<&mut ClientInformation>(&mut ecs); @@ -706,10 +519,10 @@ impl Client { "Sending client information (already logged in): {:?}", client_information ); - self.write_packet(azalea_protocol::packets::game::s_client_information::ServerboundClientInformation { information: client_information.clone() })?; + self.write_packet(game::s_client_information::ServerboundClientInformation { + client_information, + }); } - - Ok(()) } } @@ -757,14 +570,14 @@ impl Client { /// This is a shortcut for /// `bot.component::().name.to_owned()`. pub fn username(&self) -> String { - self.component::().name.to_owned() + self.profile().name.to_owned() } /// Get the Minecraft UUID of this client. /// /// This is a shortcut for `bot.component::().uuid`. pub fn uuid(&self) -> Uuid { - self.component::().uuid + self.profile().uuid } /// Get a map of player UUIDs to their information in the tab list. @@ -774,6 +587,19 @@ impl Client { (*self.component::()).clone() } + /// Returns the [`GameProfile`] for our client. This contains your username, + /// UUID, and skin data. + /// + /// These values are set by the server upon login, which means they might + /// not match up with your actual game profile. Also, note that the username + /// and skin that gets displayed in-game will actually be the ones from + /// the tab list, which you can get from [`Self::tab_list`]. + /// + /// This as also available from the ECS as [`GameProfileComponent`]. + pub fn profile(&self) -> GameProfile { + (*self.component::()).clone() + } + /// A convenience function to get the Minecraft Uuid of a player by their /// username, if they're present in the tab list. /// @@ -854,15 +680,14 @@ impl Client { } } -/// The bundle of components that's shared when we're either in the -/// `configuration` or `game` state. +/// A bundle of components that's inserted right when we switch to the `login` +/// state and stay present on our clients until we disconnect. /// /// For the components that are only present in the `game` state, see /// [`JoinedClientBundle`]. #[derive(Bundle)] pub struct LocalPlayerBundle { pub raw_connection: RawConnection, - pub game_profile: GameProfileComponent, pub client_information: ClientInformation, pub instance_holder: InstanceHolder, @@ -922,11 +747,7 @@ impl Plugin for AzaleaPlugin { /// You can create your app with `App::new()`, but don't forget to add /// [`DefaultPlugins`]. #[doc(hidden)] -pub fn start_ecs_runner( - mut app: App, - run_schedule_receiver: mpsc::Receiver<()>, - run_schedule_sender: mpsc::Sender<()>, -) -> Arc> { +pub fn start_ecs_runner(mut app: App) -> Arc> { // this block is based on Bevy's default runner: // https://github.com/bevyengine/bevy/blob/390877cdae7a17095a75c8f9f1b4241fe5047e83/crates/bevy_app/src/schedule_runner.rs#L77-L85 if app.plugins_state() != PluginsState::Cleaned { @@ -949,35 +770,54 @@ pub fn start_ecs_runner( tokio::spawn(run_schedule_loop( ecs.clone(), *app.main().update_schedule.as_ref().unwrap(), - run_schedule_receiver, )); - tokio::spawn(tick_run_schedule_loop(run_schedule_sender)); ecs } -async fn run_schedule_loop( - ecs: Arc>, - outer_schedule_label: InternedScheduleLabel, - mut run_schedule_receiver: mpsc::Receiver<()>, -) { +async fn run_schedule_loop(ecs: Arc>, outer_schedule_label: InternedScheduleLabel) { + let mut last_update: Option = None; let mut last_tick: Option = None; + + // azalea runs the Update schedule at most 60 times per second to simulate + // framerate. unlike vanilla though, we also only handle packets during Updates + // due to everything running in ecs systems. + const UPDATE_DURATION_TARGET: Duration = Duration::from_micros(1_000_000 / 60); + // minecraft runs at 20 tps + const GAME_TICK_DURATION_TARGET: Duration = Duration::from_micros(1_000_000 / 20); + loop { - // whenever we get an event from run_schedule_receiver, run the schedule - run_schedule_receiver.recv().await; + // sleep until the next update if necessary + let now = Instant::now(); + if let Some(last_update) = last_update { + let elapsed = now.duration_since(last_update); + if elapsed < UPDATE_DURATION_TARGET { + time::sleep(UPDATE_DURATION_TARGET - elapsed).await; + } + } + last_update = Some(now); let mut ecs = ecs.lock(); // if last tick is None or more than 50ms ago, run the GameTick schedule ecs.run_schedule(outer_schedule_label); if last_tick - .map(|last_tick| last_tick.elapsed() > Duration::from_millis(50)) + .map(|last_tick| last_tick.elapsed() > GAME_TICK_DURATION_TARGET) .unwrap_or(true) { if let Some(last_tick) = &mut last_tick { - *last_tick += Duration::from_millis(50); + *last_tick += GAME_TICK_DURATION_TARGET; + + // if we're more than 10 ticks behind, set last_tick to now. + // vanilla doesn't do it in exactly the same way but it shouldn't really matter + if (now - *last_tick) > GAME_TICK_DURATION_TARGET * 10 { + warn!( + "GameTick is more than 10 ticks behind, skipping ticks so we don't have to burst too much" + ); + *last_tick = now; + } } else { - last_tick = Some(Instant::now()); + last_tick = Some(now); } ecs.run_schedule(GameTick); } @@ -986,23 +826,6 @@ async fn run_schedule_loop( } } -/// Send an event to run the schedule every 50 milliseconds. It will stop when -/// the receiver is dropped. -pub async fn tick_run_schedule_loop(run_schedule_sender: mpsc::Sender<()>) { - let mut game_tick_interval = time::interval(Duration::from_millis(50)); - // TODO: Minecraft bursts up to 10 ticks and then skips, we should too - game_tick_interval.set_missed_tick_behavior(time::MissedTickBehavior::Burst); - - loop { - game_tick_interval.tick().await; - if let Err(TrySendError::Closed(())) = run_schedule_sender.try_send(()) { - error!("tick_run_schedule_loop failed because run_schedule_sender was closed"); - // the sender is closed so end the task - return; - } - } -} - pub struct AmbiguityLoggerPlugin; impl Plugin for AmbiguityLoggerPlugin { fn build(&self, app: &mut App) { @@ -1020,40 +843,3 @@ impl Plugin for AmbiguityLoggerPlugin { }); } } - -/// This plugin group will add all the default plugins necessary for Azalea to -/// work. -pub struct DefaultPlugins; - -impl PluginGroup for DefaultPlugins { - fn build(self) -> PluginGroupBuilder { - #[allow(unused_mut)] - let mut group = PluginGroupBuilder::start::() - .add(AmbiguityLoggerPlugin) - .add(TimePlugin) - .add(PacketPlugin) - .add(AzaleaPlugin) - .add(EntityPlugin) - .add(PhysicsPlugin) - .add(EventsPlugin) - .add(TaskPoolPlugin::default()) - .add(InventoryPlugin) - .add(ChatPlugin) - .add(DisconnectPlugin) - .add(MovementPlugin) - .add(InteractPlugin) - .add(RespawnPlugin) - .add(MiningPlugin) - .add(AttackPlugin) - .add(ChunksPlugin) - .add(TickEndPlugin) - .add(BrandPlugin) - .add(TickBroadcastPlugin) - .add(PongPlugin); - #[cfg(feature = "log")] - { - group = group.add(bevy_log::LogPlugin::default()); - } - group - } -} diff --git a/azalea-client/src/lib.rs b/azalea-client/src/lib.rs index 38b8f196..b83c24a8 100644 --- a/azalea-client/src/lib.rs +++ b/azalea-client/src/lib.rs @@ -15,7 +15,6 @@ mod local_player; pub mod ping; mod player; mod plugins; -pub mod raw_connection; #[doc(hidden)] pub mod test_simulation; @@ -23,8 +22,8 @@ pub mod test_simulation; pub use account::{Account, AccountOpts}; pub use azalea_protocol::common::client_information::ClientInformation; pub use client::{ - Client, DefaultPlugins, InConfigState, InGameState, JoinError, JoinedClientBundle, - LocalPlayerBundle, StartClientOpts, start_ecs_runner, + Client, InConfigState, InGameState, JoinError, JoinedClientBundle, LocalPlayerBundle, + StartClientOpts, start_ecs_runner, }; pub use events::Event; pub use local_player::{GameProfileComponent, Hunger, InstanceHolder, TabList}; diff --git a/azalea-client/src/plugins/chat/mod.rs b/azalea-client/src/plugins/chat/mod.rs index 3d03d24e..8562f3ce 100644 --- a/azalea-client/src/plugins/chat/mod.rs +++ b/azalea-client/src/plugins/chat/mod.rs @@ -152,7 +152,6 @@ impl Client { content: message.to_string(), kind: ChatKind::Message, }); - let _ = self.run_schedule_sender.try_send(()); } /// Send a command packet to the server. The `command` argument should not @@ -166,7 +165,6 @@ impl Client { content: command.to_string(), kind: ChatKind::Command, }); - let _ = self.run_schedule_sender.try_send(()); } /// Send a message in chat. @@ -183,7 +181,6 @@ impl Client { entity: self.entity, content: content.to_string(), }); - let _ = self.run_schedule_sender.try_send(()); } } diff --git a/azalea-client/src/plugins/connection.rs b/azalea-client/src/plugins/connection.rs new file mode 100644 index 00000000..b462535e --- /dev/null +++ b/azalea-client/src/plugins/connection.rs @@ -0,0 +1,369 @@ +use std::{fmt::Debug, io::Cursor, mem, sync::Arc}; + +use azalea_crypto::Aes128CfbEnc; +use azalea_protocol::{ + connect::{RawReadConnection, RawWriteConnection}, + packets::{ + ConnectionProtocol, Packet, ProtocolPacket, config::ClientboundConfigPacket, + game::ClientboundGamePacket, login::ClientboundLoginPacket, + }, + read::{ReadPacketError, deserialize_packet}, + write::serialize_packet, +}; +use bevy_app::prelude::*; +use bevy_ecs::prelude::*; +use bevy_tasks::{IoTaskPool, futures_lite::future}; +use thiserror::Error; +use tokio::{ + io::AsyncWriteExt, + net::tcp::OwnedWriteHalf, + sync::mpsc::{self}, +}; +use tracing::{debug, error, info, trace}; + +use super::packet::{ + config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent, +}; +use crate::packet::{config, game, login}; + +pub struct ConnectionPlugin; +impl Plugin for ConnectionPlugin { + fn build(&self, app: &mut App) { + app.add_systems(PreUpdate, (read_packets, poll_all_writer_tasks).chain()); + } +} + +pub fn read_packets(ecs: &mut World) { + // receive_game_packet_events: EventWriter, + let mut entity_and_conn_query = ecs.query::<(Entity, &mut RawConnection)>(); + let mut conn_query = ecs.query::<&mut RawConnection>(); + + let mut entities_handling_packets = Vec::new(); + let mut entities_with_injected_packets = Vec::new(); + for (entity, mut raw_conn) in entity_and_conn_query.iter_mut(ecs) { + if !raw_conn.injected_clientbound_packets.is_empty() { + entities_with_injected_packets.push(( + entity, + mem::take(&mut raw_conn.injected_clientbound_packets), + )); + } + + if raw_conn.network.is_none() { + // no network connection, don't bother with the normal packet handling + continue; + } + + entities_handling_packets.push(entity); + } + + let mut queued_packet_events = QueuedPacketEvents::default(); + + // handle injected packets, see the comment on + // RawConnection::injected_clientbound_packets for more info + for (entity, raw_packets) in entities_with_injected_packets { + for raw_packet in raw_packets { + let conn = conn_query.get(ecs, entity).unwrap(); + let state = conn.state; + + trace!("Received injected packet with bytes: {raw_packet:?}"); + if let Err(e) = + handle_raw_packet(ecs, &raw_packet, entity, state, &mut queued_packet_events) + { + error!("Error reading injected packet: {e}"); + } + } + } + + for entity in entities_handling_packets { + loop { + let mut conn = conn_query.get_mut(ecs, entity).unwrap(); + let net_conn = conn.net_conn().unwrap(); + let read_res = net_conn.reader.try_read(); + let state = conn.state; + match read_res { + Ok(Some(raw_packet)) => { + let raw_packet = Arc::<[u8]>::from(raw_packet); + if let Err(e) = handle_raw_packet( + ecs, + &raw_packet, + entity, + state, + &mut queued_packet_events, + ) { + error!("Error reading packet: {e}"); + } + } + Ok(None) => { + // no packets available + break; + } + Err(err) => { + log_for_error(&err); + + if matches!( + &*err, + ReadPacketError::IoError { .. } | ReadPacketError::ConnectionClosed + ) { + info!("Server closed connection"); + // ungraceful disconnect :( + conn.network = None; + // setting this will make us send a DisconnectEvent + conn.is_alive = false; + } + + break; + } + } + } + } + + queued_packet_events.send_events(ecs); +} + +fn poll_all_writer_tasks(mut conn_query: Query<&mut RawConnection>) { + for mut conn in conn_query.iter_mut() { + if let Some(net_conn) = &mut conn.network { + // this needs to be done at some point every update to make sure packets are + // actually sent to the network + + net_conn.poll_writer(); + } + } +} + +#[derive(Default)] +pub struct QueuedPacketEvents { + login: Vec, + config: Vec, + game: Vec, +} +impl QueuedPacketEvents { + fn send_events(&mut self, ecs: &mut World) { + ecs.send_event_batch(self.login.drain(..)); + ecs.send_event_batch(self.config.drain(..)); + ecs.send_event_batch(self.game.drain(..)); + } +} + +fn log_for_error(error: &ReadPacketError) { + if !matches!(*error, ReadPacketError::ConnectionClosed) { + error!("Error reading packet from Client: {error:?}"); + } +} + +/// The client's connection to the server. +#[derive(Component)] +pub struct RawConnection { + /// The network connection to the server. + /// + /// This isn't guaranteed to be present, for example during the main packet + /// handlers or at all times during tests. + /// + /// You shouldn't rely on this. Instead, use the events for sending packets + /// like [`SendPacketEvent`](crate::packet::game::SendPacketEvent) / + /// [`SendConfigPacketEvent`](crate::packet::config::SendConfigPacketEvent) + /// / [`SendLoginPacketEvent`](crate::packet::login::SendLoginPacketEvent). + /// + /// To check if we haven't disconnected from the server, use + /// [`Self::is_alive`]. + network: Option, + pub state: ConnectionProtocol, + is_alive: bool, + + /// This exists for internal testing purposes and probably shouldn't be used + /// for normal bots. It's basically a way to make our client think it + /// received a packet from the server without needing to interact with the + /// network. + pub injected_clientbound_packets: Vec>, +} +impl RawConnection { + pub fn new( + reader: RawReadConnection, + writer: RawWriteConnection, + state: ConnectionProtocol, + ) -> Self { + let task_pool = IoTaskPool::get(); + + let (network_packet_writer_tx, network_packet_writer_rx) = + mpsc::unbounded_channel::>(); + + let writer_task = + task_pool.spawn(write_task(network_packet_writer_rx, writer.write_stream)); + + let mut conn = Self::new_networkless(state); + conn.network = Some(NetworkConnection { + reader, + enc_cipher: writer.enc_cipher, + network_packet_writer_tx, + writer_task, + }); + + conn + } + + pub fn new_networkless(state: ConnectionProtocol) -> Self { + Self { + network: None, + state, + is_alive: true, + injected_clientbound_packets: Vec::new(), + } + } + + pub fn is_alive(&self) -> bool { + self.is_alive + } + + /// Write a packet to the server without emitting any events. + /// + /// This is called by the handlers for [`SendPacketEvent`], + /// [`SendConfigPacketEvent`], and [`SendLoginPacketEvent`]. + /// + /// [`SendPacketEvent`]: crate::packet::game::SendPacketEvent + /// [`SendConfigPacketEvent`]: crate::packet::config::SendConfigPacketEvent + /// [`SendLoginPacketEvent`]: crate::packet::login::SendLoginPacketEvent + pub fn write( + &mut self, + packet: impl Packet

, + ) -> Result<(), WritePacketError> { + if let Some(network) = &mut self.network { + network.write(packet)?; + } else { + debug!( + "tried to write packet to the network but there is no NetworkConnection. if you're trying to send a packet from the handler function, use self.write instead" + ); + } + Ok(()) + } + + pub fn net_conn(&mut self) -> Option<&mut NetworkConnection> { + self.network.as_mut() + } +} + +pub fn handle_raw_packet( + ecs: &mut World, + raw_packet: &[u8], + entity: Entity, + state: ConnectionProtocol, + queued_packet_events: &mut QueuedPacketEvents, +) -> Result<(), Box> { + let stream = &mut Cursor::new(raw_packet); + match state { + ConnectionProtocol::Handshake => { + unreachable!() + } + ConnectionProtocol::Game => { + let packet = Arc::new(deserialize_packet::(stream)?); + trace!("Packet: {packet:?}"); + game::process_packet(ecs, entity, packet.as_ref()); + queued_packet_events + .game + .push(ReceiveGamePacketEvent { entity, packet }); + } + ConnectionProtocol::Status => { + unreachable!() + } + ConnectionProtocol::Login => { + let packet = Arc::new(deserialize_packet::(stream)?); + trace!("Packet: {packet:?}"); + login::process_packet(ecs, entity, &packet); + queued_packet_events + .login + .push(ReceiveLoginPacketEvent { entity, packet }); + } + ConnectionProtocol::Configuration => { + let packet = Arc::new(deserialize_packet::(stream)?); + trace!("Packet: {packet:?}"); + config::process_packet(ecs, entity, &packet); + queued_packet_events + .config + .push(ReceiveConfigPacketEvent { entity, packet }); + } + }; + + Ok(()) +} + +pub struct NetworkConnection { + reader: RawReadConnection, + // compression threshold is in the RawReadConnection + pub enc_cipher: Option, + + pub writer_task: bevy_tasks::Task<()>, + /// A queue of raw TCP packets to send. These will not be modified further, + /// they should already be serialized and encrypted and everything before + /// being added here. + network_packet_writer_tx: mpsc::UnboundedSender>, +} +impl NetworkConnection { + pub fn write( + &mut self, + packet: impl Packet

, + ) -> Result<(), WritePacketError> { + let packet = packet.into_variant(); + let raw_packet = serialize_packet(&packet)?; + self.write_raw(&raw_packet)?; + + Ok(()) + } + + pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> { + let network_packet = azalea_protocol::write::encode_to_network_packet( + raw_packet, + self.reader.compression_threshold, + &mut self.enc_cipher, + ); + self.network_packet_writer_tx + .send(network_packet.into_boxed_slice())?; + Ok(()) + } + + pub fn poll_writer(&mut self) { + let poll_once_res = future::poll_once(&mut self.writer_task); + future::block_on(poll_once_res); + } + + pub fn set_compression_threshold(&mut self, threshold: Option) { + trace!("Set compression threshold to {threshold:?}"); + self.reader.compression_threshold = threshold; + } + /// Set the encryption key that is used to encrypt and decrypt packets. It's + /// the same for both reading and writing. + pub fn set_encryption_key(&mut self, key: [u8; 16]) { + trace!("Enabled protocol encryption"); + let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key); + self.reader.dec_cipher = Some(dec_cipher); + self.enc_cipher = Some(enc_cipher); + } +} + +async fn write_task( + mut network_packet_writer_rx: mpsc::UnboundedReceiver>, + mut write_half: OwnedWriteHalf, +) { + while let Some(network_packet) = network_packet_writer_rx.recv().await { + if let Err(e) = write_half.write_all(&network_packet).await { + debug!("Error writing packet to server: {e}"); + break; + }; + } + trace!("write task is done"); +} + +#[derive(Error, Debug)] +pub enum WritePacketError { + #[error("Wrong protocol state: expected {expected:?}, got {got:?}")] + WrongState { + expected: ConnectionProtocol, + got: ConnectionProtocol, + }, + #[error(transparent)] + Encoding(#[from] azalea_protocol::write::PacketEncodeError), + #[error(transparent)] + SendError { + #[from] + #[backtrace] + source: mpsc::error::SendError>, + }, +} diff --git a/azalea-client/src/plugins/disconnect.rs b/azalea-client/src/plugins/disconnect.rs index bd10ac75..09606435 100644 --- a/azalea-client/src/plugins/disconnect.rs +++ b/azalea-client/src/plugins/disconnect.rs @@ -16,8 +16,8 @@ use derive_more::Deref; use tracing::trace; use crate::{ - InstanceHolder, client::JoinedClientBundle, events::LocalPlayerEvents, - raw_connection::RawConnection, + InstanceHolder, client::JoinedClientBundle, connection::RawConnection, + events::LocalPlayerEvents, }; pub struct DisconnectPlugin; diff --git a/azalea-client/src/plugins/events.rs b/azalea-client/src/plugins/events.rs index 64dcf4f5..85f50ea5 100644 --- a/azalea-client/src/plugins/events.rs +++ b/azalea-client/src/plugins/events.rs @@ -27,7 +27,7 @@ use crate::{ chat::{ChatPacket, ChatReceivedEvent}, disconnect::DisconnectEvent, packet::game::{ - AddPlayerEvent, DeathEvent, KeepAliveEvent, ReceivePacketEvent, RemovePlayerEvent, + AddPlayerEvent, DeathEvent, KeepAliveEvent, ReceiveGamePacketEvent, RemovePlayerEvent, UpdatePlayerEvent, }, }; @@ -157,7 +157,7 @@ impl Plugin for EventsPlugin { ) .add_systems( PreUpdate, - init_listener.before(crate::packet::game::process_packet_events), + init_listener.before(super::connection::read_packets), ) .add_systems(GameTick, tick_listener); } @@ -217,7 +217,7 @@ pub fn tick_listener(query: Query<&LocalPlayerEvents, With>) { pub fn packet_listener( query: Query<&LocalPlayerEvents>, - mut events: EventReader, + mut events: EventReader, ) { for event in events.read() { if let Ok(local_player_events) = query.get(event.entity) { diff --git a/azalea-client/src/plugins/login.rs b/azalea-client/src/plugins/login.rs new file mode 100644 index 00000000..385e9651 --- /dev/null +++ b/azalea-client/src/plugins/login.rs @@ -0,0 +1,152 @@ +use azalea_auth::sessionserver::ClientSessionServerError; +use azalea_protocol::packets::login::{ + ClientboundHello, ServerboundCustomQueryAnswer, ServerboundKey, +}; +use bevy_app::prelude::*; +use bevy_ecs::prelude::*; +use bevy_tasks::{IoTaskPool, Task, futures_lite::future}; +use tracing::{debug, error, trace}; + +use super::{ + connection::RawConnection, + packet::login::{ReceiveCustomQueryEvent, ReceiveHelloEvent, SendLoginPacketEvent}, +}; +use crate::{Account, JoinError}; + +/// Some systems that run during the `login` state. +pub struct LoginPlugin; +impl Plugin for LoginPlugin { + fn build(&self, app: &mut App) { + app.add_observer(handle_receive_hello_event) + .add_systems(Update, (poll_auth_task, reply_to_custom_queries)); + } +} + +fn handle_receive_hello_event(trigger: Trigger, mut commands: Commands) { + let task_pool = IoTaskPool::get(); + + let account = trigger.account.clone(); + let packet = trigger.packet.clone(); + let player = trigger.entity(); + + let task = task_pool.spawn(auth_with_account(account, packet)); + commands.entity(player).insert(AuthTask(task)); +} + +fn poll_auth_task( + mut commands: Commands, + mut query: Query<(Entity, &mut AuthTask, &mut RawConnection)>, +) { + for (entity, mut auth_task, mut raw_conn) in query.iter_mut() { + if let Some(poll_res) = future::block_on(future::poll_once(&mut auth_task.0)) { + debug!("Finished auth"); + commands.entity(entity).remove::(); + match poll_res { + Ok((packet, private_key)) => { + // we use this instead of SendLoginPacketEvent to ensure that it's sent right + // before encryption is enabled. i guess another option would be to make a + // Trigger+observer for set_encryption_key; the current implementation is + // simpler though. + if let Err(e) = raw_conn.write(packet) { + error!("Error sending key packet: {e:?}"); + } + if let Some(net_conn) = raw_conn.net_conn() { + net_conn.set_encryption_key(private_key); + } + } + Err(err) => { + error!("Error during authentication: {err:?}"); + } + } + } + } +} + +type PrivateKey = [u8; 16]; + +#[derive(Component)] +pub struct AuthTask(Task>); + +pub async fn auth_with_account( + account: Account, + packet: ClientboundHello, +) -> Result<(ServerboundKey, PrivateKey), JoinError> { + let Ok(encrypt_res) = azalea_crypto::encrypt(&packet.public_key, &packet.challenge) else { + return Err(JoinError::EncryptionError(packet)); + }; + let key_packet = ServerboundKey { + key_bytes: encrypt_res.encrypted_public_key, + encrypted_challenge: encrypt_res.encrypted_challenge, + }; + let private_key = encrypt_res.secret_key; + + let Some(access_token) = &account.access_token else { + // offline mode account, no need to do auth + return Ok((key_packet, private_key)); + }; + + // keep track of the number of times we tried authenticating so we can give up + // after too many + let mut attempts: usize = 1; + + while let Err(err) = { + let access_token = access_token.lock().clone(); + + let uuid = &account + .uuid + .expect("Uuid must be present if access token is present."); + + // this is necessary since reqwest usually depends on tokio and we're using + // `futures` here + async_compat::Compat::new(async { + azalea_auth::sessionserver::join( + &access_token, + &packet.public_key, + &private_key, + uuid, + &packet.server_id, + ) + .await + }) + .await + } { + if attempts >= 2 { + // if this is the second attempt and we failed + // both times, give up + return Err(err.into()); + } + if matches!( + err, + ClientSessionServerError::InvalidSession | ClientSessionServerError::ForbiddenOperation + ) { + // uh oh, we got an invalid session and have + // to reauthenticate now + account.refresh().await?; + } else { + return Err(err.into()); + } + attempts += 1; + } + + Ok((key_packet, private_key)) +} + +pub fn reply_to_custom_queries( + mut commands: Commands, + mut events: EventReader, +) { + for event in events.read() { + trace!("Maybe replying to custom query: {event:?}"); + if event.disabled { + continue; + } + + commands.trigger(SendLoginPacketEvent::new( + event.entity, + ServerboundCustomQueryAnswer { + transaction_id: event.packet.transaction_id, + data: None, + }, + )); + } +} diff --git a/azalea-client/src/plugins/mod.rs b/azalea-client/src/plugins/mod.rs index 3b047ccb..16b34205 100644 --- a/azalea-client/src/plugins/mod.rs +++ b/azalea-client/src/plugins/mod.rs @@ -1,11 +1,15 @@ +use bevy_app::{PluginGroup, PluginGroupBuilder}; + pub mod attack; pub mod brand; pub mod chat; pub mod chunks; +pub mod connection; pub mod disconnect; pub mod events; pub mod interact; pub mod inventory; +pub mod login; pub mod mining; pub mod movement; pub mod packet; @@ -14,3 +18,42 @@ pub mod respawn; pub mod task_pool; pub mod tick_broadcast; pub mod tick_end; + +/// This plugin group will add all the default plugins necessary for Azalea to +/// work. +pub struct DefaultPlugins; + +impl PluginGroup for DefaultPlugins { + fn build(self) -> PluginGroupBuilder { + #[allow(unused_mut)] + let mut group = PluginGroupBuilder::start::() + .add(crate::client::AmbiguityLoggerPlugin) + .add(bevy_time::TimePlugin) + .add(packet::PacketPlugin) + .add(crate::client::AzaleaPlugin) + .add(azalea_entity::EntityPlugin) + .add(azalea_physics::PhysicsPlugin) + .add(events::EventsPlugin) + .add(task_pool::TaskPoolPlugin::default()) + .add(inventory::InventoryPlugin) + .add(chat::ChatPlugin) + .add(disconnect::DisconnectPlugin) + .add(movement::MovementPlugin) + .add(interact::InteractPlugin) + .add(respawn::RespawnPlugin) + .add(mining::MiningPlugin) + .add(attack::AttackPlugin) + .add(chunks::ChunksPlugin) + .add(tick_end::TickEndPlugin) + .add(brand::BrandPlugin) + .add(tick_broadcast::TickBroadcastPlugin) + .add(pong::PongPlugin) + .add(connection::ConnectionPlugin) + .add(login::LoginPlugin); + #[cfg(feature = "log")] + { + group = group.add(bevy_log::LogPlugin::default()); + } + group + } +} diff --git a/azalea-client/src/plugins/packet/config/events.rs b/azalea-client/src/plugins/packet/config/events.rs index 24a1157b..a9237e75 100644 --- a/azalea-client/src/plugins/packet/config/events.rs +++ b/azalea-client/src/plugins/packet/config/events.rs @@ -1,23 +1,20 @@ -use std::io::Cursor; +use std::sync::Arc; -use azalea_protocol::{ - packets::{ - Packet, - config::{ClientboundConfigPacket, ServerboundConfigPacket}, - }, - read::deserialize_packet, +use azalea_protocol::packets::{ + Packet, + config::{ClientboundConfigPacket, ServerboundConfigPacket}, }; use bevy_ecs::prelude::*; use tracing::{debug, error}; -use crate::{InConfigState, raw_connection::RawConnection}; +use crate::{InConfigState, connection::RawConnection}; #[derive(Event, Debug, Clone)] pub struct ReceiveConfigPacketEvent { /// The client entity that received the packet. pub entity: Entity, /// The packet that was actually received. - pub packet: ClientboundConfigPacket, + pub packet: Arc, } /// An event for sending a packet to the server while we're in the @@ -39,7 +36,7 @@ pub fn handle_outgoing_packets_observer( mut query: Query<(&mut RawConnection, Option<&InConfigState>)>, ) { let event = trigger.event(); - if let Ok((raw_conn, in_configuration_state)) = query.get_mut(event.sent_by) { + if let Ok((mut raw_conn, in_configuration_state)) = query.get_mut(event.sent_by) { if in_configuration_state.is_none() { error!( "Tried to send a configuration packet {:?} while not in configuration state", @@ -47,8 +44,8 @@ pub fn handle_outgoing_packets_observer( ); return; } - debug!("Sending packet: {:?}", event.packet); - if let Err(e) = raw_conn.write_packet(event.packet.clone()) { + debug!("Sending config packet: {:?}", event.packet); + if let Err(e) = raw_conn.write(event.packet.clone()) { error!("Failed to send packet: {e}"); } } @@ -64,61 +61,6 @@ pub fn handle_outgoing_packets( } } -pub fn emit_receive_config_packet_events( - query: Query<(Entity, &RawConnection), With>, - mut packet_events: ResMut>, -) { - // we manually clear and send the events at the beginning of each update - // since otherwise it'd cause issues with events in process_packet_events - // running twice - packet_events.clear(); - for (player_entity, raw_conn) in &query { - let packets_lock = raw_conn.incoming_packet_queue(); - let mut packets = packets_lock.lock(); - if !packets.is_empty() { - let mut packets_read = 0; - for raw_packet in packets.iter() { - packets_read += 1; - let packet = match deserialize_packet::(&mut Cursor::new( - raw_packet, - )) { - Ok(packet) => packet, - Err(err) => { - error!("failed to read packet: {err:?}"); - debug!("packet bytes: {raw_packet:?}"); - continue; - } - }; - - let should_interrupt = packet_interrupts(&packet); - - packet_events.send(ReceiveConfigPacketEvent { - entity: player_entity, - packet, - }); - - if should_interrupt { - break; - } - } - packets.drain(0..packets_read); - } - } -} - -/// Whether the given packet should make us stop deserializing the received -/// packets until next update. -/// -/// This is used for packets that can switch the client state. -fn packet_interrupts(packet: &ClientboundConfigPacket) -> bool { - matches!( - packet, - ClientboundConfigPacket::FinishConfiguration(_) - | ClientboundConfigPacket::Disconnect(_) - | ClientboundConfigPacket::Transfer(_) - ) -} - /// A Bevy trigger that's sent when our client receives a [`ClientboundPing`] /// packet in the config state. /// diff --git a/azalea-client/src/plugins/packet/config/mod.rs b/azalea-client/src/plugins/packet/config/mod.rs index ae601793..910019a6 100644 --- a/azalea-client/src/plugins/packet/config/mod.rs +++ b/azalea-client/src/plugins/packet/config/mod.rs @@ -1,65 +1,61 @@ mod events; +use std::io::Cursor; + use azalea_entity::LocalEntity; use azalea_protocol::packets::ConnectionProtocol; use azalea_protocol::packets::config::*; +use azalea_protocol::read::ReadPacketError; +use azalea_protocol::read::deserialize_packet; use bevy_ecs::prelude::*; -use bevy_ecs::system::SystemState; pub use events::*; use tracing::{debug, warn}; use super::as_system; use crate::client::InConfigState; +use crate::connection::RawConnection; use crate::disconnect::DisconnectEvent; use crate::packet::game::KeepAliveEvent; use crate::packet::game::ResourcePackEvent; -use crate::raw_connection::RawConnection; use crate::{InstanceHolder, declare_packet_handlers}; -pub fn process_packet_events(ecs: &mut World) { - let mut events_owned = Vec::new(); - let mut system_state: SystemState> = - SystemState::new(ecs); - let mut events = system_state.get_mut(ecs); - for ReceiveConfigPacketEvent { - entity: player_entity, - packet, - } in events.read() - { - // we do this so `ecs` isn't borrowed for the whole loop - events_owned.push((*player_entity, packet.clone())); - } - for (player_entity, packet) in events_owned { - let mut handler = ConfigPacketHandler { - player: player_entity, - ecs, - }; +pub fn process_raw_packet( + ecs: &mut World, + player: Entity, + raw_packet: &[u8], +) -> Result<(), Box> { + let packet = deserialize_packet(&mut Cursor::new(raw_packet))?; + process_packet(ecs, player, &packet); + Ok(()) +} - declare_packet_handlers!( - ClientboundConfigPacket, - packet, - handler, - [ - cookie_request, - custom_payload, - disconnect, - finish_configuration, - keep_alive, - ping, - reset_chat, - registry_data, - resource_pack_pop, - resource_pack_push, - store_cookie, - transfer, - update_enabled_features, - update_tags, - select_known_packs, - custom_report_details, - server_links, - ] - ); - } +pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundConfigPacket) { + let mut handler = ConfigPacketHandler { player, ecs }; + + declare_packet_handlers!( + ClientboundConfigPacket, + packet, + handler, + [ + cookie_request, + custom_payload, + disconnect, + finish_configuration, + keep_alive, + ping, + reset_chat, + registry_data, + resource_pack_pop, + resource_pack_push, + store_cookie, + transfer, + update_enabled_features, + update_tags, + select_known_packs, + custom_report_details, + server_links, + ] + ); } pub struct ConfigPacketHandler<'a> { @@ -67,44 +63,45 @@ pub struct ConfigPacketHandler<'a> { pub player: Entity, } impl ConfigPacketHandler<'_> { - pub fn registry_data(&mut self, p: ClientboundRegistryData) { + pub fn registry_data(&mut self, p: &ClientboundRegistryData) { as_system::>(self.ecs, |mut query| { let instance_holder = query.get_mut(self.player).unwrap(); let mut instance = instance_holder.instance.write(); // add the new registry data - instance.registries.append(p.registry_id, p.entries); + instance + .registries + .append(p.registry_id.clone(), p.entries.clone()); }); } - pub fn custom_payload(&mut self, p: ClientboundCustomPayload) { + pub fn custom_payload(&mut self, p: &ClientboundCustomPayload) { debug!("Got custom payload packet {p:?}"); } - pub fn disconnect(&mut self, p: ClientboundDisconnect) { + pub fn disconnect(&mut self, p: &ClientboundDisconnect) { warn!("Got disconnect packet {p:?}"); as_system::>(self.ecs, |mut events| { events.send(DisconnectEvent { entity: self.player, - reason: Some(p.reason), + reason: Some(p.reason.clone()), }); }); } - pub fn finish_configuration(&mut self, p: ClientboundFinishConfiguration) { - debug!("got FinishConfiguration packet: {p:?}"); + pub fn finish_configuration(&mut self, _p: &ClientboundFinishConfiguration) { + debug!("got FinishConfiguration packet"); as_system::<(Commands, Query<&mut RawConnection>)>( self.ecs, |(mut commands, mut query)| { let mut raw_conn = query.get_mut(self.player).unwrap(); - raw_conn - .write_packet(ServerboundFinishConfiguration) - .expect( - "we should be in the right state and encoding this packet shouldn't fail", - ); - raw_conn.set_state(ConnectionProtocol::Game); + commands.trigger(SendConfigPacketEvent::new( + self.player, + ServerboundFinishConfiguration, + )); + raw_conn.state = ConnectionProtocol::Game; // these components are added now that we're going to be in the Game state commands @@ -120,34 +117,33 @@ impl ConfigPacketHandler<'_> { ); } - pub fn keep_alive(&mut self, p: ClientboundKeepAlive) { + pub fn keep_alive(&mut self, p: &ClientboundKeepAlive) { debug!( "Got keep alive packet (in configuration) {p:?} for {:?}", self.player ); - as_system::<(Query<&RawConnection>, EventWriter<_>)>(self.ecs, |(query, mut events)| { - let raw_conn = query.get(self.player).unwrap(); - + as_system::<(Commands, EventWriter<_>)>(self.ecs, |(mut commands, mut events)| { events.send(KeepAliveEvent { entity: self.player, id: p.id, }); - raw_conn - .write_packet(ServerboundKeepAlive { id: p.id }) - .unwrap(); + commands.trigger(SendConfigPacketEvent::new( + self.player, + ServerboundKeepAlive { id: p.id }, + )); }); } - pub fn ping(&mut self, p: ClientboundPing) { + pub fn ping(&mut self, p: &ClientboundPing) { debug!("Got ping packet (in configuration) {p:?}"); as_system::(self.ecs, |mut commands| { - commands.trigger_targets(ConfigPingEvent(p), self.player); + commands.trigger_targets(ConfigPingEvent(p.clone()), self.player); }); } - pub fn resource_pack_push(&mut self, p: ClientboundResourcePackPush) { + pub fn resource_pack_push(&mut self, p: &ClientboundResourcePackPush) { debug!("Got resource pack push packet {p:?}"); as_system::>(self.ecs, |mut events| { @@ -162,66 +158,64 @@ impl ConfigPacketHandler<'_> { }); } - pub fn resource_pack_pop(&mut self, p: ClientboundResourcePackPop) { + pub fn resource_pack_pop(&mut self, p: &ClientboundResourcePackPop) { debug!("Got resource pack pop packet {p:?}"); } - pub fn update_enabled_features(&mut self, p: ClientboundUpdateEnabledFeatures) { + pub fn update_enabled_features(&mut self, p: &ClientboundUpdateEnabledFeatures) { debug!("Got update enabled features packet {p:?}"); } - pub fn update_tags(&mut self, _p: ClientboundUpdateTags) { + pub fn update_tags(&mut self, _p: &ClientboundUpdateTags) { debug!("Got update tags packet"); } - pub fn cookie_request(&mut self, p: ClientboundCookieRequest) { + pub fn cookie_request(&mut self, p: &ClientboundCookieRequest) { debug!("Got cookie request packet {p:?}"); - as_system::>(self.ecs, |query| { - let raw_conn = query.get(self.player).unwrap(); - - raw_conn - .write_packet(ServerboundCookieResponse { - key: p.key, + as_system::(self.ecs, |mut commands| { + commands.trigger(SendConfigPacketEvent::new( + self.player, + ServerboundCookieResponse { + key: p.key.clone(), // cookies aren't implemented payload: None, - }) - .unwrap(); + }, + )); }); } - pub fn reset_chat(&mut self, p: ClientboundResetChat) { + pub fn reset_chat(&mut self, p: &ClientboundResetChat) { debug!("Got reset chat packet {p:?}"); } - pub fn store_cookie(&mut self, p: ClientboundStoreCookie) { + pub fn store_cookie(&mut self, p: &ClientboundStoreCookie) { debug!("Got store cookie packet {p:?}"); } - pub fn transfer(&mut self, p: ClientboundTransfer) { + pub fn transfer(&mut self, p: &ClientboundTransfer) { debug!("Got transfer packet {p:?}"); } - pub fn select_known_packs(&mut self, p: ClientboundSelectKnownPacks) { + pub fn select_known_packs(&mut self, p: &ClientboundSelectKnownPacks) { debug!("Got select known packs packet {p:?}"); - as_system::>(self.ecs, |query| { - let raw_conn = query.get(self.player).unwrap(); - + as_system::(self.ecs, |mut commands| { // resource pack management isn't implemented - raw_conn - .write_packet(ServerboundSelectKnownPacks { + commands.trigger(SendConfigPacketEvent::new( + self.player, + ServerboundSelectKnownPacks { known_packs: vec![], - }) - .unwrap(); + }, + )); }); } - pub fn server_links(&mut self, p: ClientboundServerLinks) { + pub fn server_links(&mut self, p: &ClientboundServerLinks) { debug!("Got server links packet {p:?}"); } - pub fn custom_report_details(&mut self, p: ClientboundCustomReportDetails) { + pub fn custom_report_details(&mut self, p: &ClientboundCustomReportDetails) { debug!("Got custom report details packet {p:?}"); } } diff --git a/azalea-client/src/plugins/packet/game/events.rs b/azalea-client/src/plugins/packet/game/events.rs index ad81f9bd..68bfb4b3 100644 --- a/azalea-client/src/plugins/packet/game/events.rs +++ b/azalea-client/src/plugins/packet/game/events.rs @@ -1,33 +1,27 @@ -use std::{ - io::Cursor, - sync::{Arc, Weak}, -}; +use std::sync::{Arc, Weak}; use azalea_chat::FormattedText; use azalea_core::resource_location::ResourceLocation; -use azalea_protocol::{ - packets::{ - Packet, - game::{ClientboundGamePacket, ClientboundPlayerCombatKill, ServerboundGamePacket}, - }, - read::deserialize_packet, +use azalea_protocol::packets::{ + Packet, + game::{ClientboundGamePacket, ClientboundPlayerCombatKill, ServerboundGamePacket}, }; use azalea_world::Instance; use bevy_ecs::prelude::*; use parking_lot::RwLock; -use tracing::{debug, error}; +use tracing::error; use uuid::Uuid; -use crate::{PlayerInfo, client::InGameState, raw_connection::RawConnection}; +use crate::{PlayerInfo, client::InGameState, connection::RawConnection}; /// An event that's sent when we receive a packet. /// ``` -/// # use azalea_client::packet::game::ReceivePacketEvent; +/// # use azalea_client::packet::game::ReceiveGamePacketEvent; /// # use azalea_protocol::packets::game::ClientboundGamePacket; /// # use bevy_ecs::event::EventReader; /// -/// fn handle_packets(mut events: EventReader) { -/// for ReceivePacketEvent { +/// fn handle_packets(mut events: EventReader) { +/// for ReceiveGamePacketEvent { /// entity, /// packet, /// } in events.read() { @@ -41,7 +35,7 @@ use crate::{PlayerInfo, client::InGameState, raw_connection::RawConnection}; /// } /// ``` #[derive(Event, Debug, Clone)] -pub struct ReceivePacketEvent { +pub struct ReceiveGamePacketEvent { /// The client entity that received the packet. pub entity: Entity, /// The packet that was actually received. @@ -67,7 +61,7 @@ pub fn handle_outgoing_packets_observer( ) { let event = trigger.event(); - if let Ok((raw_connection, in_game_state)) = query.get_mut(event.sent_by) { + if let Ok((mut raw_connection, in_game_state)) = query.get_mut(event.sent_by) { if in_game_state.is_none() { error!( "Tried to send a game packet {:?} while not in game state", @@ -76,8 +70,8 @@ pub fn handle_outgoing_packets_observer( return; } - // debug!("Sending packet: {:?}", event.packet); - if let Err(e) = raw_connection.write_packet(event.packet.clone()) { + // debug!("Sending game packet: {:?}", event.packet); + if let Err(e) = raw_connection.write(event.packet.clone()) { error!("Failed to send packet: {e}"); } } @@ -91,61 +85,6 @@ pub fn handle_outgoing_packets(mut commands: Commands, mut events: EventReader>, - mut packet_events: ResMut>, -) { - // we manually clear and send the events at the beginning of each update - // since otherwise it'd cause issues with events in process_packet_events - // running twice - packet_events.clear(); - for (player_entity, raw_connection) in &query { - let packets_lock = raw_connection.incoming_packet_queue(); - let mut packets = packets_lock.lock(); - if !packets.is_empty() { - let mut packets_read = 0; - for raw_packet in packets.iter() { - packets_read += 1; - let packet = - match deserialize_packet::(&mut Cursor::new(raw_packet)) - { - Ok(packet) => packet, - Err(err) => { - error!("failed to read packet: {err:?}"); - debug!("packet bytes: {raw_packet:?}"); - continue; - } - }; - - let should_interrupt = packet_interrupts(&packet); - - packet_events.send(ReceivePacketEvent { - entity: player_entity, - packet: Arc::new(packet), - }); - - if should_interrupt { - break; - } - } - packets.drain(0..packets_read); - } - } -} - -/// Whether the given packet should make us stop deserializing the received -/// packets until next update. -/// -/// This is used for packets that can switch the client state. -fn packet_interrupts(packet: &ClientboundGamePacket) -> bool { - matches!( - packet, - ClientboundGamePacket::StartConfiguration(_) - | ClientboundGamePacket::Disconnect(_) - | ClientboundGamePacket::Transfer(_) - ) -} - /// A player joined the game (or more specifically, was added to the tab /// list of a local player). #[derive(Event, Debug, Clone)] diff --git a/azalea-client/src/plugins/packet/game/mod.rs b/azalea-client/src/plugins/packet/game/mod.rs index 8d896e65..60531d3b 100644 --- a/azalea-client/src/plugins/packet/game/mod.rs +++ b/azalea-client/src/plugins/packet/game/mod.rs @@ -32,171 +32,150 @@ use crate::{ }, movement::{KnockbackEvent, KnockbackType}, packet::as_system, - raw_connection::RawConnection, }; -pub fn process_packet_events(ecs: &mut World) { - let mut events_owned = Vec::<(Entity, Arc)>::new(); +pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundGamePacket) { + let mut handler = GamePacketHandler { player, ecs }; - { - let mut system_state = SystemState::>::new(ecs); - let mut events = system_state.get_mut(ecs); - for ReceivePacketEvent { - entity: player_entity, - packet, - } in events.read() - { - // we do this so `ecs` isn't borrowed for the whole loop - events_owned.push((*player_entity, packet.clone())); - } - } - - for (player_entity, packet) in events_owned { - let mut handler = GamePacketHandler { - player: player_entity, - ecs, - }; - - // the order of these doesn't matter, that's decided by the protocol library - declare_packet_handlers!( - ClientboundGamePacket, - packet.as_ref(), - handler, - [ - login, - set_chunk_cache_radius, - chunk_batch_start, - chunk_batch_finished, - custom_payload, - change_difficulty, - commands, - player_abilities, - set_cursor_item, - update_tags, - disconnect, - update_recipes, - entity_event, - player_position, - player_info_update, - player_info_remove, - set_chunk_cache_center, - chunks_biomes, - light_update, - level_chunk_with_light, - add_entity, - set_entity_data, - update_attributes, - set_entity_motion, - set_entity_link, - initialize_border, - set_time, - set_default_spawn_position, - set_health, - set_experience, - teleport_entity, - update_advancements, - rotate_head, - move_entity_pos, - move_entity_pos_rot, - move_entity_rot, - keep_alive, - remove_entities, - player_chat, - system_chat, - disguised_chat, - sound, - level_event, - block_update, - animate, - section_blocks_update, - game_event, - level_particles, - server_data, - set_equipment, - update_mob_effect, - award_stats, - block_changed_ack, - block_destruction, - block_entity_data, - block_event, - boss_event, - command_suggestions, - container_set_content, - container_set_data, - container_set_slot, - container_close, - cooldown, - custom_chat_completions, - delete_chat, - explode, - forget_level_chunk, - horse_screen_open, - map_item_data, - merchant_offers, - move_vehicle, - open_book, - open_screen, - open_sign_editor, - ping, - place_ghost_recipe, - player_combat_end, - player_combat_enter, - player_combat_kill, - player_look_at, - remove_mob_effect, - resource_pack_push, - resource_pack_pop, - respawn, - start_configuration, - entity_position_sync, - select_advancements_tab, - set_action_bar_text, - set_border_center, - set_border_lerp_size, - set_border_size, - set_border_warning_delay, - set_border_warning_distance, - set_camera, - set_display_objective, - set_objective, - set_passengers, - set_player_team, - set_score, - set_simulation_distance, - set_subtitle_text, - set_title_text, - set_titles_animation, - clear_titles, - sound_entity, - stop_sound, - tab_list, - tag_query, - take_item_entity, - bundle_delimiter, - damage_event, - hurt_animation, - ticking_state, - ticking_step, - reset_score, - cookie_request, - debug_sample, - pong_response, - store_cookie, - transfer, - move_minecart_along_track, - set_held_slot, - set_player_inventory, - projectile_power, - custom_report_details, - server_links, - player_rotation, - recipe_book_add, - recipe_book_remove, - recipe_book_settings, - test_instance_block_status, - ] - ); - } + // the order of these doesn't matter, that's decided by the protocol library + declare_packet_handlers!( + ClientboundGamePacket, + packet, + handler, + [ + login, + set_chunk_cache_radius, + chunk_batch_start, + chunk_batch_finished, + custom_payload, + change_difficulty, + commands, + player_abilities, + set_cursor_item, + update_tags, + disconnect, + update_recipes, + entity_event, + player_position, + player_info_update, + player_info_remove, + set_chunk_cache_center, + chunks_biomes, + light_update, + level_chunk_with_light, + add_entity, + set_entity_data, + update_attributes, + set_entity_motion, + set_entity_link, + initialize_border, + set_time, + set_default_spawn_position, + set_health, + set_experience, + teleport_entity, + update_advancements, + rotate_head, + move_entity_pos, + move_entity_pos_rot, + move_entity_rot, + keep_alive, + remove_entities, + player_chat, + system_chat, + disguised_chat, + sound, + level_event, + block_update, + animate, + section_blocks_update, + game_event, + level_particles, + server_data, + set_equipment, + update_mob_effect, + award_stats, + block_changed_ack, + block_destruction, + block_entity_data, + block_event, + boss_event, + command_suggestions, + container_set_content, + container_set_data, + container_set_slot, + container_close, + cooldown, + custom_chat_completions, + delete_chat, + explode, + forget_level_chunk, + horse_screen_open, + map_item_data, + merchant_offers, + move_vehicle, + open_book, + open_screen, + open_sign_editor, + ping, + place_ghost_recipe, + player_combat_end, + player_combat_enter, + player_combat_kill, + player_look_at, + remove_mob_effect, + resource_pack_push, + resource_pack_pop, + respawn, + start_configuration, + entity_position_sync, + select_advancements_tab, + set_action_bar_text, + set_border_center, + set_border_lerp_size, + set_border_size, + set_border_warning_delay, + set_border_warning_distance, + set_camera, + set_display_objective, + set_objective, + set_passengers, + set_player_team, + set_score, + set_simulation_distance, + set_subtitle_text, + set_title_text, + set_titles_animation, + clear_titles, + sound_entity, + stop_sound, + tab_list, + tag_query, + take_item_entity, + bundle_delimiter, + damage_event, + hurt_animation, + ticking_state, + ticking_step, + reset_score, + cookie_request, + debug_sample, + pong_response, + store_cookie, + transfer, + move_minecart_along_track, + set_held_slot, + set_player_inventory, + projectile_power, + custom_report_details, + server_links, + player_rotation, + recipe_book_add, + recipe_book_remove, + recipe_book_settings, + test_instance_block_status, + ] + ); } pub struct GamePacketHandler<'a> { @@ -342,7 +321,7 @@ impl GamePacketHandler<'_> { client_information ); commands.trigger(SendPacketEvent::new(self.player, - azalea_protocol::packets::game::s_client_information::ServerboundClientInformation { information: client_information.clone() }, + azalea_protocol::packets::game::s_client_information::ServerboundClientInformation { client_information: client_information.clone() }, )); }, ); @@ -1506,9 +1485,11 @@ impl GamePacketHandler<'_> { pub fn start_configuration(&mut self, _p: &ClientboundStartConfiguration) { debug!("Got start configuration packet"); - as_system::<(Query<&RawConnection>, Commands)>(self.ecs, |(query, mut commands)| { - let raw_conn = query.get(self.player).unwrap(); - let _ = raw_conn.write_packet(ServerboundConfigurationAcknowledged); + as_system::(self.ecs, |mut commands| { + commands.trigger(SendPacketEvent::new( + self.player, + ServerboundConfigurationAcknowledged, + )); commands .entity(self.player) diff --git a/azalea-client/src/plugins/packet/login.rs b/azalea-client/src/plugins/packet/login.rs deleted file mode 100644 index 1bb07266..00000000 --- a/azalea-client/src/plugins/packet/login.rs +++ /dev/null @@ -1,114 +0,0 @@ -// login packets aren't actually handled here because compression/encryption -// would make packet handling a lot messier - -use std::{collections::HashSet, sync::Arc}; - -use azalea_protocol::packets::{ - Packet, - login::{ - ClientboundLoginPacket, ServerboundLoginPacket, - s_custom_query_answer::ServerboundCustomQueryAnswer, - }, -}; -use bevy_ecs::{prelude::*, system::SystemState}; -use derive_more::{Deref, DerefMut}; -use tokio::sync::mpsc; -use tracing::error; - -// this struct is defined here anyways though so it's consistent with the other -// ones - -/// An event that's sent when we receive a login packet from the server. Note -/// that if you want to handle this in a system, you must add -/// `.before(azalea::packet::login::process_packet_events)` to it -/// because that system clears the events. -#[derive(Event, Debug, Clone)] -pub struct LoginPacketEvent { - /// The client entity that received the packet. - pub entity: Entity, - /// The packet that was actually received. - pub packet: Arc, -} - -/// Event for sending a login packet to the server. -#[derive(Event)] -pub struct SendLoginPacketEvent { - pub entity: Entity, - pub packet: ServerboundLoginPacket, -} -impl SendLoginPacketEvent { - pub fn new(entity: Entity, packet: impl Packet) -> Self { - let packet = packet.into_variant(); - Self { entity, packet } - } -} - -#[derive(Component)] -pub struct LoginSendPacketQueue { - pub tx: mpsc::UnboundedSender, -} - -/// A marker component for local players that are currently in the -/// `login` state. -#[derive(Component, Clone, Debug)] -pub struct InLoginState; - -pub fn handle_send_packet_event( - mut send_packet_events: EventReader, - mut query: Query<&mut LoginSendPacketQueue>, -) { - for event in send_packet_events.read() { - if let Ok(queue) = query.get_mut(event.entity) { - let _ = queue.tx.send(event.packet.clone()); - } else { - error!("Sent SendPacketEvent for entity that doesn't have a LoginSendPacketQueue"); - } - } -} - -/// Plugins can add to this set if they want to handle a custom query packet -/// themselves. This component removed after the login state ends. -#[derive(Component, Default, Debug, Deref, DerefMut)] -pub struct IgnoreQueryIds(HashSet); - -pub fn process_packet_events(ecs: &mut World) { - let mut events_owned = Vec::new(); - let mut system_state: SystemState>> = SystemState::new(ecs); - let mut events = system_state.get_mut(ecs); - for LoginPacketEvent { - entity: player_entity, - packet, - } in events.drain() - { - // we do this so `ecs` isn't borrowed for the whole loop - events_owned.push((player_entity, packet)); - } - for (player_entity, packet) in events_owned { - #[allow(clippy::single_match)] - match packet.as_ref() { - ClientboundLoginPacket::CustomQuery(p) => { - let mut system_state: SystemState<( - EventWriter, - Query<&IgnoreQueryIds>, - )> = SystemState::new(ecs); - let (mut send_packet_events, query) = system_state.get_mut(ecs); - - let ignore_query_ids = query.get(player_entity).ok().map(|x| x.0.clone()); - if let Some(ignore_query_ids) = ignore_query_ids { - if ignore_query_ids.contains(&p.transaction_id) { - continue; - } - } - - send_packet_events.send(SendLoginPacketEvent::new( - player_entity, - ServerboundCustomQueryAnswer { - transaction_id: p.transaction_id, - data: None, - }, - )); - } - _ => {} - } - } -} diff --git a/azalea-client/src/plugins/packet/login/events.rs b/azalea-client/src/plugins/packet/login/events.rs new file mode 100644 index 00000000..fc7a6b22 --- /dev/null +++ b/azalea-client/src/plugins/packet/login/events.rs @@ -0,0 +1,86 @@ +use std::sync::Arc; + +use azalea_protocol::packets::{ + Packet, + login::{ + ClientboundCustomQuery, ClientboundHello, ClientboundLoginPacket, ServerboundLoginPacket, + }, +}; +use bevy_ecs::prelude::*; +use tracing::{debug, error}; + +use super::InLoginState; +use crate::{Account, connection::RawConnection}; + +#[derive(Event, Debug, Clone)] +pub struct ReceiveLoginPacketEvent { + /// The client entity that received the packet. + pub entity: Entity, + /// The packet that was actually received. + pub packet: Arc, +} + +#[derive(Event, Debug, Clone)] +pub struct ReceiveHelloEvent { + pub account: Account, + pub packet: ClientboundHello, +} + +#[derive(Event, Debug, Clone)] +pub struct ReceiveCustomQueryEvent { + /// The client entity that received the packet. + pub entity: Entity, + pub packet: ClientboundCustomQuery, + /// A system can set this to `true` to make Azalea not reply to the query. + /// You must make sure you modify this before the + /// [`reply_to_custom_queries`] system runs. + /// + /// [`reply_to_custom_queries`]: crate::login::reply_to_custom_queries + pub disabled: bool, +} + +/// Event for sending a login packet to the server. +#[derive(Event, Debug, Clone)] +pub struct SendLoginPacketEvent { + pub sent_by: Entity, + pub packet: ServerboundLoginPacket, +} +impl SendLoginPacketEvent { + pub fn new(entity: Entity, packet: impl Packet) -> Self { + let packet = packet.into_variant(); + Self { + sent_by: entity, + packet, + } + } +} + +pub fn handle_outgoing_packets_observer( + trigger: Trigger, + mut query: Query<(&mut RawConnection, Option<&InLoginState>)>, +) { + let event = trigger.event(); + if let Ok((mut raw_conn, in_login_state)) = query.get_mut(event.sent_by) { + if in_login_state.is_none() { + error!( + "Tried to send a login packet {:?} while not in login state", + event.packet + ); + return; + } + debug!("Sending login packet: {:?}", event.packet); + if let Err(e) = raw_conn.write(event.packet.clone()) { + error!("Failed to send packet: {e}"); + } + } +} +/// A system that converts [`SendLoginPacketEvent`] events into triggers so +/// they get received by [`handle_outgoing_packets_observer`]. +pub fn handle_outgoing_packets( + mut commands: Commands, + mut events: EventReader, +) { + for event in events.read() { + commands.trigger(event.clone()); + } +} diff --git a/azalea-client/src/plugins/packet/login/mod.rs b/azalea-client/src/plugins/packet/login/mod.rs new file mode 100644 index 00000000..d313a767 --- /dev/null +++ b/azalea-client/src/plugins/packet/login/mod.rs @@ -0,0 +1,145 @@ +// login packets aren't actually handled here because compression/encryption +// would make packet handling a lot messier + +mod events; + +use azalea_protocol::packets::{ + ConnectionProtocol, + login::{ + ClientboundCookieRequest, ClientboundCustomQuery, ClientboundHello, + ClientboundLoginCompression, ClientboundLoginDisconnect, ClientboundLoginFinished, + ClientboundLoginPacket, ServerboundCookieResponse, ServerboundLoginAcknowledged, + }, +}; +use bevy_ecs::prelude::*; +pub use events::*; +use tracing::{debug, error}; + +use super::as_system; +use crate::{ + Account, GameProfileComponent, InConfigState, connection::RawConnection, + declare_packet_handlers, disconnect::DisconnectEvent, +}; + +pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundLoginPacket) { + let mut handler = LoginPacketHandler { player, ecs }; + + declare_packet_handlers!( + ClientboundLoginPacket, + packet, + handler, + [ + hello, + login_disconnect, + login_finished, + login_compression, + custom_query, + cookie_request + ] + ); +} + +/// A marker component for local players that are currently in the +/// `login` state. +#[derive(Component, Clone, Debug)] +pub struct InLoginState; + +pub struct LoginPacketHandler<'a> { + pub ecs: &'a mut World, + pub player: Entity, +} +impl LoginPacketHandler<'_> { + pub fn hello(&mut self, p: &ClientboundHello) { + debug!("Got encryption request {p:?}"); + + as_system::<(Commands, Query<&Account>)>(self.ecs, |(mut commands, query)| { + let Ok(account) = query.get(self.player) else { + error!( + "Expected Account component to be present on player when receiving hello packet." + ); + return; + }; + commands.trigger_targets( + ReceiveHelloEvent { + account: account.clone(), + packet: p.clone(), + }, + self.player, + ); + }); + } + pub fn login_disconnect(&mut self, p: &ClientboundLoginDisconnect) { + debug!("Got disconnect {:?}", p); + + as_system::>(self.ecs, |mut events| { + events.send(DisconnectEvent { + entity: self.player, + reason: Some(p.reason.clone()), + }); + }); + } + pub fn login_finished(&mut self, p: &ClientboundLoginFinished) { + debug!( + "Got profile {:?}. login is finished and we're now switching to the config state", + p.game_profile + ); + + as_system::<(Commands, Query<&mut RawConnection>)>( + self.ecs, + |(mut commands, mut query)| { + commands.trigger(SendLoginPacketEvent::new( + self.player, + ServerboundLoginAcknowledged, + )); + + commands + .entity(self.player) + .remove::() + .insert(InConfigState) + .insert(GameProfileComponent(p.game_profile.clone())); + + let mut conn = query + .get_mut(self.player) + .expect("RawConnection component should be present when receiving packets"); + conn.state = ConnectionProtocol::Configuration; + }, + ); + } + pub fn login_compression(&mut self, p: &ClientboundLoginCompression) { + debug!("Got compression request {p:?}"); + + as_system::>(self.ecs, |mut query| { + let mut conn = query + .get_mut(self.player) + .expect("RawConnection component should be present when receiving packets"); + if let Some(net_conn) = &mut conn.net_conn() { + net_conn.set_compression_threshold(Some(p.compression_threshold as u32)); + } + }) + } + pub fn custom_query(&mut self, p: &ClientboundCustomQuery) { + debug!("Got custom query {p:?}"); + + as_system::>(self.ecs, |mut events| { + events.send(ReceiveCustomQueryEvent { + entity: self.player, + packet: p.clone(), + disabled: false, + }); + }); + } + pub fn cookie_request(&mut self, p: &ClientboundCookieRequest) { + debug!("Got cookie request {p:?}"); + + as_system::(self.ecs, |mut commands| { + commands.trigger(SendLoginPacketEvent::new( + self.player, + ServerboundCookieResponse { + key: p.key.clone(), + // cookies aren't implemented + payload: None, + }, + )); + }); + } +} diff --git a/azalea-client/src/plugins/packet/mod.rs b/azalea-client/src/plugins/packet/mod.rs index 362154cc..1c14fa30 100644 --- a/azalea-client/src/plugins/packet/mod.rs +++ b/azalea-client/src/plugins/packet/mod.rs @@ -1,17 +1,11 @@ use azalea_entity::metadata::Health; -use bevy_app::{App, First, Plugin, PreUpdate, Update}; +use bevy_app::{App, Plugin, Update}; use bevy_ecs::{ prelude::*, system::{SystemParam, SystemState}, }; -use self::{ - game::{ - AddPlayerEvent, DeathEvent, InstanceLoadedEvent, KeepAliveEvent, RemovePlayerEvent, - ResourcePackEvent, UpdatePlayerEvent, - }, - login::{LoginPacketEvent, SendLoginPacketEvent}, -}; +use self::game::DeathEvent; use crate::{chat::ChatReceivedEvent, events::death_listener}; pub mod config; @@ -36,50 +30,38 @@ pub fn death_event_on_0_health( impl Plugin for PacketPlugin { fn build(&self, app: &mut App) { - app.add_systems( - First, - ( - game::emit_receive_packet_events, - config::emit_receive_config_packet_events, - ), - ) - .add_systems( - PreUpdate, - ( - game::process_packet_events, - config::process_packet_events, - login::handle_send_packet_event, - login::process_packet_events, - ), - ) - .add_observer(game::handle_outgoing_packets_observer) - .add_observer(config::handle_outgoing_packets_observer) - .add_systems( - Update, - ( + app.add_observer(game::handle_outgoing_packets_observer) + .add_observer(config::handle_outgoing_packets_observer) + .add_observer(login::handle_outgoing_packets_observer) + .add_systems( + Update, ( - config::handle_outgoing_packets, - game::handle_outgoing_packets, - ) - .chain(), - death_event_on_0_health.before(death_listener), - ), - ) - // we do this instead of add_event so we can handle the events ourselves - .init_resource::>() - .init_resource::>() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::(); + ( + config::handle_outgoing_packets, + game::handle_outgoing_packets, + login::handle_outgoing_packets, + ) + .chain(), + death_event_on_0_health.before(death_listener), + ), + ) + .add_event::() + .add_event::() + .add_event::() + // + .add_event::() + .add_event::() + .add_event::() + // + .add_event::() + .add_event::() + .add_event::() + .add_event::() + .add_event::() + .add_event::() + .add_event::() + .add_event::() + .add_event::(); } } diff --git a/azalea-client/src/raw_connection.rs b/azalea-client/src/raw_connection.rs deleted file mode 100644 index 97e93f16..00000000 --- a/azalea-client/src/raw_connection.rs +++ /dev/null @@ -1,208 +0,0 @@ -use std::fmt::Debug; -use std::sync::Arc; - -use azalea_protocol::{ - connect::{RawReadConnection, RawWriteConnection}, - packets::{ConnectionProtocol, Packet, ProtocolPacket}, - read::ReadPacketError, - write::serialize_packet, -}; -use bevy_ecs::prelude::*; -use parking_lot::Mutex; -use thiserror::Error; -use tokio::sync::mpsc::{ - self, - error::{SendError, TrySendError}, -}; -use tracing::error; - -/// A component for clients that can read and write packets to the server. This -/// works with raw bytes, so you'll have to serialize/deserialize packets -/// yourself. It will do the compression and encryption for you though. -#[derive(Component)] -pub struct RawConnection { - pub reader: RawConnectionReader, - pub writer: RawConnectionWriter, - - /// Packets sent to this will be sent to the server. - /// A task that reads packets from the server. The client is disconnected - /// when this task ends. - pub read_packets_task: tokio::task::JoinHandle<()>, - /// A task that writes packets from the server. - pub write_packets_task: tokio::task::JoinHandle<()>, - - pub connection_protocol: ConnectionProtocol, -} - -#[derive(Clone)] -pub struct RawConnectionReader { - pub incoming_packet_queue: Arc>>>, - pub run_schedule_sender: mpsc::Sender<()>, -} -#[derive(Clone)] -pub struct RawConnectionWriter { - pub outgoing_packets_sender: mpsc::UnboundedSender>, -} - -#[derive(Error, Debug)] -pub enum WritePacketError { - #[error("Wrong protocol state: expected {expected:?}, got {got:?}")] - WrongState { - expected: ConnectionProtocol, - got: ConnectionProtocol, - }, - #[error(transparent)] - Encoding(#[from] azalea_protocol::write::PacketEncodeError), - #[error(transparent)] - SendError { - #[from] - #[backtrace] - source: SendError>, - }, -} - -impl RawConnection { - pub fn new( - run_schedule_sender: mpsc::Sender<()>, - connection_protocol: ConnectionProtocol, - raw_read_connection: RawReadConnection, - raw_write_connection: RawWriteConnection, - ) -> Self { - let (outgoing_packets_sender, outgoing_packets_receiver) = mpsc::unbounded_channel(); - - let incoming_packet_queue = Arc::new(Mutex::new(Vec::new())); - - let reader = RawConnectionReader { - incoming_packet_queue: incoming_packet_queue.clone(), - run_schedule_sender, - }; - let writer = RawConnectionWriter { - outgoing_packets_sender, - }; - - let read_packets_task = tokio::spawn(reader.clone().read_task(raw_read_connection)); - let write_packets_task = tokio::spawn( - writer - .clone() - .write_task(raw_write_connection, outgoing_packets_receiver), - ); - - Self { - reader, - writer, - read_packets_task, - write_packets_task, - connection_protocol, - } - } - - pub fn write_raw_packet(&self, raw_packet: Box<[u8]>) -> Result<(), WritePacketError> { - self.writer.outgoing_packets_sender.send(raw_packet)?; - Ok(()) - } - - /// Write the packet with the given state to the server. - /// - /// # Errors - /// - /// Returns an error if the packet is not valid for the current state, or if - /// encoding it failed somehow (like it's too big or something). - pub fn write_packet( - &self, - packet: impl Packet

, - ) -> Result<(), WritePacketError> { - let packet = packet.into_variant(); - let raw_packet = serialize_packet(&packet)?; - self.write_raw_packet(raw_packet)?; - - Ok(()) - } - - /// Returns whether the connection is still alive. - pub fn is_alive(&self) -> bool { - !self.read_packets_task.is_finished() - } - - pub fn incoming_packet_queue(&self) -> Arc>>> { - self.reader.incoming_packet_queue.clone() - } - - pub fn set_state(&mut self, connection_protocol: ConnectionProtocol) { - self.connection_protocol = connection_protocol; - } -} - -impl RawConnectionReader { - /// Loop that reads from the connection and adds the packets to the queue + - /// runs the schedule. - pub async fn read_task(self, mut read_conn: RawReadConnection) { - fn log_for_error(error: &ReadPacketError) { - if !matches!(*error, ReadPacketError::ConnectionClosed) { - error!("Error reading packet from Client: {error:?}"); - } - } - - loop { - match read_conn.read().await { - Ok(raw_packet) => { - let mut incoming_packet_queue = self.incoming_packet_queue.lock(); - - incoming_packet_queue.push(raw_packet); - // this makes it so packets received at the same time are guaranteed to be - // handled in the same tick. this is also an attempt at making it so we can't - // receive any packets in the ticks/updates after being disconnected. - loop { - let raw_packet = match read_conn.try_read() { - Ok(p) => p, - Err(err) => { - log_for_error(&err); - return; - } - }; - let Some(raw_packet) = raw_packet else { break }; - incoming_packet_queue.push(raw_packet); - } - - // tell the client to run all the systems - if self.run_schedule_sender.try_send(()) == Err(TrySendError::Closed(())) { - // the client was dropped - break; - } - } - Err(err) => { - log_for_error(&err); - return; - } - } - } - } -} - -impl RawConnectionWriter { - /// Consume the [`ServerboundGamePacket`] queue and actually write the - /// packets to the server. It's like this so writing packets doesn't need to - /// be awaited. - /// - /// [`ServerboundGamePacket`]: azalea_protocol::packets::game::ServerboundGamePacket - pub async fn write_task( - self, - mut write_conn: RawWriteConnection, - mut outgoing_packets_receiver: mpsc::UnboundedReceiver>, - ) { - while let Some(raw_packet) = outgoing_packets_receiver.recv().await { - if let Err(err) = write_conn.write(&raw_packet).await { - error!("Disconnecting because we couldn't write a packet: {err}."); - break; - }; - } - // receiver is automatically closed when it's dropped - } -} - -impl Drop for RawConnection { - /// Stop every active task when this `RawConnection` is dropped. - fn drop(&mut self) { - self.read_packets_task.abort(); - self.write_packets_task.abort(); - } -} diff --git a/azalea-client/src/test_simulation.rs b/azalea-client/src/test_simulation.rs index a09e5dae..4dec01b4 100644 --- a/azalea-client/src/test_simulation.rs +++ b/azalea-client/src/test_simulation.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, sync::Arc, time::Duration}; +use std::{fmt::Debug, sync::Arc}; use azalea_auth::game_profile::GameProfile; use azalea_buf::AzaleaWrite; @@ -21,16 +21,14 @@ use azalea_world::palette::{PalettedContainer, PalettedContainerKind}; use azalea_world::{Chunk, Instance, MinecraftEntityId, Section}; use bevy_app::App; use bevy_ecs::{prelude::*, schedule::ExecutorKind}; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use simdnbt::owned::{NbtCompound, NbtTag}; -use tokio::task::JoinHandle; -use tokio::{sync::mpsc, time::sleep}; use uuid::Uuid; +use crate::connection::RawConnection; use crate::disconnect::DisconnectEvent; use crate::{ ClientInformation, GameProfileComponent, InConfigState, InstanceHolder, LocalPlayerBundle, - raw_connection::{RawConnection, RawConnectionReader, RawConnectionWriter}, }; /// A way to simulate a client in a server, used for some internal tests. @@ -40,16 +38,13 @@ pub struct Simulation { // the runtime needs to be kept around for the tasks to be considered alive pub rt: tokio::runtime::Runtime, - - pub incoming_packet_queue: Arc>>>, - pub clear_outgoing_packets_receiver_task: JoinHandle, } impl Simulation { pub fn new(initial_connection_protocol: ConnectionProtocol) -> Self { let mut app = create_simulation_app(); let mut entity = app.world_mut().spawn_empty(); - let (player, clear_outgoing_packets_receiver_task, incoming_packet_queue, rt) = + let (player, rt) = create_local_player_bundle(entity.id(), ConnectionProtocol::Configuration); entity.insert(player); @@ -58,16 +53,16 @@ impl Simulation { tick_app(&mut app); // start in the config state - app.world_mut().entity_mut(entity).insert(InConfigState); + app.world_mut().entity_mut(entity).insert(( + InConfigState, + GameProfileComponent(GameProfile::new( + Uuid::from_u128(1234), + "azalea".to_string(), + )), + )); tick_app(&mut app); - let mut simulation = Self { - app, - entity, - rt, - incoming_packet_queue, - clear_outgoing_packets_receiver_task, - }; + let mut simulation = Self { app, entity, rt }; #[allow(clippy::single_match)] match initial_connection_protocol { @@ -95,9 +90,11 @@ impl Simulation { simulation } - pub fn receive_packet(&self, packet: impl Packet

) { + pub fn receive_packet(&mut self, packet: impl Packet

) { let buf = azalea_protocol::write::serialize_packet(&packet.into_variant()).unwrap(); - self.incoming_packet_queue.lock().push(buf); + self.with_component_mut::(|raw_conn| { + raw_conn.injected_clientbound_packets.push(buf); + }); } pub fn tick(&mut self) { @@ -112,6 +109,14 @@ impl Simulation { pub fn has_component(&self) -> bool { self.app.world().get::(self.entity).is_some() } + pub fn with_component_mut(&mut self, f: impl FnOnce(&mut T)) { + f(&mut self + .app + .world_mut() + .entity_mut(self.entity) + .get_mut::() + .unwrap()); + } pub fn resource(&self) -> T { self.app.world().get_resource::().unwrap().clone() } @@ -143,70 +148,24 @@ impl Simulation { fn create_local_player_bundle( entity: Entity, connection_protocol: ConnectionProtocol, -) -> ( - LocalPlayerBundle, - JoinHandle, - Arc>>>, - tokio::runtime::Runtime, -) { +) -> (LocalPlayerBundle, tokio::runtime::Runtime) { // unused since we'll trigger ticks ourselves - let (run_schedule_sender, _run_schedule_receiver) = mpsc::channel(1); - - let (outgoing_packets_sender, mut outgoing_packets_receiver) = mpsc::unbounded_channel(); - let incoming_packet_queue = Arc::new(Mutex::new(Vec::new())); - let reader = RawConnectionReader { - incoming_packet_queue: incoming_packet_queue.clone(), - run_schedule_sender, - }; - let writer = RawConnectionWriter { - outgoing_packets_sender, - }; let rt = tokio::runtime::Runtime::new().unwrap(); - // the tasks can't die since that would make us send a DisconnectEvent - let read_packets_task = rt.spawn(async { - loop { - sleep(Duration::from_secs(60)).await; - } - }); - let write_packets_task = rt.spawn(async { - loop { - sleep(Duration::from_secs(60)).await; - } - }); - - let clear_outgoing_packets_receiver_task = rt.spawn(async move { - loop { - let _ = outgoing_packets_receiver.recv().await; - } - }); - - let raw_connection = RawConnection { - reader, - writer, - read_packets_task, - write_packets_task, - connection_protocol, - }; + let raw_connection = RawConnection::new_networkless(connection_protocol); let instance = Instance::default(); let instance_holder = InstanceHolder::new(entity, Arc::new(RwLock::new(instance))); let local_player_bundle = LocalPlayerBundle { raw_connection, - game_profile: GameProfileComponent(GameProfile::new(Uuid::nil(), "azalea".to_owned())), client_information: ClientInformation::default(), instance_holder, metadata: PlayerMetadataBundle::default(), }; - ( - local_player_bundle, - clear_outgoing_packets_receiver_task, - incoming_packet_queue, - rt, - ) + (local_player_bundle, rt) } fn create_simulation_app() -> App { diff --git a/azalea-client/tests/change_dimension_to_nether_and_back.rs b/azalea-client/tests/change_dimension_to_nether_and_back.rs index cc1fcb14..fddbccde 100644 --- a/azalea-client/tests/change_dimension_to_nether_and_back.rs +++ b/azalea-client/tests/change_dimension_to_nether_and_back.rs @@ -12,6 +12,8 @@ use simdnbt::owned::{NbtCompound, NbtTag}; #[test] fn test_change_dimension_to_nether_and_back() { + let _ = tracing_subscriber::fmt().try_init(); + generic_test_change_dimension_to_nether_and_back(true); generic_test_change_dimension_to_nether_and_back(false); } diff --git a/azalea-protocol/src/connect.rs b/azalea-protocol/src/connect.rs index 13a86ed8..77968eed 100644 --- a/azalea-protocol/src/connect.rs +++ b/azalea-protocol/src/connect.rs @@ -344,8 +344,9 @@ impl Connection { impl Connection { /// Set our compression threshold, i.e. the maximum size that a packet is - /// allowed to be without getting compressed. If you set it to less than 0 - /// then compression gets disabled. + /// allowed to be without getting compressed. Setting it to 0 means every + /// packet will be compressed. If you set it to less than 0, + /// then compression is disabled. pub fn set_compression_threshold(&mut self, threshold: i32) { // if you pass a threshold of less than 0, compression is disabled if threshold >= 0 { diff --git a/azalea-protocol/src/packets/game/s_client_information.rs b/azalea-protocol/src/packets/game/s_client_information.rs index 5861212c..c8e76f63 100644 --- a/azalea-protocol/src/packets/game/s_client_information.rs +++ b/azalea-protocol/src/packets/game/s_client_information.rs @@ -5,5 +5,5 @@ use crate::common::client_information::ClientInformation; #[derive(Clone, Debug, AzBuf, ServerboundGamePacket)] pub struct ServerboundClientInformation { - pub information: ClientInformation, + pub client_information: ClientInformation, } diff --git a/azalea-protocol/src/packets/login/s_custom_query.rs b/azalea-protocol/src/packets/login/s_custom_query.rs deleted file mode 100644 index 39ecdcef..00000000 --- a/azalea-protocol/src/packets/login/s_custom_query.rs +++ /dev/null @@ -1,9 +0,0 @@ -use azalea_buf::{AzBuf, UnsizedByteArray}; -use azalea_protocol_macros::ServerboundLoginPacket; - -#[derive(Clone, Debug, AzBuf, ServerboundLoginPacket)] -pub struct ServerboundCustomQuery { - #[var] - pub transaction_id: u32, - pub data: Option, -} diff --git a/azalea-protocol/src/read.rs b/azalea-protocol/src/read.rs index 64d35a08..038af319 100644 --- a/azalea-protocol/src/read.rs +++ b/azalea-protocol/src/read.rs @@ -285,6 +285,8 @@ where buffer.get_mut().extend_from_slice(&bytes); } } +/// Read a packet from the stream, then if necessary decrypt it, decompress +/// it, and split it. pub fn try_read_raw_packet( stream: &mut R, buffer: &mut Cursor>, diff --git a/azalea-protocol/src/write.rs b/azalea-protocol/src/write.rs index adefc340..dd863f9e 100644 --- a/azalea-protocol/src/write.rs +++ b/azalea-protocol/src/write.rs @@ -54,6 +54,15 @@ pub async fn write_raw_packet( where W: AsyncWrite + Unpin + Send, { + let network_packet = encode_to_network_packet(raw_packet, compression_threshold, cipher); + stream.write_all(&network_packet).await +} + +pub fn encode_to_network_packet( + raw_packet: &[u8], + compression_threshold: Option, + cipher: &mut Option, +) -> Vec { trace!("Writing raw packet: {raw_packet:?}"); let mut raw_packet = raw_packet.to_vec(); if let Some(threshold) = compression_threshold { @@ -64,7 +73,7 @@ where if let Some(cipher) = cipher { azalea_crypto::encrypt_packet(cipher, &mut raw_packet); } - stream.write_all(&raw_packet).await + raw_packet } pub fn compression_encoder( diff --git a/azalea/examples/echo.rs b/azalea/examples/echo.rs index 01390982..1e773b7d 100644 --- a/azalea/examples/echo.rs +++ b/azalea/examples/echo.rs @@ -20,7 +20,7 @@ pub struct State {} async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> { if let Event::Chat(m) = event { if let (Some(sender), content) = m.split_sender_and_content() { - if sender == bot.profile.name { + if sender == bot.username() { return Ok(()); // ignore our own messages } bot.chat(&content); diff --git a/azalea/examples/steal.rs b/azalea/examples/steal.rs index 1277fab2..3fa87cc4 100644 --- a/azalea/examples/steal.rs +++ b/azalea/examples/steal.rs @@ -28,7 +28,7 @@ struct State { async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> { if let Event::Chat(m) = event { - if m.sender() == Some(bot.profile.name.clone()) { + if m.sender() == Some(bot.username()) { return Ok(()); }; if m.content() != "go" { diff --git a/azalea/examples/testbot/commands/debug.rs b/azalea/examples/testbot/commands/debug.rs index 9de4d97d..3428d117 100644 --- a/azalea/examples/testbot/commands/debug.rs +++ b/azalea/examples/testbot/commands/debug.rs @@ -25,6 +25,12 @@ pub fn register(commands: &mut CommandDispatcher>) { 1 })); + commands.register(literal("disconnect").executes(|ctx: &Ctx| { + let source = ctx.source.lock(); + source.bot.disconnect(); + 1 + })); + commands.register(literal("whereami").executes(|ctx: &Ctx| { let mut source = ctx.source.lock(); let Some(entity) = source.entity() else { @@ -248,7 +254,7 @@ pub fn register(commands: &mut CommandDispatcher>) { } } "bevy_ecs::event::collections::Events" => { - let events = ecs.resource::>(); + let events = ecs.resource::>(); writeln!(report, "- Event count: {}", events.len()).unwrap(); } "bevy_ecs::event::collections::Events" => { diff --git a/azalea/examples/testbot/main.rs b/azalea/examples/testbot/main.rs index 410d1b6d..683a98d7 100644 --- a/azalea/examples/testbot/main.rs +++ b/azalea/examples/testbot/main.rs @@ -134,7 +134,7 @@ async fn handle(bot: Client, event: azalea::Event, state: State) -> anyhow::Resu view_distance: 32, ..Default::default() }) - .await?; + .await; if swarm.args.pathfinder_debug_particles { bot.ecs .lock() diff --git a/azalea/examples/todo/craft_dig_straight_down.rs b/azalea/examples/todo/craft_dig_straight_down.rs index 4f613adf..0dc8e16d 100644 --- a/azalea/examples/todo/craft_dig_straight_down.rs +++ b/azalea/examples/todo/craft_dig_straight_down.rs @@ -24,7 +24,7 @@ async fn main() { async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> { match event { Event::Chat(m) => { - if m.sender() == Some(bot.profile.name) { + if m.sender() == Some(bot.username()) { return Ok(()); }; if m.content() == "go" { diff --git a/azalea/src/container.rs b/azalea/src/container.rs index 0ce0fc44..0d1cfb16 100644 --- a/azalea/src/container.rs +++ b/azalea/src/container.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; use std::fmt::Formatter; -use azalea_client::packet::game::ReceivePacketEvent; +use azalea_client::packet::game::ReceiveGamePacketEvent; use azalea_client::{ Client, inventory::{CloseContainerEvent, ContainerClickEvent, Inventory}, @@ -234,7 +234,10 @@ impl ContainerHandle { #[derive(Component, Debug)] pub struct WaitingForInventoryOpen; -fn handle_menu_opened_event(mut commands: Commands, mut events: EventReader) { +fn handle_menu_opened_event( + mut commands: Commands, + mut events: EventReader, +) { for event in events.read() { if let ClientboundGamePacket::ContainerSetContent { .. } = event.packet.as_ref() { commands diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs index eb56ae2d..ba8fd7bb 100644 --- a/azalea/src/swarm/mod.rs +++ b/azalea/src/swarm/mod.rs @@ -55,8 +55,6 @@ pub struct Swarm { bots_tx: mpsc::UnboundedSender<(Option, Client)>, swarm_tx: mpsc::UnboundedSender, - - run_schedule_sender: mpsc::Sender<()>, } /// Create a new [`Swarm`]. @@ -396,12 +394,9 @@ where swarm_tx.send(SwarmEvent::Init).unwrap(); - let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1); - let main_schedule_label = self.app.main().update_schedule.unwrap(); - let ecs_lock = - start_ecs_runner(self.app, run_schedule_receiver, run_schedule_sender.clone()); + let ecs_lock = start_ecs_runner(self.app); let swarm = Swarm { ecs_lock: ecs_lock.clone(), @@ -414,8 +409,6 @@ where bots_tx, swarm_tx: swarm_tx.clone(), - - run_schedule_sender, }; // run the main schedule so the startup systems run @@ -495,7 +488,8 @@ where let Some(first_bot_state) = first_bot.query::>(&mut ecs).cloned() else { error!( "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.", - first_bot.profile.name, first_bot.entity + first_bot.username(), + first_bot.entity ); continue; }; @@ -513,7 +507,8 @@ where let Some(state) = bot.query::>(&mut ecs).cloned() else { error!( "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.", - bot.profile.name, bot.entity + bot.username(), + bot.entity ); continue; }; @@ -665,7 +660,6 @@ impl Swarm { address: &address, resolved_address: &resolved_address, proxy: join_opts.proxy.clone(), - run_schedule_sender: self.run_schedule_sender.clone(), event_sender: Some(tx), }) .await?;