diff --git a/azalea-auth/src/game_profile.rs b/azalea-auth/src/game_profile.rs index af00712d..ebff4fce 100755 --- a/azalea-auth/src/game_profile.rs +++ b/azalea-auth/src/game_profile.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; use azalea_buf::AzBuf; use serde::{Deserialize, Serialize}; @@ -10,7 +10,7 @@ pub struct GameProfile { pub uuid: Uuid, /// The username of the player. pub name: String, - pub properties: HashMap, + pub properties: Arc>, } impl GameProfile { @@ -18,7 +18,7 @@ impl GameProfile { GameProfile { uuid, name, - properties: HashMap::new(), + properties: Arc::new(HashMap::new()), } } } @@ -38,7 +38,7 @@ impl From for GameProfile { Self { uuid: value.id, name: value.name, - properties, + properties: Arc::new(properties), } } } @@ -59,11 +59,11 @@ pub struct SerializableGameProfile { impl From for SerializableGameProfile { fn from(value: GameProfile) -> Self { let mut properties = Vec::new(); - for (key, value) in value.properties { + for (key, value) in &*value.properties { properties.push(SerializableProfilePropertyValue { - name: key, - value: value.value, - signature: value.signature, + name: key.clone(), + value: value.value.clone(), + signature: value.signature.clone(), }); } Self { @@ -114,7 +114,7 @@ mod tests { signature: Some("zxcv".to_string()), }, ); - map + map.into() }, } ); diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs index 7315f197..5863690e 100644 --- a/azalea-client/src/client.rs +++ b/azalea-client/src/client.rs @@ -334,6 +334,8 @@ impl Client { metadata: azalea_entity::metadata::PlayerMetadataBundle::default(), }, InConfigState, + // this component is never removed + LocalEntity, )); Ok((client, rx)) @@ -813,8 +815,6 @@ pub struct JoinedClientBundle { pub mining: mining::MineBundle, pub attack: attack::AttackBundle, - - pub _local_entity: LocalEntity, } /// A marker component for local players that are currently in the diff --git a/azalea-protocol/src/connect.rs b/azalea-protocol/src/connect.rs index 99a8a6b4..a4c558a1 100755 --- a/azalea-protocol/src/connect.rs +++ b/azalea-protocol/src/connect.rs @@ -243,6 +243,15 @@ where pub fn into_split(self) -> (ReadConnection, WriteConnection) { (self.reader, self.writer) } + + /// Split the reader and writer into the state-agnostic + /// [`RawReadConnection`] and [`RawWriteConnection`] types. + /// + /// This is meant to help with some types of proxies. + #[must_use] + pub fn into_split_raw(self) -> (RawReadConnection, RawWriteConnection) { + (self.reader.raw, self.writer.raw) + } } #[derive(Error, Debug)] diff --git a/azalea-protocol/src/lib.rs b/azalea-protocol/src/lib.rs index f9d29785..e0304979 100644 --- a/azalea-protocol/src/lib.rs +++ b/azalea-protocol/src/lib.rs @@ -35,7 +35,7 @@ pub mod write; /// assert_eq!(addr.host, "localhost"); /// assert_eq!(addr.port, 25565); /// ``` -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ServerAddress { pub host: String, pub port: u16, diff --git a/azalea-protocol/src/packets/game/c_player_info_update.rs b/azalea-protocol/src/packets/game/c_player_info_update.rs index eb019167..32bfe934 100644 --- a/azalea-protocol/src/packets/game/c_player_info_update.rs +++ b/azalea-protocol/src/packets/game/c_player_info_update.rs @@ -80,7 +80,7 @@ impl AzaleaRead for ClientboundPlayerInfoUpdate { if actions.add_player { let action = AddPlayerAction::azalea_read(buf)?; entry.profile.name = action.name; - entry.profile.properties = action.properties; + entry.profile.properties = action.properties.into(); } if actions.initialize_chat { let action = InitializeChatAction::azalea_read(buf)?; @@ -129,7 +129,7 @@ impl AzaleaWrite for ClientboundPlayerInfoUpdate { if self.actions.add_player { AddPlayerAction { name: entry.profile.name.clone(), - properties: entry.profile.properties.clone(), + properties: (*entry.profile.properties).clone(), } .azalea_write(buf)?; } diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs index c82bea82..a9b2512b 100644 --- a/azalea/src/swarm/mod.rs +++ b/azalea/src/swarm/mod.rs @@ -6,7 +6,16 @@ mod chat; mod events; pub mod prelude; -use std::{collections::HashMap, future::Future, net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, hash_map}, + future::Future, + net::SocketAddr, + sync::{ + Arc, + atomic::{self, AtomicBool}, + }, + time::Duration, +}; use azalea_client::{ Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts, chat::ChatPacket, @@ -19,7 +28,7 @@ use bevy_ecs::{component::Component, entity::Entity, system::Resource, world::Wo use futures::future::{BoxFuture, join_all}; use parking_lot::{Mutex, RwLock}; use tokio::sync::mpsc; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartError}; @@ -471,8 +480,21 @@ where // bot events while let Some((Some(first_event), first_bot)) = bots_rx.recv().await { + if bots_rx.len() > 1_000 { + static WARNED: AtomicBool = AtomicBool::new(false); + if !WARNED.swap(true, atomic::Ordering::Relaxed) { + warn!("the Client Event channel has more than 1000 items!") + } + } + if let Some(handler) = &self.handler { - let first_bot_state = first_bot.component::(); + let Some(first_bot_state) = first_bot.get_component::() else { + error!( + "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.", + first_bot.profile.name, first_bot.entity + ); + continue; + }; let first_bot_entity = first_bot.entity; tokio::spawn((handler)(first_bot, first_event, first_bot_state.clone())); @@ -481,9 +503,19 @@ where let mut states = HashMap::new(); states.insert(first_bot_entity, first_bot_state); while let Ok((Some(event), bot)) = bots_rx.try_recv() { - let state = states - .entry(bot.entity) - .or_insert_with(|| bot.component::().clone()); + let state = match states.entry(bot.entity) { + hash_map::Entry::Occupied(e) => e.into_mut(), + hash_map::Entry::Vacant(e) => { + let Some(state) = bot.get_component::() else { + error!( + "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.", + bot.profile.name, bot.entity + ); + continue; + }; + e.insert(state) + } + }; tokio::spawn((handler)(bot, event, state.clone())); } } @@ -610,6 +642,8 @@ impl Swarm { state: S, join_opts: &JoinOpts, ) -> Result { + debug!("add_with_opts called for account {}", account.username); + let address = join_opts .custom_address .clone() @@ -618,7 +652,7 @@ impl Swarm { .custom_resolved_address .unwrap_or_else(|| *self.resolved_address.read()); - let (bot, mut rx) = Client::start_client(StartClientOpts { + let (bot, rx) = Client::start_client(StartClientOpts { ecs_lock: self.ecs_lock.clone(), account, address: &address, @@ -640,26 +674,51 @@ impl Swarm { let cloned_bot = bot.clone(); let swarm_tx = self.swarm_tx.clone(); let join_opts = join_opts.clone(); - tokio::spawn(async move { - while let Some(event) = rx.recv().await { - // we can't handle events here (since we can't copy the handler), - // they're handled above in SwarmBuilder::start - if let Err(e) = cloned_bots_tx.send((Some(event), cloned_bot.clone())) { - error!("Error sending event to swarm: {e}"); - } - } - cloned_bots.lock().remove(&bot.entity); - let account = cloned_bot - .get_component::() - .expect("bot is missing required Account component"); - swarm_tx - .send(SwarmEvent::Disconnect(Box::new(account), join_opts)) - .unwrap(); - }); + tokio::spawn(Self::event_copying_task( + rx, + cloned_bots, + cloned_bots_tx, + cloned_bot, + swarm_tx, + join_opts, + )); Ok(bot) } + async fn event_copying_task( + mut rx: mpsc::UnboundedReceiver, + cloned_bots: Arc>>, + cloned_bots_tx: mpsc::UnboundedSender<(Option, Client)>, + cloned_bot: Client, + swarm_tx: mpsc::UnboundedSender, + join_opts: JoinOpts, + ) { + while let Some(event) = rx.recv().await { + if rx.len() > 1_000 { + static WARNED: AtomicBool = AtomicBool::new(false); + if !WARNED.swap(true, atomic::Ordering::Relaxed) { + warn!("the client's Event channel has more than 1000 items!") + } + } + + // we can't handle events here (since we can't copy the handler), + // they're handled above in SwarmBuilder::start + if let Err(e) = cloned_bots_tx.send((Some(event), cloned_bot.clone())) { + error!("Error sending event to swarm: {e}"); + } + } + debug!("client sender ended, removing from cloned_bots and sending SwarmEvent::Disconnect"); + + cloned_bots.lock().remove(&cloned_bot.entity); + let account = cloned_bot + .get_component::() + .expect("bot is missing required Account component"); + swarm_tx + .send(SwarmEvent::Disconnect(Box::new(account), join_opts)) + .unwrap(); + } + /// Add a new account to the swarm, retrying if it couldn't join. This will /// run forever until the bot joins or the task is aborted. ///