diff --git a/azalea-auth/src/game_profile.rs b/azalea-auth/src/game_profile.rs index ebff4fce..c2561a9d 100644 --- a/azalea-auth/src/game_profile.rs +++ b/azalea-auth/src/game_profile.rs @@ -10,6 +10,7 @@ pub struct GameProfile { pub uuid: Uuid, /// The username of the player. pub name: String, + // this is an arc to make GameProfile cheaper to clone when the properties are big pub properties: Arc>, } diff --git a/azalea-buf/src/write.rs b/azalea-buf/src/write.rs index c56b0062..0f35dba8 100644 --- a/azalea-buf/src/write.rs +++ b/azalea-buf/src/write.rs @@ -39,7 +39,7 @@ impl AzaleaWriteVar for i32 { let mut buffer = [0]; let mut value = *self; if value == 0 { - buf.write_all(&buffer).unwrap(); + buf.write_all(&buffer)?; } while value != 0 { buffer[0] = (value & 0b0111_1111) as u8; diff --git a/azalea-client/src/account.rs b/azalea-client/src/account.rs index 5e2fafa7..a4b35e81 100644 --- a/azalea-client/src/account.rs +++ b/azalea-client/src/account.rs @@ -15,7 +15,7 @@ use uuid::Uuid; /// To join a server using this account, use [`Client::join`] or /// [`azalea::ClientBuilder`]. /// -/// Note that this is also a component that our clients have. +/// This is also an ECS component that is present on our client entities. /// /// # Examples /// diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs index e13368eb..fbb043aa 100644 --- a/azalea-client/src/client.rs +++ b/azalea-client/src/client.rs @@ -8,48 +8,42 @@ use std::{ time::{Duration, Instant}, }; -use azalea_auth::{game_profile::GameProfile, sessionserver::ClientSessionServerError}; +use azalea_auth::game_profile::GameProfile; use azalea_chat::FormattedText; use azalea_core::{ data_registry::ResolvableDataRegistry, position::Vec3, resource_location::ResourceLocation, tick::GameTick, }; use azalea_entity::{ - EntityPlugin, EntityUpdateSet, EyeHeight, LocalEntity, Position, + EntityUpdateSet, EyeHeight, LocalEntity, Position, indexing::{EntityIdIndex, EntityUuidIndex}, metadata::Health, }; -use azalea_physics::PhysicsPlugin; use azalea_protocol::{ ServerAddress, common::client_information::ClientInformation, connect::{Connection, ConnectionError, Proxy}, packets::{ self, ClientIntention, ConnectionProtocol, PROTOCOL_VERSION, Packet, - config::{ClientboundConfigPacket, ServerboundConfigPacket}, - game::ServerboundGamePacket, + game::{self, ServerboundGamePacket}, handshake::{ ClientboundHandshakePacket, ServerboundHandshakePacket, s_intention::ServerboundIntention, }, - login::{ - ClientboundLoginPacket, s_hello::ServerboundHello, s_key::ServerboundKey, - s_login_acknowledged::ServerboundLoginAcknowledged, - }, + login::{ClientboundLoginPacket, ServerboundLoginPacket, s_hello::ServerboundHello}, }, resolver, }; use azalea_world::{Instance, InstanceContainer, InstanceName, MinecraftEntityId, PartialInstance}; -use bevy_app::{App, Plugin, PluginGroup, PluginGroupBuilder, PluginsState, Update}; +use bevy_app::{App, Plugin, PluginsState, Update}; use bevy_ecs::{ bundle::Bundle, component::Component, entity::Entity, schedule::{InternedScheduleLabel, IntoSystemConfigs, LogLevel, ScheduleBuildSettings}, - system::{ResMut, Resource}, + system::{Commands, ResMut, Resource}, world::World, }; -use bevy_time::TimePlugin; use derive_more::Deref; use parking_lot::{Mutex, RwLock}; use simdnbt::owned::NbtCompound; @@ -65,30 +59,25 @@ use tracing::{debug, error, info}; use uuid::Uuid; use crate::{ - Account, PlayerInfo, - attack::{self, AttackPlugin}, - brand::BrandPlugin, - chat::ChatPlugin, - chunks::{ChunkBatchInfo, ChunksPlugin}, - disconnect::{DisconnectEvent, DisconnectPlugin}, - events::{Event, EventsPlugin, LocalPlayerEvents}, - interact::{CurrentSequenceNumber, InteractPlugin}, - inventory::{Inventory, InventoryPlugin}, + Account, DefaultPlugins, PlayerInfo, + attack::{self}, + chunks::ChunkBatchInfo, + connection::RawConnection, + disconnect::DisconnectEvent, + events::{Event, LocalPlayerEvents}, + interact::CurrentSequenceNumber, + inventory::Inventory, local_player::{ GameProfileComponent, Hunger, InstanceHolder, PermissionLevel, PlayerAbilities, TabList, }, - mining::{self, MiningPlugin}, - movement::{LastSentLookDirection, MovementPlugin, PhysicsState}, + mining::{self}, + movement::{LastSentLookDirection, PhysicsState}, packet::{ - PacketPlugin, - login::{self, InLoginState, LoginSendPacketQueue}, + as_system, + game::SendPacketEvent, + login::{InLoginState, SendLoginPacketEvent}, }, player::retroactively_add_game_profile_component, - pong::PongPlugin, - raw_connection::RawConnection, - respawn::RespawnPlugin, - task_pool::TaskPoolPlugin, - tick_end::TickEndPlugin, }; /// `Client` has the things that a user interacting with the library will want. @@ -102,15 +91,6 @@ use crate::{ /// [`azalea::ClientBuilder`]: https://docs.rs/azalea/latest/azalea/struct.ClientBuilder.html #[derive(Clone)] pub struct Client { - /// The [`GameProfile`] for our client. This contains your username, UUID, - /// and skin data. - /// - /// This is immutable; the server cannot change it. To get the username and - /// skin the server chose for you, get your player from the [`TabList`] - /// component. - /// - /// This as also available from the ECS as [`GameProfileComponent`]. - pub profile: GameProfile, /// The entity for this client in the ECS. pub entity: Entity, @@ -134,6 +114,8 @@ pub enum JoinError { ReadPacket(#[from] Box), #[error("{0}")] Io(#[from] io::Error), + #[error("Failed to encrypt the challenge from the server for {0:?}")] + EncryptionError(packets::login::ClientboundHello), #[error("{0}")] SessionServer(#[from] azalea_auth::sessionserver::ClientSessionServerError), #[error("The given address could not be parsed into a ServerAddress")] @@ -192,13 +174,11 @@ impl Client { /// You should only use this if you want to change these fields from the /// defaults, otherwise use [`Client::join`]. pub fn new( - profile: GameProfile, entity: Entity, ecs: Arc>, run_schedule_sender: mpsc::Sender<()>, ) -> Self { Self { - profile, // default our id to 0, it'll be set later entity, @@ -294,8 +274,19 @@ impl Client { entity }; - // add the Account to the entity now so plugins can access it earlier - ecs.entity_mut(entity).insert(account.to_owned()); + let mut entity_mut = ecs.entity_mut(entity); + entity_mut.insert(( + // add the Account to the entity now so plugins can access it earlier + account.to_owned(), + // localentity is always present for our clients, even if we're not actually logged + // in + LocalEntity, + )); + if let Some(event_sender) = event_sender { + // this is optional so we don't leak memory in case the user doesn't want to + // handle receiving packets + entity_mut.insert(LocalPlayerEvents(event_sender)); + } entity }; @@ -305,59 +296,43 @@ impl Client { } else { Connection::new(resolved_address).await? }; - let (conn, game_profile) = - Self::handshake(ecs_lock.clone(), entity, conn, account, address).await?; - - // note that we send the proper packets in - // crate::configuration::handle_in_configuration_state + let conn = Self::handshake(ecs_lock.clone(), entity, conn, account, address).await?; let (read_conn, write_conn) = conn.into_split(); let (read_conn, write_conn) = (read_conn.raw, write_conn.raw); - // we did the handshake, so now we're connected to the server + // insert the client into the ecs so it finishes logging in + { + let mut ecs = ecs_lock.lock(); - let mut ecs = ecs_lock.lock(); + let instance = Instance::default(); + let instance_holder = crate::local_player::InstanceHolder::new( + entity, + // default to an empty world, it'll be set correctly later when we + // get the login packet + Arc::new(RwLock::new(instance)), + ); - // we got the ConfigurationConnection, so the client is now connected :) - let client = Client::new( - game_profile.clone(), - entity, - ecs_lock.clone(), - run_schedule_sender.clone(), - ); - - let instance = Instance::default(); - let instance_holder = crate::local_player::InstanceHolder::new( - entity, - // default to an empty world, it'll be set correctly later when we - // get the login packet - Arc::new(RwLock::new(instance)), - ); - - let mut entity = ecs.entity_mut(entity); - entity.insert(( - // these stay when we switch to the game state - LocalPlayerBundle { - raw_connection: RawConnection::new( - run_schedule_sender, - ConnectionProtocol::Configuration, - read_conn, - write_conn, - ), - game_profile: GameProfileComponent(game_profile), - client_information: crate::ClientInformation::default(), - instance_holder, - metadata: azalea_entity::metadata::PlayerMetadataBundle::default(), - }, - InConfigState, - // this component is never removed - LocalEntity, - )); - if let Some(event_sender) = event_sender { - // this is optional so we don't leak memory in case the user - entity.insert(LocalPlayerEvents(event_sender)); + let mut entity = ecs.entity_mut(entity); + entity.insert(( + // these stay when we switch to the game state + LocalPlayerBundle { + raw_connection: RawConnection::new( + read_conn, + write_conn, + ConnectionProtocol::Login, + ), + client_information: crate::ClientInformation::default(), + instance_holder, + metadata: azalea_entity::metadata::PlayerMetadataBundle::default(), + }, + InConfigState, + // this component is never removed + LocalEntity, + )); } + let client = Client::new(entity, ecs_lock.clone(), run_schedule_sender.clone()); Ok(client) } @@ -372,13 +347,7 @@ impl Client { mut conn: Connection, account: &Account, address: &ServerAddress, - ) -> Result< - ( - Connection, - GameProfile, - ), - JoinError, - > { + ) -> Result, JoinError> { // handshake conn.write(ServerboundIntention { protocol_version: PROTOCOL_VERSION, @@ -387,147 +356,34 @@ impl Client { intention: ClientIntention::Login, }) .await?; - let mut conn = conn.login(); + let conn = conn.login(); - // this makes it so plugins can send an `SendLoginPacketEvent` event to the ecs - // and we'll send it to the server - let (ecs_packets_tx, mut ecs_packets_rx) = mpsc::unbounded_channel(); - ecs_lock.lock().entity_mut(entity).insert(( - LoginSendPacketQueue { tx: ecs_packets_tx }, - crate::packet::login::IgnoreQueryIds::default(), - InLoginState, - )); - - // login - conn.write(ServerboundHello { - name: account.username.clone(), - // TODO: pretty sure this should generate an offline-mode uuid instead of just - // Uuid::default() - profile_id: account.uuid.unwrap_or_default(), - }) - .await?; - - let (conn, profile) = loop { - let packet = tokio::select! { - packet = conn.read() => packet?, - Some(packet) = ecs_packets_rx.recv() => { - // write this packet to the server - conn.write(packet).await?; - continue; - } - }; - - ecs_lock.lock().send_event(login::LoginPacketEvent { + as_system::(&mut ecs_lock.lock(), |mut commands| { + commands.entity(entity).insert(( + crate::packet::login::IgnoreQueryIds::default(), + InLoginState, + )); + commands.trigger(SendLoginPacketEvent::new( entity, - packet: Arc::new(packet.clone()), - }); + ServerboundHello { + name: account.username.clone(), + // TODO: pretty sure this should generate an offline-mode uuid instead of just + // Uuid::default() + profile_id: account.uuid.unwrap_or_default(), + }, + )) + }); - match packet { - ClientboundLoginPacket::Hello(p) => { - debug!("Got encryption request"); - let Ok(e) = azalea_crypto::encrypt(&p.public_key, &p.challenge) else { - error!("Failed to encrypt the challenge from the server for {p:?}"); - continue; - }; - - if let Some(access_token) = &account.access_token { - // keep track of the number of times we tried - // authenticating so we can give up after too many - let mut attempts: usize = 1; - - while let Err(e) = { - let access_token = access_token.lock().clone(); - conn.authenticate( - &access_token, - &account - .uuid - .expect("Uuid must be present if access token is present."), - e.secret_key, - &p, - ) - .await - } { - if attempts >= 2 { - // if this is the second attempt and we failed - // both times, give up - return Err(e.into()); - } - if matches!( - e, - ClientSessionServerError::InvalidSession - | ClientSessionServerError::ForbiddenOperation - ) { - // uh oh, we got an invalid session and have - // to reauthenticate now - account.refresh().await?; - } else { - return Err(e.into()); - } - attempts += 1; - } - } - - conn.write(ServerboundKey { - key_bytes: e.encrypted_public_key, - encrypted_challenge: e.encrypted_challenge, - }) - .await?; - - conn.set_encryption_key(e.secret_key); - } - ClientboundLoginPacket::LoginCompression(p) => { - debug!("Got compression request {:?}", p.compression_threshold); - conn.set_compression_threshold(p.compression_threshold); - } - ClientboundLoginPacket::LoginFinished(p) => { - debug!( - "Got profile {:?}. handshake is finished and we're now switching to the configuration state", - p.game_profile - ); - conn.write(ServerboundLoginAcknowledged {}).await?; - - break (conn.config(), p.game_profile); - } - ClientboundLoginPacket::LoginDisconnect(p) => { - debug!("Got disconnect {:?}", p); - return Err(JoinError::Disconnect { reason: p.reason }); - } - ClientboundLoginPacket::CustomQuery(p) => { - debug!("Got custom query {:?}", p); - // replying to custom query is done in - // packet::login::process_packet_events - } - ClientboundLoginPacket::CookieRequest(p) => { - debug!("Got cookie request {:?}", p); - - conn.write(packets::login::ServerboundCookieResponse { - key: p.key, - // cookies aren't implemented - payload: None, - }) - .await?; - } - } - }; - - ecs_lock - .lock() - .entity_mut(entity) - .remove::() - .remove::() - .remove::(); - - Ok((conn, profile)) + Ok(conn) } /// Write a packet directly to the server. - pub fn write_packet( - &self, - packet: impl Packet, - ) -> Result<(), crate::raw_connection::WritePacketError> { + pub fn write_packet(&self, packet: impl Packet) { let packet = packet.into_variant(); - self.raw_connection_mut(&mut self.ecs.lock()) - .write_packet(packet) + self.ecs + .lock() + .commands() + .trigger(SendPacketEvent::new(self.entity, packet)); } /// Disconnect this client from the server by ending all tasks. @@ -694,10 +550,7 @@ impl Client { /// # Ok(()) /// # } /// ``` - pub async fn set_client_information( - &self, - client_information: ClientInformation, - ) -> Result<(), crate::raw_connection::WritePacketError> { + pub async fn set_client_information(&self, client_information: ClientInformation) { { let mut ecs = self.ecs.lock(); let mut client_information_mut = self.query::<&mut ClientInformation>(&mut ecs); @@ -709,10 +562,10 @@ impl Client { "Sending client information (already logged in): {:?}", client_information ); - self.write_packet(azalea_protocol::packets::game::s_client_information::ServerboundClientInformation { information: client_information.clone() })?; + self.write_packet(game::s_client_information::ServerboundClientInformation { + client_information, + }); } - - Ok(()) } } @@ -760,14 +613,14 @@ impl Client { /// This is a shortcut for /// `bot.component::().name.to_owned()`. pub fn username(&self) -> String { - self.component::().name.to_owned() + self.profile().name.to_owned() } /// Get the Minecraft UUID of this client. /// /// This is a shortcut for `bot.component::().uuid`. pub fn uuid(&self) -> Uuid { - self.component::().uuid + self.profile().uuid } /// Get a map of player UUIDs to their information in the tab list. @@ -777,6 +630,19 @@ impl Client { (*self.component::()).clone() } + /// Returns the [`GameProfile`] for our client. This contains your username, + /// UUID, and skin data. + /// + /// These values are set by the server upon login, which means they might + /// not match up with your actual game profile. Also, note that the username + /// and skin that gets displayed in-game will actually be the ones from + /// the tab list, which you can get from [`Self::tab_list`]. + /// + /// This as also available from the ECS as [`GameProfileComponent`]. + pub fn profile(&self) -> GameProfile { + (*self.component::()).clone() + } + /// A convenience function to get the Minecraft Uuid of a player by their /// username, if they're present in the tab list. /// @@ -857,15 +723,14 @@ impl Client { } } -/// The bundle of components that's shared when we're either in the -/// `configuration` or `game` state. +/// A bundle of components that's inserted right when we switch to the `login` +/// state and stay present on our clients until we disconnect. /// /// For the components that are only present in the `game` state, see /// [`JoinedClientBundle`]. #[derive(Bundle)] pub struct LocalPlayerBundle { pub raw_connection: RawConnection, - pub game_profile: GameProfileComponent, pub client_information: ClientInformation, pub instance_holder: InstanceHolder, @@ -1056,40 +921,3 @@ impl Plugin for AmbiguityLoggerPlugin { }); } } - -/// This plugin group will add all the default plugins necessary for Azalea to -/// work. -pub struct DefaultPlugins; - -impl PluginGroup for DefaultPlugins { - fn build(self) -> PluginGroupBuilder { - #[allow(unused_mut)] - let mut group = PluginGroupBuilder::start::() - .add(AmbiguityLoggerPlugin) - .add(TimePlugin) - .add(PacketPlugin) - .add(AzaleaPlugin) - .add(EntityPlugin) - .add(PhysicsPlugin) - .add(EventsPlugin) - .add(TaskPoolPlugin::default()) - .add(InventoryPlugin) - .add(ChatPlugin) - .add(DisconnectPlugin) - .add(MovementPlugin) - .add(InteractPlugin) - .add(RespawnPlugin) - .add(MiningPlugin) - .add(AttackPlugin) - .add(ChunksPlugin) - .add(TickEndPlugin) - .add(BrandPlugin) - .add(TickBroadcastPlugin) - .add(PongPlugin); - #[cfg(feature = "log")] - { - group = group.add(bevy_log::LogPlugin::default()); - } - group - } -} diff --git a/azalea-client/src/lib.rs b/azalea-client/src/lib.rs index e8f9db01..518d3dd0 100644 --- a/azalea-client/src/lib.rs +++ b/azalea-client/src/lib.rs @@ -15,7 +15,6 @@ mod local_player; pub mod ping; mod player; mod plugins; -pub mod raw_connection; #[doc(hidden)] pub mod test_simulation; @@ -23,8 +22,8 @@ pub mod test_simulation; pub use account::{Account, AccountOpts}; pub use azalea_protocol::common::client_information::ClientInformation; pub use client::{ - Client, DefaultPlugins, InConfigState, InGameState, JoinError, JoinedClientBundle, - LocalPlayerBundle, StartClientOpts, TickBroadcast, start_ecs_runner, + Client, InConfigState, InGameState, JoinError, JoinedClientBundle, LocalPlayerBundle, + StartClientOpts, TickBroadcast, start_ecs_runner, }; pub use events::Event; pub use local_player::{GameProfileComponent, Hunger, InstanceHolder, TabList}; diff --git a/azalea-client/src/plugins/connection.rs b/azalea-client/src/plugins/connection.rs new file mode 100644 index 00000000..99dc7d11 --- /dev/null +++ b/azalea-client/src/plugins/connection.rs @@ -0,0 +1,340 @@ +use std::{fmt::Debug, io::Cursor, mem, sync::Arc}; + +use azalea_crypto::Aes128CfbEnc; +use azalea_protocol::{ + connect::{RawReadConnection, RawWriteConnection}, + packets::{ + ConnectionProtocol, Packet, ProtocolPacket, config::ClientboundConfigPacket, + game::ClientboundGamePacket, login::ClientboundLoginPacket, + }, + read::{ReadPacketError, deserialize_packet}, + write::serialize_packet, +}; +use bevy_app::prelude::*; +use bevy_ecs::prelude::*; +use bevy_tasks::{IoTaskPool, futures_lite::future}; +use thiserror::Error; +use tokio::{ + io::AsyncWriteExt, + net::tcp::OwnedWriteHalf, + sync::mpsc::{self}, +}; +use tracing::{debug, error}; + +use super::packet::{ + config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent, +}; +use crate::packet::{config, game, login}; + +pub struct ConnectionPlugin; +impl Plugin for ConnectionPlugin { + fn build(&self, app: &mut App) { + app.add_systems(PreUpdate, read_packets); + } +} + +pub fn read_packets(ecs: &mut World) { + // receive_game_packet_events: EventWriter, + let mut query = ecs.query::<(Entity, &mut RawConnection)>(); + + let mut entities_handling_packets = Vec::new(); + let mut entities_with_injected_packets = Vec::new(); + for (entity, mut raw_conn) in query.iter_mut(ecs) { + let state = raw_conn.state; + + if !raw_conn.injected_clientbound_packets.is_empty() { + entities_with_injected_packets.push(( + entity, + state, + mem::take(&mut raw_conn.injected_clientbound_packets), + )); + } + + let Some(net_conn) = raw_conn.network.take() else { + // means it's a networkless connection + continue; + }; + entities_handling_packets.push((entity, state, net_conn)); + } + + let mut queued_packet_events = QueuedPacketEvents::default(); + + // handle injected packets, see the comment on + // RawConnection::injected_clientbound_packets for more info + for (entity, mut state, raw_packets) in entities_with_injected_packets { + for raw_packet in raw_packets { + handle_raw_packet( + ecs, + &raw_packet, + entity, + &mut state, + None, + &mut queued_packet_events, + ) + .unwrap(); + + // update the state and for the client + let (_, mut raw_conn_component) = query.get_mut(ecs, entity).unwrap(); + raw_conn_component.state = state; + } + } + + // we pass the mutable state and net_conn into the handlers so they're allowed + // to mutate it + for (entity, mut state, mut net_conn) in entities_handling_packets { + loop { + match net_conn.reader.try_read() { + Ok(Some(raw_packet)) => { + let raw_packet = Arc::<[u8]>::from(raw_packet); + if let Err(e) = handle_raw_packet( + ecs, + &raw_packet, + entity, + &mut state, + Some(&mut net_conn), + &mut queued_packet_events, + ) { + error!("Error reading packet: {e}"); + } + } + Ok(None) => { + // no packets available + break; + } + Err(err) => { + log_for_error(&err); + break; + } + } + } + + // this needs to be done at some point every update, so we do it here right + // after the handlers are called + net_conn.poll_writer(); + + // update the state and network connections for the client + let (_, mut raw_conn_component) = query.get_mut(ecs, entity).unwrap(); + raw_conn_component.state = state; + raw_conn_component.network = Some(net_conn); + } + + queued_packet_events.send_events(ecs); +} + +#[derive(Default)] +pub struct QueuedPacketEvents { + login: Vec, + config: Vec, + game: Vec, +} +impl QueuedPacketEvents { + fn send_events(&mut self, ecs: &mut World) { + ecs.send_event_batch(self.login.drain(..)); + ecs.send_event_batch(self.config.drain(..)); + ecs.send_event_batch(self.game.drain(..)); + } +} + +fn log_for_error(error: &ReadPacketError) { + if !matches!(*error, ReadPacketError::ConnectionClosed) { + error!("Error reading packet from Client: {error:?}"); + } +} + +/// The client's connection to the server. +#[derive(Component)] +pub struct RawConnection { + /// The network connection to the server. + /// + /// This isn't guaranteed to be present, for example during the main packet + /// handlers or at all times during tests. + /// + /// You shouldn't rely on this. Instead, use the events for sending packets + /// like [`SendPacketEvent`](crate::packet::game::SendPacketEvent) / + /// [`SendConfigPacketEvent`](crate::packet::config::SendConfigPacketEvent) + /// / [`SendLoginPacketEvent`](crate::packet::login::SendLoginPacketEvent). + /// + /// To check if we haven't disconnected from the server, use + /// [`Self::is_alive`]. + network: Option, + pub state: ConnectionProtocol, + is_alive: bool, + + /// This exists for internal testing purposes and probably shouldn't be used + /// for normal bots. It's basically a way to make our client think it + /// received a packet from the server without needing to interact with the + /// network. + pub injected_clientbound_packets: Vec>, +} +impl RawConnection { + pub fn new( + reader: RawReadConnection, + writer: RawWriteConnection, + state: ConnectionProtocol, + ) -> Self { + let task_pool = IoTaskPool::get(); + + let (network_packet_writer_tx, network_packet_writer_rx) = + mpsc::unbounded_channel::>(); + + let writer_task = + task_pool.spawn(write_task(network_packet_writer_rx, writer.write_stream)); + + let mut conn = Self::new_networkless(state); + conn.network = Some(NetworkConnection { + reader, + enc_cipher: writer.enc_cipher, + network_packet_writer_tx, + writer_task, + }); + + conn + } + + pub fn new_networkless(state: ConnectionProtocol) -> Self { + Self { + network: None, + state, + is_alive: true, + injected_clientbound_packets: Vec::new(), + } + } + + pub fn is_alive(&self) -> bool { + self.is_alive + } + + /// Write a packet to the server without emitting any events. + /// + /// This is called by the handlers for [`SendPacketEvent`], + /// [`SendConfigPacketEvent`], and [`SendLoginPacketEvent`]. + /// + /// [`SendPacketEvent`]: crate::packet::game::SendPacketEvent + /// [`SendConfigPacketEvent`]: crate::packet::config::SendConfigPacketEvent + /// [`SendLoginPacketEvent`]: crate::packet::login::SendLoginPacketEvent + pub fn write( + &mut self, + packet: impl Packet

, + ) -> Result<(), WritePacketError> { + if let Some(network) = &mut self.network { + let packet = packet.into_variant(); + let raw_packet = serialize_packet(&packet)?; + network.write_raw(&raw_packet)?; + } + Ok(()) + } + + pub fn net_conn(&mut self) -> Option<&mut NetworkConnection> { + self.network.as_mut() + } +} + +pub fn handle_raw_packet( + ecs: &mut World, + raw_packet: &[u8], + entity: Entity, + state: &mut ConnectionProtocol, + net_conn: Option<&mut NetworkConnection>, + queued_packet_events: &mut QueuedPacketEvents, +) -> Result<(), Box> { + let stream = &mut Cursor::new(raw_packet); + match state { + ConnectionProtocol::Handshake => { + unreachable!() + } + ConnectionProtocol::Game => { + let packet = Arc::new(deserialize_packet::(stream)?); + game::process_packet(ecs, entity, packet.as_ref()); + queued_packet_events + .game + .push(ReceiveGamePacketEvent { entity, packet }); + } + ConnectionProtocol::Status => { + unreachable!() + } + ConnectionProtocol::Login => { + let packet = Arc::new(deserialize_packet::(stream)?); + login::process_packet(ecs, entity, &packet, state, net_conn); + queued_packet_events + .login + .push(ReceiveLoginPacketEvent { entity, packet }); + } + ConnectionProtocol::Configuration => { + let packet = Arc::new(deserialize_packet::(stream)?); + config::process_packet(ecs, entity, &packet); + queued_packet_events + .config + .push(ReceiveConfigPacketEvent { entity, packet }); + } + }; + + Ok(()) +} + +pub struct NetworkConnection { + reader: RawReadConnection, + // compression threshold is in the RawReadConnection + pub enc_cipher: Option, + + pub writer_task: bevy_tasks::Task<()>, + /// A queue of raw TCP packets to send. These will not be modified further, + /// they should already be serialized and encrypted and everything before + /// being added here. + network_packet_writer_tx: mpsc::UnboundedSender>, +} +impl NetworkConnection { + pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> { + let network_packet = azalea_protocol::write::encode_to_network_packet( + raw_packet, + self.reader.compression_threshold, + &mut self.enc_cipher, + ); + self.network_packet_writer_tx + .send(network_packet.into_boxed_slice())?; + Ok(()) + } + + pub fn poll_writer(&mut self) { + future::block_on(future::poll_once(&mut self.writer_task)); + } + + pub fn set_compression_threshold(&mut self, threshold: Option) { + self.reader.compression_threshold = threshold; + } + /// Set the encryption key that is used to encrypt and decrypt packets. It's + /// the same for both reading and writing. + pub fn set_encryption_key(&mut self, key: [u8; 16]) { + let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key); + self.reader.dec_cipher = Some(dec_cipher); + self.enc_cipher = Some(enc_cipher); + } +} + +async fn write_task( + mut network_packet_writer_rx: mpsc::UnboundedReceiver>, + mut write_half: OwnedWriteHalf, +) { + while let Some(network_packet) = network_packet_writer_rx.recv().await { + if let Err(e) = write_half.write_all(&network_packet).await { + debug!("Error writing packet to server: {e}"); + break; + }; + } +} + +#[derive(Error, Debug)] +pub enum WritePacketError { + #[error("Wrong protocol state: expected {expected:?}, got {got:?}")] + WrongState { + expected: ConnectionProtocol, + got: ConnectionProtocol, + }, + #[error(transparent)] + Encoding(#[from] azalea_protocol::write::PacketEncodeError), + #[error(transparent)] + SendError { + #[from] + #[backtrace] + source: mpsc::error::SendError>, + }, +} diff --git a/azalea-client/src/plugins/disconnect.rs b/azalea-client/src/plugins/disconnect.rs index bd10ac75..09606435 100644 --- a/azalea-client/src/plugins/disconnect.rs +++ b/azalea-client/src/plugins/disconnect.rs @@ -16,8 +16,8 @@ use derive_more::Deref; use tracing::trace; use crate::{ - InstanceHolder, client::JoinedClientBundle, events::LocalPlayerEvents, - raw_connection::RawConnection, + InstanceHolder, client::JoinedClientBundle, connection::RawConnection, + events::LocalPlayerEvents, }; pub struct DisconnectPlugin; diff --git a/azalea-client/src/plugins/events.rs b/azalea-client/src/plugins/events.rs index 64dcf4f5..85f50ea5 100644 --- a/azalea-client/src/plugins/events.rs +++ b/azalea-client/src/plugins/events.rs @@ -27,7 +27,7 @@ use crate::{ chat::{ChatPacket, ChatReceivedEvent}, disconnect::DisconnectEvent, packet::game::{ - AddPlayerEvent, DeathEvent, KeepAliveEvent, ReceivePacketEvent, RemovePlayerEvent, + AddPlayerEvent, DeathEvent, KeepAliveEvent, ReceiveGamePacketEvent, RemovePlayerEvent, UpdatePlayerEvent, }, }; @@ -157,7 +157,7 @@ impl Plugin for EventsPlugin { ) .add_systems( PreUpdate, - init_listener.before(crate::packet::game::process_packet_events), + init_listener.before(super::connection::read_packets), ) .add_systems(GameTick, tick_listener); } @@ -217,7 +217,7 @@ pub fn tick_listener(query: Query<&LocalPlayerEvents, With>) { pub fn packet_listener( query: Query<&LocalPlayerEvents>, - mut events: EventReader, + mut events: EventReader, ) { for event in events.read() { if let Ok(local_player_events) = query.get(event.entity) { diff --git a/azalea-client/src/plugins/login.rs b/azalea-client/src/plugins/login.rs new file mode 100644 index 00000000..58c5f628 --- /dev/null +++ b/azalea-client/src/plugins/login.rs @@ -0,0 +1,120 @@ +use azalea_auth::sessionserver::ClientSessionServerError; +use azalea_protocol::packets::login::{ClientboundHello, ServerboundKey}; +use bevy_app::prelude::*; +use bevy_ecs::prelude::*; +use bevy_tasks::{IoTaskPool, Task, futures_lite::future}; +use tracing::error; + +use super::{connection::RawConnection, packet::login::ReceiveHelloEvent}; +use crate::{Account, JoinError}; + +pub struct LoginPlugin; +impl Plugin for LoginPlugin { + fn build(&self, app: &mut App) { + app.add_observer(handle_receive_hello_event) + .add_systems(Update, poll_auth_task); + } +} + +fn handle_receive_hello_event(trigger: Trigger, mut commands: Commands) { + let task_pool = IoTaskPool::get(); + + let account = trigger.account.clone(); + let packet = trigger.packet.clone(); + let player = trigger.entity(); + + let task = task_pool.spawn(auth_with_account(account, packet)); + commands.entity(player).insert(AuthTask(task)); +} + +fn poll_auth_task( + mut commands: Commands, + mut query: Query<(Entity, &mut AuthTask, &mut RawConnection)>, +) { + for (entity, mut auth_task, mut raw_conn) in query.iter_mut() { + if let Some(poll_res) = future::block_on(future::poll_once(&mut auth_task.0)) { + commands.entity(entity).remove::(); + match poll_res { + Ok((packet, private_key)) => { + // we use this instead of SendLoginPacketEvent to ensure that it's sent right + // before encryption is enabled. i guess another option would be to make a + // Trigger+observer for set_encryption_key; the current implementation is + // simpler though. + if let Err(e) = raw_conn.write(packet) { + error!("Error sending key packet: {e:?}"); + } + if let Some(net_conn) = raw_conn.net_conn() { + net_conn.set_encryption_key(private_key); + } + } + Err(err) => { + error!("Error during authentication: {err:?}"); + } + } + } + } +} + +type PrivateKey = [u8; 16]; + +#[derive(Component)] +pub struct AuthTask(Task>); + +pub async fn auth_with_account( + account: Account, + packet: ClientboundHello, +) -> Result<(ServerboundKey, PrivateKey), JoinError> { + let Ok(encrypt_res) = azalea_crypto::encrypt(&packet.public_key, &packet.challenge) else { + return Err(JoinError::EncryptionError(packet)); + }; + let key_packet = ServerboundKey { + key_bytes: encrypt_res.encrypted_public_key, + encrypted_challenge: encrypt_res.encrypted_challenge, + }; + let private_key = encrypt_res.secret_key; + + let Some(access_token) = &account.access_token else { + // offline mode account, no need to do auth + return Ok((key_packet, private_key)); + }; + + // keep track of the number of times we tried authenticating so we can give up + // after too many + let mut attempts: usize = 1; + + while let Err(err) = { + let access_token = access_token.lock().clone(); + + let uuid = &account + .uuid + .expect("Uuid must be present if access token is present."); + + azalea_auth::sessionserver::join( + &access_token, + &packet.public_key, + &private_key, + uuid, + &packet.server_id, + ) + .await + } { + if attempts >= 2 { + // if this is the second attempt and we failed + // both times, give up + return Err(err.into()); + } + if matches!( + err, + ClientSessionServerError::InvalidSession | ClientSessionServerError::ForbiddenOperation + ) { + // uh oh, we got an invalid session and have + // to reauthenticate now + account.refresh().await?; + } else { + return Err(err.into()); + } + attempts += 1; + } + + Ok((key_packet, private_key)) +} diff --git a/azalea-client/src/plugins/mod.rs b/azalea-client/src/plugins/mod.rs index b5005b22..1afe1729 100644 --- a/azalea-client/src/plugins/mod.rs +++ b/azalea-client/src/plugins/mod.rs @@ -1,11 +1,15 @@ +use bevy_app::{PluginGroup, PluginGroupBuilder}; + pub mod attack; pub mod brand; pub mod chat; pub mod chunks; +pub mod connection; pub mod disconnect; pub mod events; pub mod interact; pub mod inventory; +pub mod login; pub mod mining; pub mod movement; pub mod packet; @@ -13,3 +17,42 @@ pub mod pong; pub mod respawn; pub mod task_pool; pub mod tick_end; + +/// This plugin group will add all the default plugins necessary for Azalea to +/// work. +pub struct DefaultPlugins; + +impl PluginGroup for DefaultPlugins { + fn build(self) -> PluginGroupBuilder { + #[allow(unused_mut)] + let mut group = PluginGroupBuilder::start::() + .add(crate::client::AmbiguityLoggerPlugin) + .add(bevy_time::TimePlugin) + .add(packet::PacketPlugin) + .add(crate::client::AzaleaPlugin) + .add(azalea_entity::EntityPlugin) + .add(azalea_physics::PhysicsPlugin) + .add(events::EventsPlugin) + .add(task_pool::TaskPoolPlugin::default()) + .add(inventory::InventoryPlugin) + .add(chat::ChatPlugin) + .add(disconnect::DisconnectPlugin) + .add(movement::MovementPlugin) + .add(interact::InteractPlugin) + .add(respawn::RespawnPlugin) + .add(mining::MiningPlugin) + .add(attack::AttackPlugin) + .add(chunks::ChunksPlugin) + .add(tick_end::TickEndPlugin) + .add(brand::BrandPlugin) + .add(crate::client::TickBroadcastPlugin) + .add(pong::PongPlugin) + .add(connection::ConnectionPlugin) + .add(login::LoginPlugin); + #[cfg(feature = "log")] + { + group = group.add(bevy_log::LogPlugin::default()); + } + group + } +} diff --git a/azalea-client/src/plugins/packet/config/events.rs b/azalea-client/src/plugins/packet/config/events.rs index 24a1157b..012ea0c1 100644 --- a/azalea-client/src/plugins/packet/config/events.rs +++ b/azalea-client/src/plugins/packet/config/events.rs @@ -1,23 +1,20 @@ -use std::io::Cursor; +use std::sync::Arc; -use azalea_protocol::{ - packets::{ - Packet, - config::{ClientboundConfigPacket, ServerboundConfigPacket}, - }, - read::deserialize_packet, +use azalea_protocol::packets::{ + Packet, + config::{ClientboundConfigPacket, ServerboundConfigPacket}, }; use bevy_ecs::prelude::*; use tracing::{debug, error}; -use crate::{InConfigState, raw_connection::RawConnection}; +use crate::{InConfigState, connection::RawConnection}; #[derive(Event, Debug, Clone)] pub struct ReceiveConfigPacketEvent { /// The client entity that received the packet. pub entity: Entity, /// The packet that was actually received. - pub packet: ClientboundConfigPacket, + pub packet: Arc, } /// An event for sending a packet to the server while we're in the @@ -39,7 +36,7 @@ pub fn handle_outgoing_packets_observer( mut query: Query<(&mut RawConnection, Option<&InConfigState>)>, ) { let event = trigger.event(); - if let Ok((raw_conn, in_configuration_state)) = query.get_mut(event.sent_by) { + if let Ok((mut raw_conn, in_configuration_state)) = query.get_mut(event.sent_by) { if in_configuration_state.is_none() { error!( "Tried to send a configuration packet {:?} while not in configuration state", @@ -48,7 +45,7 @@ pub fn handle_outgoing_packets_observer( return; } debug!("Sending packet: {:?}", event.packet); - if let Err(e) = raw_conn.write_packet(event.packet.clone()) { + if let Err(e) = raw_conn.write(event.packet.clone()) { error!("Failed to send packet: {e}"); } } @@ -64,61 +61,6 @@ pub fn handle_outgoing_packets( } } -pub fn emit_receive_config_packet_events( - query: Query<(Entity, &RawConnection), With>, - mut packet_events: ResMut>, -) { - // we manually clear and send the events at the beginning of each update - // since otherwise it'd cause issues with events in process_packet_events - // running twice - packet_events.clear(); - for (player_entity, raw_conn) in &query { - let packets_lock = raw_conn.incoming_packet_queue(); - let mut packets = packets_lock.lock(); - if !packets.is_empty() { - let mut packets_read = 0; - for raw_packet in packets.iter() { - packets_read += 1; - let packet = match deserialize_packet::(&mut Cursor::new( - raw_packet, - )) { - Ok(packet) => packet, - Err(err) => { - error!("failed to read packet: {err:?}"); - debug!("packet bytes: {raw_packet:?}"); - continue; - } - }; - - let should_interrupt = packet_interrupts(&packet); - - packet_events.send(ReceiveConfigPacketEvent { - entity: player_entity, - packet, - }); - - if should_interrupt { - break; - } - } - packets.drain(0..packets_read); - } - } -} - -/// Whether the given packet should make us stop deserializing the received -/// packets until next update. -/// -/// This is used for packets that can switch the client state. -fn packet_interrupts(packet: &ClientboundConfigPacket) -> bool { - matches!( - packet, - ClientboundConfigPacket::FinishConfiguration(_) - | ClientboundConfigPacket::Disconnect(_) - | ClientboundConfigPacket::Transfer(_) - ) -} - /// A Bevy trigger that's sent when our client receives a [`ClientboundPing`] /// packet in the config state. /// diff --git a/azalea-client/src/plugins/packet/config/mod.rs b/azalea-client/src/plugins/packet/config/mod.rs index ae601793..3c7dcbb4 100644 --- a/azalea-client/src/plugins/packet/config/mod.rs +++ b/azalea-client/src/plugins/packet/config/mod.rs @@ -1,65 +1,61 @@ mod events; +use std::io::Cursor; + use azalea_entity::LocalEntity; use azalea_protocol::packets::ConnectionProtocol; use azalea_protocol::packets::config::*; +use azalea_protocol::read::ReadPacketError; +use azalea_protocol::read::deserialize_packet; use bevy_ecs::prelude::*; -use bevy_ecs::system::SystemState; pub use events::*; use tracing::{debug, warn}; use super::as_system; use crate::client::InConfigState; +use crate::connection::RawConnection; use crate::disconnect::DisconnectEvent; use crate::packet::game::KeepAliveEvent; use crate::packet::game::ResourcePackEvent; -use crate::raw_connection::RawConnection; use crate::{InstanceHolder, declare_packet_handlers}; -pub fn process_packet_events(ecs: &mut World) { - let mut events_owned = Vec::new(); - let mut system_state: SystemState> = - SystemState::new(ecs); - let mut events = system_state.get_mut(ecs); - for ReceiveConfigPacketEvent { - entity: player_entity, - packet, - } in events.read() - { - // we do this so `ecs` isn't borrowed for the whole loop - events_owned.push((*player_entity, packet.clone())); - } - for (player_entity, packet) in events_owned { - let mut handler = ConfigPacketHandler { - player: player_entity, - ecs, - }; +pub fn process_raw_packet( + ecs: &mut World, + player: Entity, + raw_packet: &[u8], +) -> Result<(), Box> { + let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?; + process_packet(ecs, player, &packet); + Ok(()) +} - declare_packet_handlers!( - ClientboundConfigPacket, - packet, - handler, - [ - cookie_request, - custom_payload, - disconnect, - finish_configuration, - keep_alive, - ping, - reset_chat, - registry_data, - resource_pack_pop, - resource_pack_push, - store_cookie, - transfer, - update_enabled_features, - update_tags, - select_known_packs, - custom_report_details, - server_links, - ] - ); - } +pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundConfigPacket) { + let mut handler = ConfigPacketHandler { player, ecs }; + + declare_packet_handlers!( + ClientboundConfigPacket, + packet, + handler, + [ + cookie_request, + custom_payload, + disconnect, + finish_configuration, + keep_alive, + ping, + reset_chat, + registry_data, + resource_pack_pop, + resource_pack_push, + store_cookie, + transfer, + update_enabled_features, + update_tags, + select_known_packs, + custom_report_details, + server_links, + ] + ); } pub struct ConfigPacketHandler<'a> { @@ -67,31 +63,33 @@ pub struct ConfigPacketHandler<'a> { pub player: Entity, } impl ConfigPacketHandler<'_> { - pub fn registry_data(&mut self, p: ClientboundRegistryData) { + pub fn registry_data(&mut self, p: &ClientboundRegistryData) { as_system::>(self.ecs, |mut query| { let instance_holder = query.get_mut(self.player).unwrap(); let mut instance = instance_holder.instance.write(); // add the new registry data - instance.registries.append(p.registry_id, p.entries); + instance + .registries + .append(p.registry_id.clone(), p.entries.clone()); }); } - pub fn custom_payload(&mut self, p: ClientboundCustomPayload) { + pub fn custom_payload(&mut self, p: &ClientboundCustomPayload) { debug!("Got custom payload packet {p:?}"); } - pub fn disconnect(&mut self, p: ClientboundDisconnect) { + pub fn disconnect(&mut self, p: &ClientboundDisconnect) { warn!("Got disconnect packet {p:?}"); as_system::>(self.ecs, |mut events| { events.send(DisconnectEvent { entity: self.player, - reason: Some(p.reason), + reason: Some(p.reason.clone()), }); }); } - pub fn finish_configuration(&mut self, p: ClientboundFinishConfiguration) { + pub fn finish_configuration(&mut self, p: &ClientboundFinishConfiguration) { debug!("got FinishConfiguration packet: {p:?}"); as_system::<(Commands, Query<&mut RawConnection>)>( @@ -99,12 +97,11 @@ impl ConfigPacketHandler<'_> { |(mut commands, mut query)| { let mut raw_conn = query.get_mut(self.player).unwrap(); - raw_conn - .write_packet(ServerboundFinishConfiguration) - .expect( - "we should be in the right state and encoding this packet shouldn't fail", - ); - raw_conn.set_state(ConnectionProtocol::Game); + commands.trigger(SendConfigPacketEvent::new( + self.player, + ServerboundFinishConfiguration, + )); + raw_conn.state = ConnectionProtocol::Game; // these components are added now that we're going to be in the Game state commands @@ -120,34 +117,33 @@ impl ConfigPacketHandler<'_> { ); } - pub fn keep_alive(&mut self, p: ClientboundKeepAlive) { + pub fn keep_alive(&mut self, p: &ClientboundKeepAlive) { debug!( "Got keep alive packet (in configuration) {p:?} for {:?}", self.player ); - as_system::<(Query<&RawConnection>, EventWriter<_>)>(self.ecs, |(query, mut events)| { - let raw_conn = query.get(self.player).unwrap(); - + as_system::<(Commands, EventWriter<_>)>(self.ecs, |(mut commands, mut events)| { events.send(KeepAliveEvent { entity: self.player, id: p.id, }); - raw_conn - .write_packet(ServerboundKeepAlive { id: p.id }) - .unwrap(); + commands.trigger(SendConfigPacketEvent::new( + self.player, + ServerboundKeepAlive { id: p.id }, + )); }); } - pub fn ping(&mut self, p: ClientboundPing) { + pub fn ping(&mut self, p: &ClientboundPing) { debug!("Got ping packet (in configuration) {p:?}"); as_system::(self.ecs, |mut commands| { - commands.trigger_targets(ConfigPingEvent(p), self.player); + commands.trigger_targets(ConfigPingEvent(p.clone()), self.player); }); } - pub fn resource_pack_push(&mut self, p: ClientboundResourcePackPush) { + pub fn resource_pack_push(&mut self, p: &ClientboundResourcePackPush) { debug!("Got resource pack push packet {p:?}"); as_system::>(self.ecs, |mut events| { @@ -162,66 +158,64 @@ impl ConfigPacketHandler<'_> { }); } - pub fn resource_pack_pop(&mut self, p: ClientboundResourcePackPop) { + pub fn resource_pack_pop(&mut self, p: &ClientboundResourcePackPop) { debug!("Got resource pack pop packet {p:?}"); } - pub fn update_enabled_features(&mut self, p: ClientboundUpdateEnabledFeatures) { + pub fn update_enabled_features(&mut self, p: &ClientboundUpdateEnabledFeatures) { debug!("Got update enabled features packet {p:?}"); } - pub fn update_tags(&mut self, _p: ClientboundUpdateTags) { + pub fn update_tags(&mut self, _p: &ClientboundUpdateTags) { debug!("Got update tags packet"); } - pub fn cookie_request(&mut self, p: ClientboundCookieRequest) { + pub fn cookie_request(&mut self, p: &ClientboundCookieRequest) { debug!("Got cookie request packet {p:?}"); - as_system::>(self.ecs, |query| { - let raw_conn = query.get(self.player).unwrap(); - - raw_conn - .write_packet(ServerboundCookieResponse { - key: p.key, + as_system::(self.ecs, |mut commands| { + commands.trigger(SendConfigPacketEvent::new( + self.player, + ServerboundCookieResponse { + key: p.key.clone(), // cookies aren't implemented payload: None, - }) - .unwrap(); + }, + )); }); } - pub fn reset_chat(&mut self, p: ClientboundResetChat) { + pub fn reset_chat(&mut self, p: &ClientboundResetChat) { debug!("Got reset chat packet {p:?}"); } - pub fn store_cookie(&mut self, p: ClientboundStoreCookie) { + pub fn store_cookie(&mut self, p: &ClientboundStoreCookie) { debug!("Got store cookie packet {p:?}"); } - pub fn transfer(&mut self, p: ClientboundTransfer) { + pub fn transfer(&mut self, p: &ClientboundTransfer) { debug!("Got transfer packet {p:?}"); } - pub fn select_known_packs(&mut self, p: ClientboundSelectKnownPacks) { + pub fn select_known_packs(&mut self, p: &ClientboundSelectKnownPacks) { debug!("Got select known packs packet {p:?}"); - as_system::>(self.ecs, |query| { - let raw_conn = query.get(self.player).unwrap(); - + as_system::(self.ecs, |mut commands| { // resource pack management isn't implemented - raw_conn - .write_packet(ServerboundSelectKnownPacks { + commands.trigger(SendConfigPacketEvent::new( + self.player, + ServerboundSelectKnownPacks { known_packs: vec![], - }) - .unwrap(); + }, + )); }); } - pub fn server_links(&mut self, p: ClientboundServerLinks) { + pub fn server_links(&mut self, p: &ClientboundServerLinks) { debug!("Got server links packet {p:?}"); } - pub fn custom_report_details(&mut self, p: ClientboundCustomReportDetails) { + pub fn custom_report_details(&mut self, p: &ClientboundCustomReportDetails) { debug!("Got custom report details packet {p:?}"); } } diff --git a/azalea-client/src/plugins/packet/game/events.rs b/azalea-client/src/plugins/packet/game/events.rs index ad81f9bd..f5d72946 100644 --- a/azalea-client/src/plugins/packet/game/events.rs +++ b/azalea-client/src/plugins/packet/game/events.rs @@ -1,24 +1,18 @@ -use std::{ - io::Cursor, - sync::{Arc, Weak}, -}; +use std::sync::{Arc, Weak}; use azalea_chat::FormattedText; use azalea_core::resource_location::ResourceLocation; -use azalea_protocol::{ - packets::{ - Packet, - game::{ClientboundGamePacket, ClientboundPlayerCombatKill, ServerboundGamePacket}, - }, - read::deserialize_packet, +use azalea_protocol::packets::{ + Packet, + game::{ClientboundGamePacket, ClientboundPlayerCombatKill, ServerboundGamePacket}, }; use azalea_world::Instance; use bevy_ecs::prelude::*; use parking_lot::RwLock; -use tracing::{debug, error}; +use tracing::error; use uuid::Uuid; -use crate::{PlayerInfo, client::InGameState, raw_connection::RawConnection}; +use crate::{PlayerInfo, client::InGameState, connection::RawConnection}; /// An event that's sent when we receive a packet. /// ``` @@ -41,7 +35,7 @@ use crate::{PlayerInfo, client::InGameState, raw_connection::RawConnection}; /// } /// ``` #[derive(Event, Debug, Clone)] -pub struct ReceivePacketEvent { +pub struct ReceiveGamePacketEvent { /// The client entity that received the packet. pub entity: Entity, /// The packet that was actually received. @@ -67,7 +61,7 @@ pub fn handle_outgoing_packets_observer( ) { let event = trigger.event(); - if let Ok((raw_connection, in_game_state)) = query.get_mut(event.sent_by) { + if let Ok((mut raw_connection, in_game_state)) = query.get_mut(event.sent_by) { if in_game_state.is_none() { error!( "Tried to send a game packet {:?} while not in game state", @@ -77,7 +71,7 @@ pub fn handle_outgoing_packets_observer( } // debug!("Sending packet: {:?}", event.packet); - if let Err(e) = raw_connection.write_packet(event.packet.clone()) { + if let Err(e) = raw_connection.write(event.packet.clone()) { error!("Failed to send packet: {e}"); } } @@ -91,61 +85,6 @@ pub fn handle_outgoing_packets(mut commands: Commands, mut events: EventReader>, - mut packet_events: ResMut>, -) { - // we manually clear and send the events at the beginning of each update - // since otherwise it'd cause issues with events in process_packet_events - // running twice - packet_events.clear(); - for (player_entity, raw_connection) in &query { - let packets_lock = raw_connection.incoming_packet_queue(); - let mut packets = packets_lock.lock(); - if !packets.is_empty() { - let mut packets_read = 0; - for raw_packet in packets.iter() { - packets_read += 1; - let packet = - match deserialize_packet::(&mut Cursor::new(raw_packet)) - { - Ok(packet) => packet, - Err(err) => { - error!("failed to read packet: {err:?}"); - debug!("packet bytes: {raw_packet:?}"); - continue; - } - }; - - let should_interrupt = packet_interrupts(&packet); - - packet_events.send(ReceivePacketEvent { - entity: player_entity, - packet: Arc::new(packet), - }); - - if should_interrupt { - break; - } - } - packets.drain(0..packets_read); - } - } -} - -/// Whether the given packet should make us stop deserializing the received -/// packets until next update. -/// -/// This is used for packets that can switch the client state. -fn packet_interrupts(packet: &ClientboundGamePacket) -> bool { - matches!( - packet, - ClientboundGamePacket::StartConfiguration(_) - | ClientboundGamePacket::Disconnect(_) - | ClientboundGamePacket::Transfer(_) - ) -} - /// A player joined the game (or more specifically, was added to the tab /// list of a local player). #[derive(Event, Debug, Clone)] diff --git a/azalea-client/src/plugins/packet/game/mod.rs b/azalea-client/src/plugins/packet/game/mod.rs index 8d896e65..60531d3b 100644 --- a/azalea-client/src/plugins/packet/game/mod.rs +++ b/azalea-client/src/plugins/packet/game/mod.rs @@ -32,171 +32,150 @@ use crate::{ }, movement::{KnockbackEvent, KnockbackType}, packet::as_system, - raw_connection::RawConnection, }; -pub fn process_packet_events(ecs: &mut World) { - let mut events_owned = Vec::<(Entity, Arc)>::new(); +pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundGamePacket) { + let mut handler = GamePacketHandler { player, ecs }; - { - let mut system_state = SystemState::>::new(ecs); - let mut events = system_state.get_mut(ecs); - for ReceivePacketEvent { - entity: player_entity, - packet, - } in events.read() - { - // we do this so `ecs` isn't borrowed for the whole loop - events_owned.push((*player_entity, packet.clone())); - } - } - - for (player_entity, packet) in events_owned { - let mut handler = GamePacketHandler { - player: player_entity, - ecs, - }; - - // the order of these doesn't matter, that's decided by the protocol library - declare_packet_handlers!( - ClientboundGamePacket, - packet.as_ref(), - handler, - [ - login, - set_chunk_cache_radius, - chunk_batch_start, - chunk_batch_finished, - custom_payload, - change_difficulty, - commands, - player_abilities, - set_cursor_item, - update_tags, - disconnect, - update_recipes, - entity_event, - player_position, - player_info_update, - player_info_remove, - set_chunk_cache_center, - chunks_biomes, - light_update, - level_chunk_with_light, - add_entity, - set_entity_data, - update_attributes, - set_entity_motion, - set_entity_link, - initialize_border, - set_time, - set_default_spawn_position, - set_health, - set_experience, - teleport_entity, - update_advancements, - rotate_head, - move_entity_pos, - move_entity_pos_rot, - move_entity_rot, - keep_alive, - remove_entities, - player_chat, - system_chat, - disguised_chat, - sound, - level_event, - block_update, - animate, - section_blocks_update, - game_event, - level_particles, - server_data, - set_equipment, - update_mob_effect, - award_stats, - block_changed_ack, - block_destruction, - block_entity_data, - block_event, - boss_event, - command_suggestions, - container_set_content, - container_set_data, - container_set_slot, - container_close, - cooldown, - custom_chat_completions, - delete_chat, - explode, - forget_level_chunk, - horse_screen_open, - map_item_data, - merchant_offers, - move_vehicle, - open_book, - open_screen, - open_sign_editor, - ping, - place_ghost_recipe, - player_combat_end, - player_combat_enter, - player_combat_kill, - player_look_at, - remove_mob_effect, - resource_pack_push, - resource_pack_pop, - respawn, - start_configuration, - entity_position_sync, - select_advancements_tab, - set_action_bar_text, - set_border_center, - set_border_lerp_size, - set_border_size, - set_border_warning_delay, - set_border_warning_distance, - set_camera, - set_display_objective, - set_objective, - set_passengers, - set_player_team, - set_score, - set_simulation_distance, - set_subtitle_text, - set_title_text, - set_titles_animation, - clear_titles, - sound_entity, - stop_sound, - tab_list, - tag_query, - take_item_entity, - bundle_delimiter, - damage_event, - hurt_animation, - ticking_state, - ticking_step, - reset_score, - cookie_request, - debug_sample, - pong_response, - store_cookie, - transfer, - move_minecart_along_track, - set_held_slot, - set_player_inventory, - projectile_power, - custom_report_details, - server_links, - player_rotation, - recipe_book_add, - recipe_book_remove, - recipe_book_settings, - test_instance_block_status, - ] - ); - } + // the order of these doesn't matter, that's decided by the protocol library + declare_packet_handlers!( + ClientboundGamePacket, + packet, + handler, + [ + login, + set_chunk_cache_radius, + chunk_batch_start, + chunk_batch_finished, + custom_payload, + change_difficulty, + commands, + player_abilities, + set_cursor_item, + update_tags, + disconnect, + update_recipes, + entity_event, + player_position, + player_info_update, + player_info_remove, + set_chunk_cache_center, + chunks_biomes, + light_update, + level_chunk_with_light, + add_entity, + set_entity_data, + update_attributes, + set_entity_motion, + set_entity_link, + initialize_border, + set_time, + set_default_spawn_position, + set_health, + set_experience, + teleport_entity, + update_advancements, + rotate_head, + move_entity_pos, + move_entity_pos_rot, + move_entity_rot, + keep_alive, + remove_entities, + player_chat, + system_chat, + disguised_chat, + sound, + level_event, + block_update, + animate, + section_blocks_update, + game_event, + level_particles, + server_data, + set_equipment, + update_mob_effect, + award_stats, + block_changed_ack, + block_destruction, + block_entity_data, + block_event, + boss_event, + command_suggestions, + container_set_content, + container_set_data, + container_set_slot, + container_close, + cooldown, + custom_chat_completions, + delete_chat, + explode, + forget_level_chunk, + horse_screen_open, + map_item_data, + merchant_offers, + move_vehicle, + open_book, + open_screen, + open_sign_editor, + ping, + place_ghost_recipe, + player_combat_end, + player_combat_enter, + player_combat_kill, + player_look_at, + remove_mob_effect, + resource_pack_push, + resource_pack_pop, + respawn, + start_configuration, + entity_position_sync, + select_advancements_tab, + set_action_bar_text, + set_border_center, + set_border_lerp_size, + set_border_size, + set_border_warning_delay, + set_border_warning_distance, + set_camera, + set_display_objective, + set_objective, + set_passengers, + set_player_team, + set_score, + set_simulation_distance, + set_subtitle_text, + set_title_text, + set_titles_animation, + clear_titles, + sound_entity, + stop_sound, + tab_list, + tag_query, + take_item_entity, + bundle_delimiter, + damage_event, + hurt_animation, + ticking_state, + ticking_step, + reset_score, + cookie_request, + debug_sample, + pong_response, + store_cookie, + transfer, + move_minecart_along_track, + set_held_slot, + set_player_inventory, + projectile_power, + custom_report_details, + server_links, + player_rotation, + recipe_book_add, + recipe_book_remove, + recipe_book_settings, + test_instance_block_status, + ] + ); } pub struct GamePacketHandler<'a> { @@ -342,7 +321,7 @@ impl GamePacketHandler<'_> { client_information ); commands.trigger(SendPacketEvent::new(self.player, - azalea_protocol::packets::game::s_client_information::ServerboundClientInformation { information: client_information.clone() }, + azalea_protocol::packets::game::s_client_information::ServerboundClientInformation { client_information: client_information.clone() }, )); }, ); @@ -1506,9 +1485,11 @@ impl GamePacketHandler<'_> { pub fn start_configuration(&mut self, _p: &ClientboundStartConfiguration) { debug!("Got start configuration packet"); - as_system::<(Query<&RawConnection>, Commands)>(self.ecs, |(query, mut commands)| { - let raw_conn = query.get(self.player).unwrap(); - let _ = raw_conn.write_packet(ServerboundConfigurationAcknowledged); + as_system::(self.ecs, |mut commands| { + commands.trigger(SendPacketEvent::new( + self.player, + ServerboundConfigurationAcknowledged, + )); commands .entity(self.player) diff --git a/azalea-client/src/plugins/packet/login/events.rs b/azalea-client/src/plugins/packet/login/events.rs index e3d8958e..70e9b775 100644 --- a/azalea-client/src/plugins/packet/login/events.rs +++ b/azalea-client/src/plugins/packet/login/events.rs @@ -1,24 +1,20 @@ use std::sync::Arc; -use azalea_protocol::packets::login::ClientboundLoginPacket; +use azalea_protocol::packets::login::{ClientboundHello, ClientboundLoginPacket}; use bevy_ecs::prelude::*; -/// An event that's sent when we receive a login packet from the server. Note -/// that if you want to handle this in a system, you must add -/// `.before(azalea::packet::login::process_packet_events)` to it -/// because that system clears the events. -#[derive(Event, Debug, Clone)] -pub struct LoginPacketEvent { - /// The client entity that received the packet. - pub entity: Entity, - /// The packet that was actually received. - pub packet: Arc, -} +use crate::Account; #[derive(Event, Debug, Clone)] pub struct ReceiveLoginPacketEvent { /// The client entity that received the packet. pub entity: Entity, /// The packet that was actually received. - pub packet: ClientboundLoginPacket, + pub packet: Arc, +} + +#[derive(Event)] +pub struct ReceiveHelloEvent { + pub account: Account, + pub packet: ClientboundHello, } diff --git a/azalea-client/src/plugins/packet/login/mod.rs b/azalea-client/src/plugins/packet/login/mod.rs index b42b9875..79990f4a 100644 --- a/azalea-client/src/plugins/packet/login/mod.rs +++ b/azalea-client/src/plugins/packet/login/mod.rs @@ -6,21 +6,53 @@ mod events; use std::collections::HashSet; use azalea_protocol::packets::{ - Packet, + ConnectionProtocol, Packet, login::{ ClientboundCookieRequest, ClientboundCustomQuery, ClientboundHello, ClientboundLoginCompression, ClientboundLoginDisconnect, ClientboundLoginFinished, - ClientboundLoginPacket, ServerboundCustomQueryAnswer, ServerboundLoginPacket, + ClientboundLoginPacket, ServerboundCookieResponse, ServerboundCustomQueryAnswer, + ServerboundLoginAcknowledged, ServerboundLoginPacket, }, }; -use bevy_ecs::{prelude::*, system::SystemState}; +use bevy_ecs::prelude::*; use derive_more::{Deref, DerefMut}; pub use events::*; -use tokio::sync::mpsc; -use tracing::error; +use tracing::{debug, error}; use super::as_system; -use crate::declare_packet_handlers; +use crate::{ + Account, GameProfileComponent, InConfigState, connection::NetworkConnection, + declare_packet_handlers, disconnect::DisconnectEvent, +}; + +pub fn process_packet( + ecs: &mut World, + player: Entity, + packet: &ClientboundLoginPacket, + state: &mut ConnectionProtocol, + net_conn: Option<&mut NetworkConnection>, +) { + let mut handler = LoginPacketHandler { + player, + ecs, + state, + net_conn, + }; + + declare_packet_handlers!( + ClientboundLoginPacket, + packet, + handler, + [ + hello, + login_disconnect, + login_finished, + login_compression, + custom_query, + cookie_request + ] + ); +} /// Event for sending a login packet to the server. #[derive(Event)] @@ -35,103 +67,113 @@ impl SendLoginPacketEvent { } } -#[derive(Component)] -pub struct LoginSendPacketQueue { - pub tx: mpsc::UnboundedSender, -} - /// A marker component for local players that are currently in the /// `login` state. #[derive(Component, Clone, Debug)] pub struct InLoginState; -pub fn handle_send_packet_event( - mut send_packet_events: EventReader, - mut query: Query<&mut LoginSendPacketQueue>, -) { - for event in send_packet_events.read() { - if let Ok(queue) = query.get_mut(event.entity) { - let _ = queue.tx.send(event.packet.clone()); - } else { - error!("Sent SendPacketEvent for entity that doesn't have a LoginSendPacketQueue"); - } - } -} - /// Plugins can add to this set if they want to handle a custom query packet /// themselves. This component removed after the login state ends. #[derive(Component, Default, Debug, Deref, DerefMut)] pub struct IgnoreQueryIds(HashSet); -pub fn process_packet_events(ecs: &mut World) { - let mut events_owned = Vec::new(); - let mut system_state: SystemState> = SystemState::new(ecs); - let mut events = system_state.get_mut(ecs); - for ReceiveLoginPacketEvent { - entity: player_entity, - packet, - } in events.read() - { - // we do this so `ecs` isn't borrowed for the whole loop - events_owned.push((*player_entity, packet.clone())); - } - for (player_entity, packet) in events_owned { - let mut handler = LoginPacketHandler { - player: player_entity, - ecs, - }; - - declare_packet_handlers!( - ClientboundLoginPacket, - packet, - handler, - [ - hello, - login_disconnect, - login_finished, - login_compression, - custom_query, - cookie_request - ] - ); - } -} - pub struct LoginPacketHandler<'a> { pub ecs: &'a mut World, pub player: Entity, + pub state: &'a mut ConnectionProtocol, + pub net_conn: Option<&'a mut NetworkConnection>, } impl LoginPacketHandler<'_> { - pub fn hello(&mut self, _p: ClientboundHello) {} - pub fn login_disconnect(&mut self, _p: ClientboundLoginDisconnect) {} - pub fn login_finished(&mut self, _p: ClientboundLoginFinished) {} - pub fn login_compression(&mut self, _p: ClientboundLoginCompression) { - // as_system::>(self.ecs, |mut query| { - // if let Ok(mut raw_conn) = query.get_mut(self.player) { - // raw_conn.set_compression_threshold(p.compression_threshold); - // } - // }); - } - pub fn custom_query(&mut self, p: ClientboundCustomQuery) { - as_system::<(EventWriter, Query<&IgnoreQueryIds>)>( - self.ecs, - |(mut events, query)| { - let ignore_query_ids = query.get(self.player).ok().map(|x| x.0.clone()); - if let Some(ignore_query_ids) = ignore_query_ids { - if ignore_query_ids.contains(&p.transaction_id) { - return; - } - } + pub fn hello(&mut self, p: &ClientboundHello) { + debug!("Got encryption request {p:?}"); - events.send(SendLoginPacketEvent::new( - self.player, - ServerboundCustomQueryAnswer { - transaction_id: p.transaction_id, - data: None, - }, - )); - }, - ); + as_system::<(Commands, Query<&Account>)>(self.ecs, |(mut commands, query)| { + let Ok(account) = query.get(self.player) else { + error!( + "Expected Account component to be present on player when receiving hello packet." + ); + return; + }; + commands.trigger_targets( + ReceiveHelloEvent { + account: account.clone(), + packet: p.clone(), + }, + self.player, + ); + }); + } + pub fn login_disconnect(&mut self, p: &ClientboundLoginDisconnect) { + debug!("Got disconnect {:?}", p); + + as_system::>(self.ecs, |mut events| { + events.send(DisconnectEvent { + entity: self.player, + reason: Some(p.reason.clone()), + }); + }); + } + pub fn login_finished(&mut self, p: &ClientboundLoginFinished) { + debug!( + "Got profile {:?}. handshake is finished and we're now switching to the configuration state", + p.game_profile + ); + + as_system::(self.ecs, |mut commands| { + commands.trigger(SendLoginPacketEvent::new( + self.player, + ServerboundLoginAcknowledged, + )); + + commands + .entity(self.player) + .remove::() + .remove::() + .insert(InConfigState) + .insert(GameProfileComponent(p.game_profile.clone())); + }); + + // break (conn.config(), p.game_profile); + } + pub fn login_compression(&mut self, p: &ClientboundLoginCompression) { + debug!("Got compression request {p:?}"); + + if let Some(net_conn) = &mut self.net_conn { + net_conn.set_compression_threshold(Some(p.compression_threshold as u32)); + } + } + pub fn custom_query(&mut self, p: &ClientboundCustomQuery) { + debug!("Got custom query {p:?}"); + + as_system::<(Commands, Query<&IgnoreQueryIds>)>(self.ecs, |(mut commands, query)| { + let ignore_query_ids = query.get(self.player).ok().map(|x| x.0.clone()); + if let Some(ignore_query_ids) = ignore_query_ids { + if ignore_query_ids.contains(&p.transaction_id) { + return; + } + } + + commands.trigger(SendLoginPacketEvent::new( + self.player, + ServerboundCustomQueryAnswer { + transaction_id: p.transaction_id, + data: None, + }, + )); + }); + } + pub fn cookie_request(&mut self, p: &ClientboundCookieRequest) { + debug!("Got cookie request {p:?}"); + + as_system::(self.ecs, |mut commands| { + commands.trigger(SendLoginPacketEvent::new( + self.player, + ServerboundCookieResponse { + key: p.key.clone(), + // cookies aren't implemented + payload: None, + }, + )); + }); } - pub fn cookie_request(&mut self, _p: ClientboundCookieRequest) {} } diff --git a/azalea-client/src/plugins/packet/mod.rs b/azalea-client/src/plugins/packet/mod.rs index 362154cc..5db390e9 100644 --- a/azalea-client/src/plugins/packet/mod.rs +++ b/azalea-client/src/plugins/packet/mod.rs @@ -1,16 +1,13 @@ use azalea_entity::metadata::Health; -use bevy_app::{App, First, Plugin, PreUpdate, Update}; +use bevy_app::{App, Plugin, Update}; use bevy_ecs::{ prelude::*, system::{SystemParam, SystemState}, }; -use self::{ - game::{ - AddPlayerEvent, DeathEvent, InstanceLoadedEvent, KeepAliveEvent, RemovePlayerEvent, - ResourcePackEvent, UpdatePlayerEvent, - }, - login::{LoginPacketEvent, SendLoginPacketEvent}, +use self::game::{ + AddPlayerEvent, DeathEvent, InstanceLoadedEvent, KeepAliveEvent, RemovePlayerEvent, + ResourcePackEvent, UpdatePlayerEvent, }; use crate::{chat::ChatReceivedEvent, events::death_listener}; @@ -36,50 +33,35 @@ pub fn death_event_on_0_health( impl Plugin for PacketPlugin { fn build(&self, app: &mut App) { - app.add_systems( - First, - ( - game::emit_receive_packet_events, - config::emit_receive_config_packet_events, - ), - ) - .add_systems( - PreUpdate, - ( - game::process_packet_events, - config::process_packet_events, - login::handle_send_packet_event, - login::process_packet_events, - ), - ) - .add_observer(game::handle_outgoing_packets_observer) - .add_observer(config::handle_outgoing_packets_observer) - .add_systems( - Update, - ( + app.add_observer(game::handle_outgoing_packets_observer) + .add_observer(config::handle_outgoing_packets_observer) + .add_systems( + Update, ( - config::handle_outgoing_packets, - game::handle_outgoing_packets, - ) - .chain(), - death_event_on_0_health.before(death_listener), - ), - ) - // we do this instead of add_event so we can handle the events ourselves - .init_resource::>() - .init_resource::>() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::() - .add_event::(); + ( + config::handle_outgoing_packets, + game::handle_outgoing_packets, + ) + .chain(), + death_event_on_0_health.before(death_listener), + ), + ) + .add_event::() + .add_event::() + .add_event::() + // + .add_event::() + .add_event::() + .add_event::() + // + .add_event::() + .add_event::() + .add_event::() + .add_event::() + .add_event::() + .add_event::() + .add_event::() + .add_event::(); } } diff --git a/azalea-client/src/raw_connection.rs b/azalea-client/src/raw_connection.rs deleted file mode 100644 index 97e93f16..00000000 --- a/azalea-client/src/raw_connection.rs +++ /dev/null @@ -1,208 +0,0 @@ -use std::fmt::Debug; -use std::sync::Arc; - -use azalea_protocol::{ - connect::{RawReadConnection, RawWriteConnection}, - packets::{ConnectionProtocol, Packet, ProtocolPacket}, - read::ReadPacketError, - write::serialize_packet, -}; -use bevy_ecs::prelude::*; -use parking_lot::Mutex; -use thiserror::Error; -use tokio::sync::mpsc::{ - self, - error::{SendError, TrySendError}, -}; -use tracing::error; - -/// A component for clients that can read and write packets to the server. This -/// works with raw bytes, so you'll have to serialize/deserialize packets -/// yourself. It will do the compression and encryption for you though. -#[derive(Component)] -pub struct RawConnection { - pub reader: RawConnectionReader, - pub writer: RawConnectionWriter, - - /// Packets sent to this will be sent to the server. - /// A task that reads packets from the server. The client is disconnected - /// when this task ends. - pub read_packets_task: tokio::task::JoinHandle<()>, - /// A task that writes packets from the server. - pub write_packets_task: tokio::task::JoinHandle<()>, - - pub connection_protocol: ConnectionProtocol, -} - -#[derive(Clone)] -pub struct RawConnectionReader { - pub incoming_packet_queue: Arc>>>, - pub run_schedule_sender: mpsc::Sender<()>, -} -#[derive(Clone)] -pub struct RawConnectionWriter { - pub outgoing_packets_sender: mpsc::UnboundedSender>, -} - -#[derive(Error, Debug)] -pub enum WritePacketError { - #[error("Wrong protocol state: expected {expected:?}, got {got:?}")] - WrongState { - expected: ConnectionProtocol, - got: ConnectionProtocol, - }, - #[error(transparent)] - Encoding(#[from] azalea_protocol::write::PacketEncodeError), - #[error(transparent)] - SendError { - #[from] - #[backtrace] - source: SendError>, - }, -} - -impl RawConnection { - pub fn new( - run_schedule_sender: mpsc::Sender<()>, - connection_protocol: ConnectionProtocol, - raw_read_connection: RawReadConnection, - raw_write_connection: RawWriteConnection, - ) -> Self { - let (outgoing_packets_sender, outgoing_packets_receiver) = mpsc::unbounded_channel(); - - let incoming_packet_queue = Arc::new(Mutex::new(Vec::new())); - - let reader = RawConnectionReader { - incoming_packet_queue: incoming_packet_queue.clone(), - run_schedule_sender, - }; - let writer = RawConnectionWriter { - outgoing_packets_sender, - }; - - let read_packets_task = tokio::spawn(reader.clone().read_task(raw_read_connection)); - let write_packets_task = tokio::spawn( - writer - .clone() - .write_task(raw_write_connection, outgoing_packets_receiver), - ); - - Self { - reader, - writer, - read_packets_task, - write_packets_task, - connection_protocol, - } - } - - pub fn write_raw_packet(&self, raw_packet: Box<[u8]>) -> Result<(), WritePacketError> { - self.writer.outgoing_packets_sender.send(raw_packet)?; - Ok(()) - } - - /// Write the packet with the given state to the server. - /// - /// # Errors - /// - /// Returns an error if the packet is not valid for the current state, or if - /// encoding it failed somehow (like it's too big or something). - pub fn write_packet( - &self, - packet: impl Packet

, - ) -> Result<(), WritePacketError> { - let packet = packet.into_variant(); - let raw_packet = serialize_packet(&packet)?; - self.write_raw_packet(raw_packet)?; - - Ok(()) - } - - /// Returns whether the connection is still alive. - pub fn is_alive(&self) -> bool { - !self.read_packets_task.is_finished() - } - - pub fn incoming_packet_queue(&self) -> Arc>>> { - self.reader.incoming_packet_queue.clone() - } - - pub fn set_state(&mut self, connection_protocol: ConnectionProtocol) { - self.connection_protocol = connection_protocol; - } -} - -impl RawConnectionReader { - /// Loop that reads from the connection and adds the packets to the queue + - /// runs the schedule. - pub async fn read_task(self, mut read_conn: RawReadConnection) { - fn log_for_error(error: &ReadPacketError) { - if !matches!(*error, ReadPacketError::ConnectionClosed) { - error!("Error reading packet from Client: {error:?}"); - } - } - - loop { - match read_conn.read().await { - Ok(raw_packet) => { - let mut incoming_packet_queue = self.incoming_packet_queue.lock(); - - incoming_packet_queue.push(raw_packet); - // this makes it so packets received at the same time are guaranteed to be - // handled in the same tick. this is also an attempt at making it so we can't - // receive any packets in the ticks/updates after being disconnected. - loop { - let raw_packet = match read_conn.try_read() { - Ok(p) => p, - Err(err) => { - log_for_error(&err); - return; - } - }; - let Some(raw_packet) = raw_packet else { break }; - incoming_packet_queue.push(raw_packet); - } - - // tell the client to run all the systems - if self.run_schedule_sender.try_send(()) == Err(TrySendError::Closed(())) { - // the client was dropped - break; - } - } - Err(err) => { - log_for_error(&err); - return; - } - } - } - } -} - -impl RawConnectionWriter { - /// Consume the [`ServerboundGamePacket`] queue and actually write the - /// packets to the server. It's like this so writing packets doesn't need to - /// be awaited. - /// - /// [`ServerboundGamePacket`]: azalea_protocol::packets::game::ServerboundGamePacket - pub async fn write_task( - self, - mut write_conn: RawWriteConnection, - mut outgoing_packets_receiver: mpsc::UnboundedReceiver>, - ) { - while let Some(raw_packet) = outgoing_packets_receiver.recv().await { - if let Err(err) = write_conn.write(&raw_packet).await { - error!("Disconnecting because we couldn't write a packet: {err}."); - break; - }; - } - // receiver is automatically closed when it's dropped - } -} - -impl Drop for RawConnection { - /// Stop every active task when this `RawConnection` is dropped. - fn drop(&mut self) { - self.read_packets_task.abort(); - self.write_packets_task.abort(); - } -} diff --git a/azalea-client/src/test_simulation.rs b/azalea-client/src/test_simulation.rs index a09e5dae..3e4dd99a 100644 --- a/azalea-client/src/test_simulation.rs +++ b/azalea-client/src/test_simulation.rs @@ -1,6 +1,5 @@ -use std::{fmt::Debug, sync::Arc, time::Duration}; +use std::{fmt::Debug, sync::Arc}; -use azalea_auth::game_profile::GameProfile; use azalea_buf::AzaleaWrite; use azalea_core::delta::PositionDelta8; use azalea_core::game_type::{GameMode, OptionalGameType}; @@ -21,17 +20,13 @@ use azalea_world::palette::{PalettedContainer, PalettedContainerKind}; use azalea_world::{Chunk, Instance, MinecraftEntityId, Section}; use bevy_app::App; use bevy_ecs::{prelude::*, schedule::ExecutorKind}; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use simdnbt::owned::{NbtCompound, NbtTag}; -use tokio::task::JoinHandle; -use tokio::{sync::mpsc, time::sleep}; use uuid::Uuid; +use crate::connection::RawConnection; use crate::disconnect::DisconnectEvent; -use crate::{ - ClientInformation, GameProfileComponent, InConfigState, InstanceHolder, LocalPlayerBundle, - raw_connection::{RawConnection, RawConnectionReader, RawConnectionWriter}, -}; +use crate::{ClientInformation, InConfigState, InstanceHolder, LocalPlayerBundle}; /// A way to simulate a client in a server, used for some internal tests. pub struct Simulation { @@ -40,16 +35,13 @@ pub struct Simulation { // the runtime needs to be kept around for the tasks to be considered alive pub rt: tokio::runtime::Runtime, - - pub incoming_packet_queue: Arc>>>, - pub clear_outgoing_packets_receiver_task: JoinHandle, } impl Simulation { pub fn new(initial_connection_protocol: ConnectionProtocol) -> Self { let mut app = create_simulation_app(); let mut entity = app.world_mut().spawn_empty(); - let (player, clear_outgoing_packets_receiver_task, incoming_packet_queue, rt) = + let (player, rt) = create_local_player_bundle(entity.id(), ConnectionProtocol::Configuration); entity.insert(player); @@ -61,13 +53,7 @@ impl Simulation { app.world_mut().entity_mut(entity).insert(InConfigState); tick_app(&mut app); - let mut simulation = Self { - app, - entity, - rt, - incoming_packet_queue, - clear_outgoing_packets_receiver_task, - }; + let mut simulation = Self { app, entity, rt }; #[allow(clippy::single_match)] match initial_connection_protocol { @@ -95,9 +81,11 @@ impl Simulation { simulation } - pub fn receive_packet(&self, packet: impl Packet

) { + pub fn receive_packet(&mut self, packet: impl Packet

) { let buf = azalea_protocol::write::serialize_packet(&packet.into_variant()).unwrap(); - self.incoming_packet_queue.lock().push(buf); + self.with_component_mut::(|raw_conn| { + raw_conn.injected_clientbound_packets.push(buf.clone()); + }); } pub fn tick(&mut self) { @@ -112,6 +100,14 @@ impl Simulation { pub fn has_component(&self) -> bool { self.app.world().get::(self.entity).is_some() } + pub fn with_component_mut(&mut self, f: impl FnOnce(&mut T)) { + f(&mut self + .app + .world_mut() + .entity_mut(self.entity) + .get_mut::() + .unwrap()); + } pub fn resource(&self) -> T { self.app.world().get_resource::().unwrap().clone() } @@ -143,70 +139,24 @@ impl Simulation { fn create_local_player_bundle( entity: Entity, connection_protocol: ConnectionProtocol, -) -> ( - LocalPlayerBundle, - JoinHandle, - Arc>>>, - tokio::runtime::Runtime, -) { +) -> (LocalPlayerBundle, tokio::runtime::Runtime) { // unused since we'll trigger ticks ourselves - let (run_schedule_sender, _run_schedule_receiver) = mpsc::channel(1); - - let (outgoing_packets_sender, mut outgoing_packets_receiver) = mpsc::unbounded_channel(); - let incoming_packet_queue = Arc::new(Mutex::new(Vec::new())); - let reader = RawConnectionReader { - incoming_packet_queue: incoming_packet_queue.clone(), - run_schedule_sender, - }; - let writer = RawConnectionWriter { - outgoing_packets_sender, - }; let rt = tokio::runtime::Runtime::new().unwrap(); - // the tasks can't die since that would make us send a DisconnectEvent - let read_packets_task = rt.spawn(async { - loop { - sleep(Duration::from_secs(60)).await; - } - }); - let write_packets_task = rt.spawn(async { - loop { - sleep(Duration::from_secs(60)).await; - } - }); - - let clear_outgoing_packets_receiver_task = rt.spawn(async move { - loop { - let _ = outgoing_packets_receiver.recv().await; - } - }); - - let raw_connection = RawConnection { - reader, - writer, - read_packets_task, - write_packets_task, - connection_protocol, - }; + let raw_connection = RawConnection::new_networkless(connection_protocol); let instance = Instance::default(); let instance_holder = InstanceHolder::new(entity, Arc::new(RwLock::new(instance))); let local_player_bundle = LocalPlayerBundle { raw_connection, - game_profile: GameProfileComponent(GameProfile::new(Uuid::nil(), "azalea".to_owned())), client_information: ClientInformation::default(), instance_holder, metadata: PlayerMetadataBundle::default(), }; - ( - local_player_bundle, - clear_outgoing_packets_receiver_task, - incoming_packet_queue, - rt, - ) + (local_player_bundle, rt) } fn create_simulation_app() -> App { diff --git a/azalea-protocol/src/packets/game/s_client_information.rs b/azalea-protocol/src/packets/game/s_client_information.rs index 5861212c..c8e76f63 100644 --- a/azalea-protocol/src/packets/game/s_client_information.rs +++ b/azalea-protocol/src/packets/game/s_client_information.rs @@ -5,5 +5,5 @@ use crate::common::client_information::ClientInformation; #[derive(Clone, Debug, AzBuf, ServerboundGamePacket)] pub struct ServerboundClientInformation { - pub information: ClientInformation, + pub client_information: ClientInformation, } diff --git a/azalea-protocol/src/read.rs b/azalea-protocol/src/read.rs index 84c307d7..de196984 100644 --- a/azalea-protocol/src/read.rs +++ b/azalea-protocol/src/read.rs @@ -285,6 +285,8 @@ where buffer.get_mut().extend_from_slice(&bytes); } } +/// Read a packet from the stream, then if necessary decrypt it, decompress +/// it, and split it. pub fn try_read_raw_packet( stream: &mut R, buffer: &mut Cursor>, diff --git a/azalea-protocol/src/write.rs b/azalea-protocol/src/write.rs index adefc340..dd863f9e 100644 --- a/azalea-protocol/src/write.rs +++ b/azalea-protocol/src/write.rs @@ -54,6 +54,15 @@ pub async fn write_raw_packet( where W: AsyncWrite + Unpin + Send, { + let network_packet = encode_to_network_packet(raw_packet, compression_threshold, cipher); + stream.write_all(&network_packet).await +} + +pub fn encode_to_network_packet( + raw_packet: &[u8], + compression_threshold: Option, + cipher: &mut Option, +) -> Vec { trace!("Writing raw packet: {raw_packet:?}"); let mut raw_packet = raw_packet.to_vec(); if let Some(threshold) = compression_threshold { @@ -64,7 +73,7 @@ where if let Some(cipher) = cipher { azalea_crypto::encrypt_packet(cipher, &mut raw_packet); } - stream.write_all(&raw_packet).await + raw_packet } pub fn compression_encoder( diff --git a/azalea/examples/testbot/commands/debug.rs b/azalea/examples/testbot/commands/debug.rs index 9de4d97d..a42fb93f 100644 --- a/azalea/examples/testbot/commands/debug.rs +++ b/azalea/examples/testbot/commands/debug.rs @@ -248,7 +248,7 @@ pub fn register(commands: &mut CommandDispatcher>) { } } "bevy_ecs::event::collections::Events" => { - let events = ecs.resource::>(); + let events = ecs.resource::>(); writeln!(report, "- Event count: {}", events.len()).unwrap(); } "bevy_ecs::event::collections::Events" => { diff --git a/azalea/examples/testbot/main.rs b/azalea/examples/testbot/main.rs index 410d1b6d..683a98d7 100644 --- a/azalea/examples/testbot/main.rs +++ b/azalea/examples/testbot/main.rs @@ -134,7 +134,7 @@ async fn handle(bot: Client, event: azalea::Event, state: State) -> anyhow::Resu view_distance: 32, ..Default::default() }) - .await?; + .await; if swarm.args.pathfinder_debug_particles { bot.ecs .lock() diff --git a/azalea/src/container.rs b/azalea/src/container.rs index 0ce0fc44..0d1cfb16 100644 --- a/azalea/src/container.rs +++ b/azalea/src/container.rs @@ -1,7 +1,7 @@ use std::fmt::Debug; use std::fmt::Formatter; -use azalea_client::packet::game::ReceivePacketEvent; +use azalea_client::packet::game::ReceiveGamePacketEvent; use azalea_client::{ Client, inventory::{CloseContainerEvent, ContainerClickEvent, Inventory}, @@ -234,7 +234,10 @@ impl ContainerHandle { #[derive(Component, Debug)] pub struct WaitingForInventoryOpen; -fn handle_menu_opened_event(mut commands: Commands, mut events: EventReader) { +fn handle_menu_opened_event( + mut commands: Commands, + mut events: EventReader, +) { for event in events.read() { if let ClientboundGamePacket::ContainerSetContent { .. } = event.packet.as_ref() { commands diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs index eb56ae2d..48e91a36 100644 --- a/azalea/src/swarm/mod.rs +++ b/azalea/src/swarm/mod.rs @@ -495,7 +495,8 @@ where let Some(first_bot_state) = first_bot.query::>(&mut ecs).cloned() else { error!( "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.", - first_bot.profile.name, first_bot.entity + first_bot.username(), + first_bot.entity ); continue; }; @@ -513,7 +514,8 @@ where let Some(state) = bot.query::>(&mut ecs).cloned() else { error!( "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.", - bot.profile.name, bot.entity + bot.username(), + bot.entity ); continue; };