From 8da179b221ae4a7bfd02245b3cb6357464adbc43 Mon Sep 17 00:00:00 2001 From: mat Date: Mon, 2 Jun 2025 04:12:07 -0100 Subject: [PATCH] simplify some join logic so the Entity is returned even on connection error --- azalea-client/src/client.rs | 40 +++++++--------------- azalea-client/src/plugins/join.rs | 54 ++++++------------------------ azalea-client/src/plugins/login.rs | 19 ++++++++--- azalea/src/swarm/mod.rs | 33 +++++++----------- 4 files changed, 50 insertions(+), 96 deletions(-) diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs index f53e693c..190e999c 100644 --- a/azalea-client/src/client.rs +++ b/azalea-client/src/client.rs @@ -1,7 +1,7 @@ use std::{ collections::HashMap, fmt::Debug, - io, mem, + mem, net::SocketAddr, sync::Arc, thread, @@ -9,7 +9,6 @@ use std::{ }; use azalea_auth::game_profile::GameProfile; -use azalea_chat::FormattedText; use azalea_core::{ data_registry::ResolvableDataRegistry, position::Vec3, resource_location::ResourceLocation, tick::GameTick, @@ -22,9 +21,9 @@ use azalea_entity::{ use azalea_protocol::{ ServerAddress, common::client_information::ClientInformation, - connect::{ConnectionError, Proxy}, + connect::Proxy, packets::{ - self, Packet, + Packet, game::{self, ServerboundGamePacket}, }, resolver, @@ -54,7 +53,7 @@ use crate::{ events::Event, interact::CurrentSequenceNumber, inventory::Inventory, - join::{ConnectOpts, StartJoinCallback, StartJoinServerEvent}, + join::{ConnectOpts, StartJoinServerEvent}, local_player::{ GameProfileComponent, Hunger, InstanceHolder, PermissionLevel, PlayerAbilities, TabList, }, @@ -89,22 +88,8 @@ pub struct Client { pub enum JoinError { #[error("{0}")] Resolver(#[from] resolver::ResolverError), - #[error("{0}")] - Connection(#[from] ConnectionError), - #[error("{0}")] - ReadPacket(#[from] Box), - #[error("{0}")] - Io(#[from] io::Error), - #[error("Failed to encrypt the challenge from the server for {0:?}")] - EncryptionError(packets::login::ClientboundHello), - #[error("{0}")] - SessionServer(#[from] azalea_auth::sessionserver::ClientSessionServerError), #[error("The given address could not be parsed into a ServerAddress")] InvalidAddress, - #[error("Couldn't refresh access token: {0}")] - Auth(#[from] azalea_auth::AuthError), - #[error("Disconnected: {reason}")] - Disconnect { reason: FormattedText }, } pub struct StartClientOpts { @@ -193,7 +178,7 @@ impl Client { resolved_address, Some(tx), )) - .await?; + .await; Ok((client, rx)) } @@ -209,7 +194,7 @@ impl Client { let client = Self::start_client( StartClientOpts::new(account, address, resolved_address, Some(tx)).proxy(proxy), ) - .await?; + .await; Ok((client, rx)) } @@ -222,25 +207,24 @@ impl Client { connect_opts, event_sender, }: StartClientOpts, - ) -> Result { + ) -> Self { // send a StartJoinServerEvent let (start_join_callback_tx, mut start_join_callback_rx) = - mpsc::unbounded_channel::>(); + mpsc::unbounded_channel::(); ecs_lock.lock().send_event(StartJoinServerEvent { account, connect_opts, event_sender, - start_join_callback_tx: Some(StartJoinCallback(start_join_callback_tx)), + start_join_callback_tx: Some(start_join_callback_tx), }); let entity = start_join_callback_rx.recv().await.expect( - "StartJoinCallback should not be dropped before sending a message, this is a bug in Azalea", - )?; + "start_join_callback should not be dropped before sending a message, this is a bug in Azalea", + ); - let client = Client::new(entity, ecs_lock.clone()); - Ok(client) + Client::new(entity, ecs_lock) } /// Write a packet directly to the server. diff --git a/azalea-client/src/plugins/join.rs b/azalea-client/src/plugins/join.rs index a3447782..09eeff59 100644 --- a/azalea-client/src/plugins/join.rs +++ b/azalea-client/src/plugins/join.rs @@ -1,4 +1,4 @@ -use std::{io, net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc}; use azalea_entity::{LocalEntity, indexing::EntityUuidIndex}; use azalea_protocol::{ @@ -20,7 +20,7 @@ use tracing::{debug, warn}; use super::events::LocalPlayerEvents; use crate::{ - Account, JoinError, LocalPlayerBundle, + Account, LocalPlayerBundle, connection::RawConnection, packet::login::{InLoginState, SendLoginPacketEvent}, }; @@ -36,7 +36,6 @@ impl Plugin for JoinPlugin { ( handle_start_join_server_event.before(super::login::poll_auth_task), poll_create_connection_task, - handle_connection_failed_events, ) .chain(), ); @@ -53,7 +52,8 @@ pub struct StartJoinServerEvent { pub connect_opts: ConnectOpts, pub event_sender: Option>, - pub start_join_callback_tx: Option, + // this is mpsc instead of oneshot so it can be cloned (since it's sent in an event) + pub start_join_callback_tx: Option>, } /// Options for how the connection to the server will be made. These are @@ -79,11 +79,6 @@ pub struct ConnectionFailedEvent { pub error: ConnectionError, } -// this is mpsc instead of oneshot so it can be cloned (since it's sent in an -// event) -#[derive(Component, Debug, Clone)] -pub struct StartJoinCallback(pub mpsc::UnboundedSender>); - pub fn handle_start_join_server_event( mut commands: Commands, mut events: EventReader, @@ -103,7 +98,7 @@ pub fn handle_start_join_server_event( warn!( "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event but replying with Ok." ); - let _ = start_join_callback_tx.0.send(Ok(entity)); + let _ = start_join_callback_tx.send(entity); } else { warn!( "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event." @@ -121,6 +116,10 @@ pub fn handle_start_join_server_event( entity }; + if let Some(start_join_callback) = &event.start_join_callback_tx { + let _ = start_join_callback.send(entity); + } + let mut entity_mut = commands.entity(entity); entity_mut.insert(( @@ -141,9 +140,6 @@ pub fn handle_start_join_server_event( // handle receiving packets entity_mut.insert(LocalPlayerEvents(event_sender.clone())); } - if let Some(start_join_callback) = &event.start_join_callback_tx { - entity_mut.insert(start_join_callback.clone()); - } let task_pool = IoTaskPool::get(); let connect_opts = event.connect_opts.clone(); @@ -184,15 +180,10 @@ pub struct CreateConnectionTask(pub Task>); pub fn poll_create_connection_task( mut commands: Commands, - mut query: Query<( - Entity, - &mut CreateConnectionTask, - &Account, - Option<&StartJoinCallback>, - )>, + mut query: Query<(Entity, &mut CreateConnectionTask, &Account)>, mut connection_failed_events: EventWriter, ) { - for (entity, mut task, account, mut start_join_callback) in query.iter_mut() { + for (entity, mut task, account) in query.iter_mut() { if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) { let mut entity_mut = commands.entity(entity); entity_mut.remove::(); @@ -238,29 +229,6 @@ pub fn poll_create_connection_task( profile_id: account.uuid_or_offline(), }, )); - - if let Some(cb) = start_join_callback.take() { - let _ = cb.0.send(Ok(entity)); - } } } } - -pub fn handle_connection_failed_events( - mut events: EventReader, - query: Query<&StartJoinCallback>, -) { - for event in events.read() { - let Ok(start_join_callback) = query.get(event.entity) else { - // the StartJoinCallback isn't required to be present, so this is fine - continue; - }; - - // io::Error isn't clonable, so we create a new one based on the `kind` and - // `to_string`, - let ConnectionError::Io(err) = &event.error; - let cloned_err = ConnectionError::Io(io::Error::new(err.kind(), err.to_string())); - - let _ = start_join_callback.0.send(Err(cloned_err.into())); - } -} diff --git a/azalea-client/src/plugins/login.rs b/azalea-client/src/plugins/login.rs index ebba5905..5cb2bce7 100644 --- a/azalea-client/src/plugins/login.rs +++ b/azalea-client/src/plugins/login.rs @@ -5,13 +5,14 @@ use azalea_protocol::packets::login::{ use bevy_app::prelude::*; use bevy_ecs::prelude::*; use bevy_tasks::{IoTaskPool, Task, futures_lite::future}; +use thiserror::Error; use tracing::{debug, error, trace}; use super::{ connection::RawConnection, packet::login::{ReceiveCustomQueryEvent, ReceiveHelloEvent, SendLoginPacketEvent}, }; -use crate::{Account, JoinError}; +use crate::Account; /// Some systems that run during the `login` state. pub struct LoginPlugin; @@ -73,14 +74,24 @@ pub fn poll_auth_task( type PrivateKey = [u8; 16]; #[derive(Component)] -pub struct AuthTask(Task>); +pub struct AuthTask(Task>); + +#[derive(Debug, Error)] +pub enum AuthWithAccountError { + #[error("Failed to encrypt the challenge from the server for {0:?}")] + Encryption(ClientboundHello), + #[error("{0}")] + SessionServer(#[from] ClientSessionServerError), + #[error("Couldn't refresh access token: {0}")] + Auth(#[from] azalea_auth::AuthError), +} pub async fn auth_with_account( account: Account, packet: ClientboundHello, -) -> Result<(ServerboundKey, PrivateKey), JoinError> { +) -> Result<(ServerboundKey, PrivateKey), AuthWithAccountError> { let Ok(encrypt_res) = azalea_crypto::encrypt(&packet.public_key, &packet.challenge) else { - return Err(JoinError::EncryptionError(packet)); + return Err(AuthWithAccountError::Encryption(packet)); }; let key_packet = ServerboundKey { key_bytes: encrypt_res.encrypted_public_key, diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs index f4fb6f5d..35007b9e 100644 --- a/azalea/src/swarm/mod.rs +++ b/azalea/src/swarm/mod.rs @@ -687,7 +687,7 @@ impl Swarm { /// /// # Errors /// - /// Returns an `Err` if the bot could not do a handshake successfully. + /// Returns an error if the server's address could not be resolved. pub async fn add( &self, account: &Account, @@ -702,7 +702,7 @@ impl Swarm { /// /// # Errors /// - /// Returns an `Err` if the bot could not do a handshake successfully. + /// Returns an error if the server's address could not be resolved. pub async fn add_with_opts( &self, account: &Account, @@ -725,7 +725,7 @@ impl Swarm { let (tx, rx) = mpsc::unbounded_channel(); - let bot = Client::start_client(StartClientOpts { + let client = Client::start_client(StartClientOpts { ecs_lock: self.ecs_lock.clone(), account: account.clone(), connect_opts: ConnectOpts { @@ -735,14 +735,14 @@ impl Swarm { }, event_sender: Some(tx), }) - .await?; + .await; // add the state to the client { let mut ecs = self.ecs_lock.lock(); - ecs.entity_mut(bot.entity).insert(state); + ecs.entity_mut(client.entity).insert(state); } - let cloned_bot = bot.clone(); + let cloned_bot = client.clone(); let swarm_tx = self.swarm_tx.clone(); let bots_tx = self.bots_tx.clone(); @@ -751,7 +751,7 @@ impl Swarm { rx, swarm_tx, bots_tx, cloned_bot, join_opts, )); - Ok(bot) + Ok(client) } /// Copy the events from a client's receiver into bots_tx, until the bot is @@ -767,7 +767,9 @@ impl Swarm { if rx.len() > 1_000 { static WARNED_1_000: AtomicBool = AtomicBool::new(false); if !WARNED_1_000.swap(true, atomic::Ordering::Relaxed) { - warn!("the client's Event channel has more than 1000 items!") + warn!( + "the client's Event channel has more than 1,000 items! this is probably fine but if you're concerned about it, maybe consider disabling the packet-event feature in azalea to reduce the number of events?" + ) } if rx.len() > 10_000 { @@ -786,7 +788,7 @@ impl Swarm { static WARNED_1_000_000: AtomicBool = AtomicBool::new(false); if !WARNED_1_000_000.swap(true, atomic::Ordering::Relaxed) { warn!( - "the client's Event channel has more than 1,000,000 items!!!! i sincerely hope no one ever sees this warning" + "the client's Event channel has more than 1,000,000 items!!!! your code is almost certainly leaking memory" ) } } @@ -860,18 +862,7 @@ impl Swarm { .min(Duration::from_secs(15)); let username = account.username.clone(); - match &e { - JoinError::Disconnect { reason } => { - error!( - "Error joining as {username}, server says: \"{reason}\". Waiting {delay:?} and trying again." - ); - } - _ => { - error!( - "Error joining as {username}: {e}. Waiting {delay:?} and trying again." - ); - } - } + error!("Error joining as {username}: {e}. Waiting {delay:?} and trying again."); sleep(delay).await; }