From 5557747a971749009dfee9da4fa27f49b99fbbd8 Mon Sep 17 00:00:00 2001 From: mat Date: Fri, 11 Apr 2025 17:31:23 -1000 Subject: [PATCH] fixes --- azalea-client/src/client.rs | 53 ++++------- azalea-client/src/plugins/connection.rs | 95 +++++++++++-------- azalea-client/src/plugins/login.rs | 3 +- .../src/plugins/packet/config/events.rs | 2 +- .../src/plugins/packet/game/events.rs | 2 +- .../src/plugins/packet/login/events.rs | 55 ++++++++++- azalea-client/src/plugins/packet/login/mod.rs | 81 +++++++--------- azalea-client/src/plugins/packet/mod.rs | 2 + .../src/packets/login/s_custom_query.rs | 9 -- 9 files changed, 162 insertions(+), 140 deletions(-) delete mode 100644 azalea-protocol/src/packets/login/s_custom_query.rs diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs index fbb043aa..5e985731 100644 --- a/azalea-client/src/client.rs +++ b/azalea-client/src/client.rs @@ -26,11 +26,8 @@ use azalea_protocol::{ packets::{ self, ClientIntention, ConnectionProtocol, PROTOCOL_VERSION, Packet, game::{self, ServerboundGamePacket}, - handshake::{ - ClientboundHandshakePacket, ServerboundHandshakePacket, - s_intention::ServerboundIntention, - }, - login::{ClientboundLoginPacket, ServerboundLoginPacket, s_hello::ServerboundHello}, + handshake::s_intention::ServerboundIntention, + login::s_hello::ServerboundHello, }, resolver, }; @@ -276,6 +273,7 @@ impl Client { let mut entity_mut = ecs.entity_mut(entity); entity_mut.insert(( + InLoginState, // add the Account to the entity now so plugins can access it earlier account.to_owned(), // localentity is always present for our clients, even if we're not actually logged @@ -291,12 +289,21 @@ impl Client { entity }; - let conn = if let Some(proxy) = proxy { + let mut conn = if let Some(proxy) = proxy { Connection::new_with_proxy(resolved_address, proxy).await? } else { Connection::new(resolved_address).await? }; - let conn = Self::handshake(ecs_lock.clone(), entity, conn, account, address).await?; + debug!("Created connection to {resolved_address:?}"); + + conn.write(ServerboundIntention { + protocol_version: PROTOCOL_VERSION, + hostname: address.host.clone(), + port: address.port, + intention: ClientIntention::Login, + }) + .await?; + let conn = conn.login(); let (read_conn, write_conn) = conn.into_split(); let (read_conn, write_conn) = (read_conn.raw, write_conn.raw); @@ -326,38 +333,9 @@ impl Client { instance_holder, metadata: azalea_entity::metadata::PlayerMetadataBundle::default(), }, - InConfigState, - // this component is never removed - LocalEntity, )); } - let client = Client::new(entity, ecs_lock.clone(), run_schedule_sender.clone()); - Ok(client) - } - - /// Do a handshake with the server and get to the game state from the - /// initial handshake state. - /// - /// This will also automatically refresh the account's access token if - /// it's expired. - pub async fn handshake( - ecs_lock: Arc>, - entity: Entity, - mut conn: Connection, - account: &Account, - address: &ServerAddress, - ) -> Result, JoinError> { - // handshake - conn.write(ServerboundIntention { - protocol_version: PROTOCOL_VERSION, - hostname: address.host.clone(), - port: address.port, - intention: ClientIntention::Login, - }) - .await?; - let conn = conn.login(); - as_system::(&mut ecs_lock.lock(), |mut commands| { commands.entity(entity).insert(( crate::packet::login::IgnoreQueryIds::default(), @@ -374,7 +352,8 @@ impl Client { )) }); - Ok(conn) + let client = Client::new(entity, ecs_lock.clone(), run_schedule_sender.clone()); + Ok(client) } /// Write a packet directly to the server. diff --git a/azalea-client/src/plugins/connection.rs b/azalea-client/src/plugins/connection.rs index 99dc7d11..b9c695bb 100644 --- a/azalea-client/src/plugins/connection.rs +++ b/azalea-client/src/plugins/connection.rs @@ -19,7 +19,7 @@ use tokio::{ net::tcp::OwnedWriteHalf, sync::mpsc::{self}, }; -use tracing::{debug, error}; +use tracing::{debug, error, trace}; use super::packet::{ config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent, @@ -35,63 +35,59 @@ impl Plugin for ConnectionPlugin { pub fn read_packets(ecs: &mut World) { // receive_game_packet_events: EventWriter, - let mut query = ecs.query::<(Entity, &mut RawConnection)>(); + let mut entity_and_conn_query = ecs.query::<(Entity, &mut RawConnection)>(); + let mut conn_query = ecs.query::<&mut RawConnection>(); let mut entities_handling_packets = Vec::new(); let mut entities_with_injected_packets = Vec::new(); - for (entity, mut raw_conn) in query.iter_mut(ecs) { - let state = raw_conn.state; - + for (entity, mut raw_conn) in entity_and_conn_query.iter_mut(ecs) { if !raw_conn.injected_clientbound_packets.is_empty() { entities_with_injected_packets.push(( entity, - state, mem::take(&mut raw_conn.injected_clientbound_packets), )); } - let Some(net_conn) = raw_conn.network.take() else { - // means it's a networkless connection + if raw_conn.network.is_none() { + // no network connection, don't bother with the normal packet handling continue; - }; - entities_handling_packets.push((entity, state, net_conn)); + } + + entities_handling_packets.push(entity); } let mut queued_packet_events = QueuedPacketEvents::default(); // handle injected packets, see the comment on // RawConnection::injected_clientbound_packets for more info - for (entity, mut state, raw_packets) in entities_with_injected_packets { + for (entity, raw_packets) in entities_with_injected_packets { for raw_packet in raw_packets { - handle_raw_packet( - ecs, - &raw_packet, - entity, - &mut state, - None, - &mut queued_packet_events, - ) - .unwrap(); + let conn = conn_query.get(ecs, entity).unwrap(); + let state = conn.state; + + trace!("Received injected packet with bytes: {raw_packet:?}"); + handle_raw_packet(ecs, &raw_packet, entity, state, &mut queued_packet_events).unwrap(); // update the state and for the client - let (_, mut raw_conn_component) = query.get_mut(ecs, entity).unwrap(); + let (_, mut raw_conn_component) = entity_and_conn_query.get_mut(ecs, entity).unwrap(); raw_conn_component.state = state; } } - // we pass the mutable state and net_conn into the handlers so they're allowed - // to mutate it - for (entity, mut state, mut net_conn) in entities_handling_packets { + for entity in entities_handling_packets { loop { - match net_conn.reader.try_read() { + let mut conn = conn_query.get_mut(ecs, entity).unwrap(); + let net_conn = conn.net_conn().unwrap(); + let read_res = net_conn.reader.try_read(); + let state = conn.state; + match read_res { Ok(Some(raw_packet)) => { let raw_packet = Arc::<[u8]>::from(raw_packet); if let Err(e) = handle_raw_packet( ecs, &raw_packet, entity, - &mut state, - Some(&mut net_conn), + state, &mut queued_packet_events, ) { error!("Error reading packet: {e}"); @@ -108,14 +104,12 @@ pub fn read_packets(ecs: &mut World) { } } - // this needs to be done at some point every update, so we do it here right - // after the handlers are called - net_conn.poll_writer(); - - // update the state and network connections for the client - let (_, mut raw_conn_component) = query.get_mut(ecs, entity).unwrap(); - raw_conn_component.state = state; - raw_conn_component.network = Some(net_conn); + let mut net_conn = conn_query.get_mut(ecs, entity).unwrap(); + if let Some(net_conn) = &mut net_conn.network { + // this needs to be done at some point every update, so we do it here right + // after the handlers are called + net_conn.poll_writer(); + } } queued_packet_events.send_events(ecs); @@ -217,9 +211,11 @@ impl RawConnection { packet: impl Packet

, ) -> Result<(), WritePacketError> { if let Some(network) = &mut self.network { - let packet = packet.into_variant(); - let raw_packet = serialize_packet(&packet)?; - network.write_raw(&raw_packet)?; + network.write(packet)?; + } else { + debug!( + "tried to write packet to the network but there is no NetworkConnection. if you're trying to send a packet from the handler function, use self.write instead" + ); } Ok(()) } @@ -233,8 +229,7 @@ pub fn handle_raw_packet( ecs: &mut World, raw_packet: &[u8], entity: Entity, - state: &mut ConnectionProtocol, - net_conn: Option<&mut NetworkConnection>, + state: ConnectionProtocol, queued_packet_events: &mut QueuedPacketEvents, ) -> Result<(), Box> { let stream = &mut Cursor::new(raw_packet); @@ -244,6 +239,7 @@ pub fn handle_raw_packet( } ConnectionProtocol::Game => { let packet = Arc::new(deserialize_packet::(stream)?); + trace!("Packet: {packet:?}"); game::process_packet(ecs, entity, packet.as_ref()); queued_packet_events .game @@ -254,13 +250,15 @@ pub fn handle_raw_packet( } ConnectionProtocol::Login => { let packet = Arc::new(deserialize_packet::(stream)?); - login::process_packet(ecs, entity, &packet, state, net_conn); + trace!("Packet: {packet:?}"); + login::process_packet(ecs, entity, &packet); queued_packet_events .login .push(ReceiveLoginPacketEvent { entity, packet }); } ConnectionProtocol::Configuration => { let packet = Arc::new(deserialize_packet::(stream)?); + trace!("Packet: {packet:?}"); config::process_packet(ecs, entity, &packet); queued_packet_events .config @@ -283,6 +281,17 @@ pub struct NetworkConnection { network_packet_writer_tx: mpsc::UnboundedSender>, } impl NetworkConnection { + pub fn write( + &mut self, + packet: impl Packet

, + ) -> Result<(), WritePacketError> { + let packet = packet.into_variant(); + let raw_packet = serialize_packet(&packet)?; + self.write_raw(&raw_packet)?; + + Ok(()) + } + pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> { let network_packet = azalea_protocol::write::encode_to_network_packet( raw_packet, @@ -299,11 +308,13 @@ impl NetworkConnection { } pub fn set_compression_threshold(&mut self, threshold: Option) { + trace!("Set compression threshold to {threshold:?}"); self.reader.compression_threshold = threshold; } /// Set the encryption key that is used to encrypt and decrypt packets. It's /// the same for both reading and writing. pub fn set_encryption_key(&mut self, key: [u8; 16]) { + trace!("Enabled protocol encryption"); let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key); self.reader.dec_cipher = Some(dec_cipher); self.enc_cipher = Some(enc_cipher); @@ -315,11 +326,13 @@ async fn write_task( mut write_half: OwnedWriteHalf, ) { while let Some(network_packet) = network_packet_writer_rx.recv().await { + trace!("writing encoded raw packet"); if let Err(e) = write_half.write_all(&network_packet).await { debug!("Error writing packet to server: {e}"); break; }; } + trace!("write task is done"); } #[derive(Error, Debug)] diff --git a/azalea-client/src/plugins/login.rs b/azalea-client/src/plugins/login.rs index 58c5f628..91c2b746 100644 --- a/azalea-client/src/plugins/login.rs +++ b/azalea-client/src/plugins/login.rs @@ -3,7 +3,7 @@ use azalea_protocol::packets::login::{ClientboundHello, ServerboundKey}; use bevy_app::prelude::*; use bevy_ecs::prelude::*; use bevy_tasks::{IoTaskPool, Task, futures_lite::future}; -use tracing::error; +use tracing::{debug, error}; use super::{connection::RawConnection, packet::login::ReceiveHelloEvent}; use crate::{Account, JoinError}; @@ -33,6 +33,7 @@ fn poll_auth_task( ) { for (entity, mut auth_task, mut raw_conn) in query.iter_mut() { if let Some(poll_res) = future::block_on(future::poll_once(&mut auth_task.0)) { + debug!("Finished auth"); commands.entity(entity).remove::(); match poll_res { Ok((packet, private_key)) => { diff --git a/azalea-client/src/plugins/packet/config/events.rs b/azalea-client/src/plugins/packet/config/events.rs index 012ea0c1..a9237e75 100644 --- a/azalea-client/src/plugins/packet/config/events.rs +++ b/azalea-client/src/plugins/packet/config/events.rs @@ -44,7 +44,7 @@ pub fn handle_outgoing_packets_observer( ); return; } - debug!("Sending packet: {:?}", event.packet); + debug!("Sending config packet: {:?}", event.packet); if let Err(e) = raw_conn.write(event.packet.clone()) { error!("Failed to send packet: {e}"); } diff --git a/azalea-client/src/plugins/packet/game/events.rs b/azalea-client/src/plugins/packet/game/events.rs index f5d72946..3367f02c 100644 --- a/azalea-client/src/plugins/packet/game/events.rs +++ b/azalea-client/src/plugins/packet/game/events.rs @@ -70,7 +70,7 @@ pub fn handle_outgoing_packets_observer( return; } - // debug!("Sending packet: {:?}", event.packet); + // debug!("Sending game packet: {:?}", event.packet); if let Err(e) = raw_connection.write(event.packet.clone()) { error!("Failed to send packet: {e}"); } diff --git a/azalea-client/src/plugins/packet/login/events.rs b/azalea-client/src/plugins/packet/login/events.rs index 70e9b775..95ad6294 100644 --- a/azalea-client/src/plugins/packet/login/events.rs +++ b/azalea-client/src/plugins/packet/login/events.rs @@ -1,9 +1,14 @@ use std::sync::Arc; -use azalea_protocol::packets::login::{ClientboundHello, ClientboundLoginPacket}; +use azalea_protocol::packets::{ + Packet, + login::{ClientboundHello, ClientboundLoginPacket, ServerboundLoginPacket}, +}; use bevy_ecs::prelude::*; +use tracing::{debug, error}; -use crate::Account; +use super::InLoginState; +use crate::{Account, connection::RawConnection}; #[derive(Event, Debug, Clone)] pub struct ReceiveLoginPacketEvent { @@ -18,3 +23,49 @@ pub struct ReceiveHelloEvent { pub account: Account, pub packet: ClientboundHello, } + +/// Event for sending a login packet to the server. +#[derive(Event, Clone)] +pub struct SendLoginPacketEvent { + pub sent_by: Entity, + pub packet: ServerboundLoginPacket, +} +impl SendLoginPacketEvent { + pub fn new(entity: Entity, packet: impl Packet) -> Self { + let packet = packet.into_variant(); + Self { + sent_by: entity, + packet, + } + } +} + +pub fn handle_outgoing_packets_observer( + trigger: Trigger, + mut query: Query<(&mut RawConnection, Option<&InLoginState>)>, +) { + let event = trigger.event(); + if let Ok((mut raw_conn, in_login_state)) = query.get_mut(event.sent_by) { + if in_login_state.is_none() { + error!( + "Tried to send a login packet {:?} while not in login state", + event.packet + ); + return; + } + debug!("Sending login packet: {:?}", event.packet); + if let Err(e) = raw_conn.write(event.packet.clone()) { + error!("Failed to send packet: {e}"); + } + } +} +/// A system that converts [`SendLoginPacketEvent`] events into triggers so +/// they get received by [`handle_outgoing_packets_observer`]. +pub fn handle_outgoing_packets( + mut commands: Commands, + mut events: EventReader, +) { + for event in events.read() { + commands.trigger(event.clone()); + } +} diff --git a/azalea-client/src/plugins/packet/login/mod.rs b/azalea-client/src/plugins/packet/login/mod.rs index 79990f4a..afa9530a 100644 --- a/azalea-client/src/plugins/packet/login/mod.rs +++ b/azalea-client/src/plugins/packet/login/mod.rs @@ -6,12 +6,12 @@ mod events; use std::collections::HashSet; use azalea_protocol::packets::{ - ConnectionProtocol, Packet, + ConnectionProtocol, login::{ ClientboundCookieRequest, ClientboundCustomQuery, ClientboundHello, ClientboundLoginCompression, ClientboundLoginDisconnect, ClientboundLoginFinished, ClientboundLoginPacket, ServerboundCookieResponse, ServerboundCustomQueryAnswer, - ServerboundLoginAcknowledged, ServerboundLoginPacket, + ServerboundLoginAcknowledged, }, }; use bevy_ecs::prelude::*; @@ -21,23 +21,12 @@ use tracing::{debug, error}; use super::as_system; use crate::{ - Account, GameProfileComponent, InConfigState, connection::NetworkConnection, + Account, GameProfileComponent, InConfigState, connection::RawConnection, declare_packet_handlers, disconnect::DisconnectEvent, }; -pub fn process_packet( - ecs: &mut World, - player: Entity, - packet: &ClientboundLoginPacket, - state: &mut ConnectionProtocol, - net_conn: Option<&mut NetworkConnection>, -) { - let mut handler = LoginPacketHandler { - player, - ecs, - state, - net_conn, - }; +pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundLoginPacket) { + let mut handler = LoginPacketHandler { player, ecs }; declare_packet_handlers!( ClientboundLoginPacket, @@ -54,19 +43,6 @@ pub fn process_packet( ); } -/// Event for sending a login packet to the server. -#[derive(Event)] -pub struct SendLoginPacketEvent { - pub entity: Entity, - pub packet: ServerboundLoginPacket, -} -impl SendLoginPacketEvent { - pub fn new(entity: Entity, packet: impl Packet) -> Self { - let packet = packet.into_variant(); - Self { entity, packet } - } -} - /// A marker component for local players that are currently in the /// `login` state. #[derive(Component, Clone, Debug)] @@ -80,8 +56,6 @@ pub struct IgnoreQueryIds(HashSet); pub struct LoginPacketHandler<'a> { pub ecs: &'a mut World, pub player: Entity, - pub state: &'a mut ConnectionProtocol, - pub net_conn: Option<&'a mut NetworkConnection>, } impl LoginPacketHandler<'_> { pub fn hello(&mut self, p: &ClientboundHello) { @@ -115,32 +89,43 @@ impl LoginPacketHandler<'_> { } pub fn login_finished(&mut self, p: &ClientboundLoginFinished) { debug!( - "Got profile {:?}. handshake is finished and we're now switching to the configuration state", + "Got profile {:?}. login is finished and we're now switching to the config state", p.game_profile ); - as_system::(self.ecs, |mut commands| { - commands.trigger(SendLoginPacketEvent::new( - self.player, - ServerboundLoginAcknowledged, - )); + as_system::<(Commands, Query<&mut RawConnection>)>( + self.ecs, + |(mut commands, mut query)| { + commands.trigger(SendLoginPacketEvent::new( + self.player, + ServerboundLoginAcknowledged, + )); - commands - .entity(self.player) - .remove::() - .remove::() - .insert(InConfigState) - .insert(GameProfileComponent(p.game_profile.clone())); - }); + commands + .entity(self.player) + .remove::() + .remove::() + .insert(InConfigState) + .insert(GameProfileComponent(p.game_profile.clone())); - // break (conn.config(), p.game_profile); + let mut conn = query + .get_mut(self.player) + .expect("RawConnection component should be present when receiving packets"); + conn.state = ConnectionProtocol::Configuration; + }, + ); } pub fn login_compression(&mut self, p: &ClientboundLoginCompression) { debug!("Got compression request {p:?}"); - if let Some(net_conn) = &mut self.net_conn { - net_conn.set_compression_threshold(Some(p.compression_threshold as u32)); - } + as_system::>(self.ecs, |mut query| { + let mut conn = query + .get_mut(self.player) + .expect("RawConnection component should be present when receiving packets"); + if let Some(net_conn) = &mut conn.net_conn() { + net_conn.set_compression_threshold(Some(p.compression_threshold as u32)); + } + }) } pub fn custom_query(&mut self, p: &ClientboundCustomQuery) { debug!("Got custom query {p:?}"); diff --git a/azalea-client/src/plugins/packet/mod.rs b/azalea-client/src/plugins/packet/mod.rs index 5db390e9..e08b57c6 100644 --- a/azalea-client/src/plugins/packet/mod.rs +++ b/azalea-client/src/plugins/packet/mod.rs @@ -35,12 +35,14 @@ impl Plugin for PacketPlugin { fn build(&self, app: &mut App) { app.add_observer(game::handle_outgoing_packets_observer) .add_observer(config::handle_outgoing_packets_observer) + .add_observer(login::handle_outgoing_packets_observer) .add_systems( Update, ( ( config::handle_outgoing_packets, game::handle_outgoing_packets, + login::handle_outgoing_packets, ) .chain(), death_event_on_0_health.before(death_listener), diff --git a/azalea-protocol/src/packets/login/s_custom_query.rs b/azalea-protocol/src/packets/login/s_custom_query.rs deleted file mode 100644 index 39ecdcef..00000000 --- a/azalea-protocol/src/packets/login/s_custom_query.rs +++ /dev/null @@ -1,9 +0,0 @@ -use azalea_buf::{AzBuf, UnsizedByteArray}; -use azalea_protocol_macros::ServerboundLoginPacket; - -#[derive(Clone, Debug, AzBuf, ServerboundLoginPacket)] -pub struct ServerboundCustomQuery { - #[var] - pub transaction_id: u32, - pub data: Option, -}