diff --git a/CHANGELOG.md b/CHANGELOG.md index f71d6b49..68702b49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ write down most non-trivial breaking changes. - azalea and azalea-client now have a `packet-event` feature, which can be disabled for efficiency if you're not using `Event::Packet`. - `StartJoinServerEvent` can now be used to join servers exclusively from the ECS without a Tokio runtime. - `FormattedText::to_html` and `FormattedText::to_custom_format`. +- Add auto-reconnecting which is enabled by default. ### Changed diff --git a/azalea-chat/src/lib.rs b/azalea-chat/src/lib.rs index faa54d70..f01d8835 100644 --- a/azalea-chat/src/lib.rs +++ b/azalea-chat/src/lib.rs @@ -8,4 +8,4 @@ pub mod style; pub mod text_component; pub mod translatable_component; -pub use component::{FormattedText, DEFAULT_STYLE}; +pub use component::{DEFAULT_STYLE, FormattedText}; diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs index 2ebf44b5..dc9a3d3e 100644 --- a/azalea-client/src/client.rs +++ b/azalea-client/src/client.rs @@ -54,7 +54,7 @@ use crate::{ events::Event, interact::CurrentSequenceNumber, inventory::Inventory, - join::{StartJoinCallback, StartJoinServerEvent}, + join::{ConnectOpts, StartJoinCallback, StartJoinServerEvent}, local_player::{ GameProfileComponent, Hunger, InstanceHolder, PermissionLevel, PlayerAbilities, TabList, }, @@ -107,22 +107,20 @@ pub enum JoinError { Disconnect { reason: FormattedText }, } -pub struct StartClientOpts<'a> { +pub struct StartClientOpts { pub ecs_lock: Arc>, - pub account: &'a Account, - pub address: &'a ServerAddress, - pub resolved_address: &'a SocketAddr, - pub proxy: Option, + pub account: Account, + pub connect_opts: ConnectOpts, pub event_sender: Option>, } -impl<'a> StartClientOpts<'a> { +impl StartClientOpts { pub fn new( - account: &'a Account, - address: &'a ServerAddress, - resolved_address: &'a SocketAddr, + account: Account, + address: ServerAddress, + resolved_address: SocketAddr, event_sender: Option>, - ) -> StartClientOpts<'a> { + ) -> StartClientOpts { let mut app = App::new(); app.add_plugins(DefaultPlugins); @@ -132,15 +130,17 @@ impl<'a> StartClientOpts<'a> { Self { ecs_lock, account, - address, - resolved_address, - proxy: None, + connect_opts: ConnectOpts { + address, + resolved_address, + proxy: None, + }, event_sender, } } pub fn proxy(mut self, proxy: Proxy) -> Self { - self.proxy = Some(proxy); + self.connect_opts.proxy = Some(proxy); self } } @@ -173,14 +173,14 @@ impl Client { /// #[tokio::main] /// async fn main() -> Result<(), Box> { /// let account = Account::offline("bot"); - /// let (client, rx) = Client::join(&account, "localhost").await?; + /// let (client, rx) = Client::join(account, "localhost").await?; /// client.chat("Hello, world!"); /// client.disconnect(); /// Ok(()) /// } /// ``` pub async fn join( - account: &Account, + account: Account, address: impl TryInto, ) -> Result<(Self, mpsc::UnboundedReceiver), JoinError> { let address: ServerAddress = address.try_into().map_err(|_| JoinError::InvalidAddress)?; @@ -189,8 +189,8 @@ impl Client { let client = Self::start_client(StartClientOpts::new( account, - &address, - &resolved_address, + address, + resolved_address, Some(tx), )) .await?; @@ -198,7 +198,7 @@ impl Client { } pub async fn join_with_proxy( - account: &Account, + account: Account, address: impl TryInto, proxy: Proxy, ) -> Result<(Self, mpsc::UnboundedReceiver), JoinError> { @@ -207,7 +207,7 @@ impl Client { let (tx, rx) = mpsc::unbounded_channel(); let client = Self::start_client( - StartClientOpts::new(account, &address, &resolved_address, Some(tx)).proxy(proxy), + StartClientOpts::new(account, address, resolved_address, Some(tx)).proxy(proxy), ) .await?; Ok((client, rx)) @@ -219,28 +219,21 @@ impl Client { StartClientOpts { ecs_lock, account, - address, - resolved_address, - proxy, + connect_opts, event_sender, - }: StartClientOpts<'_>, + }: StartClientOpts, ) -> Result { // send a StartJoinServerEvent let (start_join_callback_tx, mut start_join_callback_rx) = mpsc::unbounded_channel::>(); - { - let mut ecs = ecs_lock.lock(); - ecs.send_event(StartJoinServerEvent { - account: account.clone(), - address: address.clone(), - resolved_address: *resolved_address, - proxy, - event_sender: event_sender.clone(), - start_join_callback_tx: Some(StartJoinCallback(start_join_callback_tx)), - }); - } + ecs_lock.lock().send_event(StartJoinServerEvent { + account, + connect_opts, + event_sender, + start_join_callback_tx: Some(StartJoinCallback(start_join_callback_tx)), + }); let entity = start_join_callback_rx.recv().await.expect( "StartJoinCallback should not be dropped before sending a message, this is a bug in Azalea", diff --git a/azalea-client/src/plugins/auto_reconnect.rs b/azalea-client/src/plugins/auto_reconnect.rs new file mode 100644 index 00000000..280aaa65 --- /dev/null +++ b/azalea-client/src/plugins/auto_reconnect.rs @@ -0,0 +1,138 @@ +//! Auto-reconnect to the server when the client is kicked. +//! +//! See [`AutoReconnectPlugin`] for more information. + +use std::time::{Duration, Instant}; + +use bevy_app::prelude::*; +use bevy_ecs::prelude::*; + +use super::{ + disconnect::DisconnectEvent, + events::LocalPlayerEvents, + join::{ConnectOpts, ConnectionFailedEvent, StartJoinServerEvent}, +}; +use crate::Account; + +/// The default delay that Azalea will use for reconnecting our clients. See +/// [`AutoReconnectPlugin`] for more information. +pub const DEFAULT_RECONNECT_DELAY: Duration = Duration::from_secs(5); + +/// A default plugin that makes clients automatically rejoin the server when +/// they're disconnected. The reconnect delay is configurable globally or +/// per-client with the [`AutoReconnectDelay`] resource/component. Auto +/// reconnecting can be disabled by removing the resource from the ECS. +/// +/// The delay defaults to [`DEFAULT_RECONNECT_DELAY`]. +pub struct AutoReconnectPlugin; +impl Plugin for AutoReconnectPlugin { + fn build(&self, app: &mut App) { + app.insert_resource(AutoReconnectDelay::new(DEFAULT_RECONNECT_DELAY)) + .add_systems( + Update, + (start_rejoin_on_disconnect, rejoin_after_delay) + .chain() + .before(super::join::handle_start_join_server_event), + ); + } +} + +pub fn start_rejoin_on_disconnect( + mut commands: Commands, + mut disconnect_events: EventReader, + mut connection_failed_events: EventReader, + auto_reconnect_delay_res: Option>, + auto_reconnect_delay_query: Query<&AutoReconnectDelay>, +) { + for entity in disconnect_events + .read() + .map(|e| e.entity) + .chain(connection_failed_events.read().map(|e| e.entity)) + { + let Some(delay) = get_delay( + &auto_reconnect_delay_res, + auto_reconnect_delay_query, + entity, + ) else { + // no auto reconnect + continue; + }; + + let reconnect_after = Instant::now() + delay; + commands.entity(entity).insert(InternalReconnectAfter { + instant: reconnect_after, + }); + } +} + +fn get_delay( + auto_reconnect_delay_res: &Option>, + auto_reconnect_delay_query: Query<&AutoReconnectDelay>, + entity: Entity, +) -> Option { + if let Ok(c) = auto_reconnect_delay_query.get(entity) { + Some(c.delay) + } else if let Some(r) = &auto_reconnect_delay_res { + Some(r.delay) + } else { + None + } +} + +pub fn rejoin_after_delay( + mut commands: Commands, + mut join_events: EventWriter, + query: Query<( + Entity, + &InternalReconnectAfter, + &Account, + &ConnectOpts, + Option<&LocalPlayerEvents>, + )>, +) { + for (entity, reconnect_after, account, connect_opts, local_player_events) in query.iter() { + if Instant::now() >= reconnect_after.instant { + // don't keep trying to reconnect + commands.entity(entity).remove::(); + + // our Entity will be reused since the account has the same uuid + join_events.write(StartJoinServerEvent { + account: account.clone(), + connect_opts: connect_opts.clone(), + // not actually necessary since we're reusing the same entity and LocalPlayerEvents + // isn't removed, but this is more readable and just in case it's changed in the + // future + event_sender: local_player_events.map(|e| e.0.clone()), + start_join_callback_tx: None, + }); + } + } +} + +/// A resource *and* component that indicates how long to wait before +/// reconnecting when we're kicked. +/// +/// Initially, it's a resource in the ECS set to 5 seconds. You can modify +/// the resource to update the global reconnect delay, or insert it as a +/// component to set the individual delay for a single client. +/// +/// You can also remove this resource from the ECS to disable the default +/// auto-reconnecting behavior. Inserting the resource/component again will not +/// make clients that were already disconnected automatically reconnect. +#[derive(Resource, Component, Debug, Clone)] +pub struct AutoReconnectDelay { + pub delay: Duration, +} +impl AutoReconnectDelay { + pub fn new(delay: Duration) -> Self { + Self { delay } + } +} + +/// This is inserted when we're disconnected and indicates when we'll reconnect. +/// +/// This is set based on [`AutoReconnectDelay`]. +#[derive(Component, Debug, Clone)] +pub struct InternalReconnectAfter { + pub instant: Instant, +} diff --git a/azalea-client/src/plugins/disconnect.rs b/azalea-client/src/plugins/disconnect.rs index 343c25d8..987007c2 100644 --- a/azalea-client/src/plugins/disconnect.rs +++ b/azalea-client/src/plugins/disconnect.rs @@ -7,10 +7,7 @@ use bevy_ecs::prelude::*; use derive_more::Deref; use tracing::info; -use crate::{ - InstanceHolder, client::JoinedClientBundle, connection::RawConnection, - events::LocalPlayerEvents, -}; +use crate::{InstanceHolder, client::JoinedClientBundle, connection::RawConnection}; pub struct DisconnectPlugin; impl Plugin for DisconnectPlugin { @@ -19,15 +16,28 @@ impl Plugin for DisconnectPlugin { PostUpdate, ( update_read_packets_task_running_component, - disconnect_on_connection_dead, remove_components_from_disconnected_players, + // this happens after `remove_components_from_disconnected_players` since that + // system removes `IsConnectionAlive`, which ensures that + // `DisconnectEvent` won't get called again from + // `disconnect_on_connection_dead` + disconnect_on_connection_dead, ) .chain(), ); } } -/// An event sent when a client is getting disconnected. +/// An event sent when a client got disconnected from the server. +/// +/// If the client was kicked with a reason, that reason will be present in the +/// [`reason`](DisconnectEvent::reason) field. +/// +/// This event won't be sent if creating the initial connection to the server +/// failed, for that see [`ConnectionFailedEvent`]. +/// +/// [`ConnectionFailedEvent`]: crate::join::ConnectionFailedEvent + #[derive(Event)] pub struct DisconnectEvent { pub entity: Entity, @@ -59,8 +69,8 @@ pub fn remove_components_from_disconnected_players( .remove::() // this makes it close the tcp connection .remove::() - // swarm detects when this tx gets dropped to fire SwarmEvent::Disconnect - .remove::(); + // this makes it not send DisconnectEvent again + .remove::(); // note that we don't remove the client from the ECS, so if they decide // to reconnect they'll keep their state diff --git a/azalea-client/src/plugins/join.rs b/azalea-client/src/plugins/join.rs index 3f47d90c..e31c64c4 100644 --- a/azalea-client/src/plugins/join.rs +++ b/azalea-client/src/plugins/join.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, sync::Arc}; +use std::{io, net::SocketAddr, sync::Arc}; use azalea_entity::{LocalEntity, indexing::EntityUuidIndex}; use azalea_protocol::{ @@ -29,24 +29,56 @@ use crate::{ pub struct JoinPlugin; impl Plugin for JoinPlugin { fn build(&self, app: &mut App) { - app.add_event::().add_systems( - Update, - (handle_start_join_server_event, poll_create_connection_task), - ); + app.add_event::() + .add_event::() + .add_systems( + Update, + ( + handle_start_join_server_event.before(super::login::poll_auth_task), + poll_create_connection_task, + handle_connection_failed_events, + ) + .chain(), + ); } } +/// An event to make a client join the server and be added to our swarm. +/// +/// This won't do anything if a client with the Account UUID is already +/// connected to the server. #[derive(Event, Debug)] pub struct StartJoinServerEvent { pub account: Account, - pub address: ServerAddress, - pub resolved_address: SocketAddr, - pub proxy: Option, + pub connect_opts: ConnectOpts, pub event_sender: Option>, pub start_join_callback_tx: Option, } +/// Options for how the connection to the server will be made. These are +/// persisted on reconnects. +/// +/// This is inserted as a component on clients to make auto-reconnecting work. +#[derive(Debug, Clone, Component)] +pub struct ConnectOpts { + pub address: ServerAddress, + pub resolved_address: SocketAddr, + pub proxy: Option, +} + +/// An event that's sent when creating the TCP connection and sending the first +/// packet fails. +/// +/// This isn't sent if we're kicked later, see [`DisconnectEvent`]. +/// +/// [`DisconnectEvent`]: crate::disconnect::DisconnectEvent +#[derive(Event)] +pub struct ConnectionFailedEvent { + pub entity: Entity, + pub error: ConnectionError, +} + // this is mpsc instead of oneshot so it can be cloned (since it's sent in an // event) #[derive(Component, Debug, Clone)] @@ -56,11 +88,30 @@ pub fn handle_start_join_server_event( mut commands: Commands, mut events: EventReader, mut entity_uuid_index: ResMut, + connection_query: Query<&RawConnection>, ) { for event in events.read() { let uuid = event.account.uuid_or_offline(); let entity = if let Some(entity) = entity_uuid_index.get(&uuid) { debug!("Reusing entity {entity:?} for client"); + + // check if it's already connected + if let Ok(conn) = connection_query.get(entity) { + if conn.is_alive() { + if let Some(start_join_callback_tx) = &event.start_join_callback_tx { + warn!( + "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event but replying with Ok." + ); + let _ = start_join_callback_tx.0.send(Ok(entity)); + } else { + warn!( + "Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event." + ); + } + return; + } + } + entity } else { let entity = commands.spawn_empty().id(); @@ -71,12 +122,15 @@ pub fn handle_start_join_server_event( }; let mut entity_mut = commands.entity(entity); + entity_mut.insert(( // add the Account to the entity now so plugins can access it earlier event.account.to_owned(), // localentity is always present for our clients, even if we're not actually logged // in LocalEntity, + // ConnectOpts is inserted as a component here + event.connect_opts.clone(), // we don't insert InLoginState until we actually create the connection. note that // there's no InHandshakeState component since we switch off of the handshake state // immediately when the connection is created @@ -92,11 +146,9 @@ pub fn handle_start_join_server_event( } let task_pool = IoTaskPool::get(); - let resolved_addr = event.resolved_address; - let address = event.address.clone(); - let proxy = event.proxy.clone(); + let connect_opts = event.connect_opts.clone(); let task = task_pool.spawn(async_compat::Compat::new( - create_conn_and_send_intention_packet(resolved_addr, address, proxy), + create_conn_and_send_intention_packet(connect_opts), )); entity_mut.insert(CreateConnectionTask(task)); @@ -104,20 +156,18 @@ pub fn handle_start_join_server_event( } async fn create_conn_and_send_intention_packet( - resolved_addr: SocketAddr, - address: ServerAddress, - proxy: Option, + opts: ConnectOpts, ) -> Result { - let mut conn = if let Some(proxy) = proxy { - Connection::new_with_proxy(&resolved_addr, proxy).await? + let mut conn = if let Some(proxy) = opts.proxy { + Connection::new_with_proxy(&opts.resolved_address, proxy).await? } else { - Connection::new(&resolved_addr).await? + Connection::new(&opts.resolved_address).await? }; conn.write(ServerboundIntention { protocol_version: PROTOCOL_VERSION, - hostname: address.host.clone(), - port: address.port, + hostname: opts.address.host.clone(), + port: opts.address.port, intention: ClientIntention::Login, }) .await?; @@ -140,6 +190,7 @@ pub fn poll_create_connection_task( &Account, Option<&StartJoinCallback>, )>, + mut connection_failed_events: EventWriter, ) { for (entity, mut task, account, mut start_join_callback) in query.iter_mut() { if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) { @@ -147,11 +198,9 @@ pub fn poll_create_connection_task( entity_mut.remove::(); let conn = match poll_res { Ok(conn) => conn, - Err(err) => { - warn!("failed to create connection: {err}"); - if let Some(cb) = start_join_callback.take() { - let _ = cb.0.send(Err(err.into())); - } + Err(error) => { + warn!("failed to create connection: {error}"); + connection_failed_events.write(ConnectionFailedEvent { entity, error }); return; } }; @@ -196,3 +245,22 @@ pub fn poll_create_connection_task( } } } + +pub fn handle_connection_failed_events( + mut events: EventReader, + query: Query<&StartJoinCallback>, +) { + for event in events.read() { + let Ok(start_join_callback) = query.get(event.entity) else { + // the StartJoinCallback isn't required to be present, so this is fine + continue; + }; + + // io::Error isn't clonable, so we create a new one based on the `kind` and + // `to_string`, + let ConnectionError::Io(err) = &event.error; + let cloned_err = ConnectionError::Io(io::Error::new(err.kind(), err.to_string())); + + let _ = start_join_callback.0.send(Err(cloned_err.into())); + } +} diff --git a/azalea-client/src/plugins/login.rs b/azalea-client/src/plugins/login.rs index 9a871ac3..357769e9 100644 --- a/azalea-client/src/plugins/login.rs +++ b/azalea-client/src/plugins/login.rs @@ -33,7 +33,7 @@ fn handle_receive_hello_event(trigger: Trigger, mut commands: commands.entity(player).insert(AuthTask(task)); } -fn poll_auth_task( +pub fn poll_auth_task( mut commands: Commands, mut query: Query<(Entity, &mut AuthTask, &mut RawConnection)>, ) { diff --git a/azalea-client/src/plugins/mod.rs b/azalea-client/src/plugins/mod.rs index 431d59b2..f657b9e9 100644 --- a/azalea-client/src/plugins/mod.rs +++ b/azalea-client/src/plugins/mod.rs @@ -1,6 +1,7 @@ use bevy_app::{PluginGroup, PluginGroupBuilder}; pub mod attack; +pub mod auto_reconnect; pub mod brand; pub mod chat; pub mod chunks; @@ -51,7 +52,8 @@ impl PluginGroup for DefaultPlugins { .add(pong::PongPlugin) .add(connection::ConnectionPlugin) .add(login::LoginPlugin) - .add(join::JoinPlugin); + .add(join::JoinPlugin) + .add(auto_reconnect::AutoReconnectPlugin); #[cfg(feature = "log")] { group = group.add(bevy_log::LogPlugin::default()); diff --git a/azalea/examples/testbot/main.rs b/azalea/examples/testbot/main.rs index c25904cf..6733d797 100644 --- a/azalea/examples/testbot/main.rs +++ b/azalea/examples/testbot/main.rs @@ -192,14 +192,10 @@ async fn handle(bot: Client, event: azalea::Event, state: State) -> anyhow::Resu Ok(()) } -async fn swarm_handle(swarm: Swarm, event: SwarmEvent, _state: SwarmState) -> anyhow::Result<()> { +async fn swarm_handle(_swarm: Swarm, event: SwarmEvent, _state: SwarmState) -> anyhow::Result<()> { match &event { - SwarmEvent::Disconnect(account, join_opts) => { + SwarmEvent::Disconnect(account, _join_opts) => { println!("bot got kicked! {}", account.username); - tokio::time::sleep(Duration::from_secs(5)).await; - swarm - .add_and_retry_forever_with_opts(account, State::default(), join_opts) - .await; } SwarmEvent::Chat(chat) => { if chat.message().to_string() == "The particle was not visible for anybody" { diff --git a/azalea/src/lib.rs b/azalea/src/lib.rs index d63ea6c3..3f388e42 100644 --- a/azalea/src/lib.rs +++ b/azalea/src/lib.rs @@ -14,6 +14,7 @@ pub mod prelude; pub mod swarm; use std::net::SocketAddr; +use std::time::Duration; use app::Plugins; pub use azalea_auth as auth; @@ -126,7 +127,12 @@ impl ClientBuilder { /// Set the function that's called every time a bot receives an [`Event`]. /// This is the way to handle normal per-bot events. /// - /// Currently you can have up to one client handler. + /// Currently, you can have up to one client handler. + /// + /// Note that if you're creating clients directly from the ECS using + /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then + /// the handler function won't be called for that client. This shouldn't be + /// a concern for most bots, though. /// /// ``` /// # use azalea::prelude::*; @@ -139,6 +145,8 @@ impl ClientBuilder { /// Ok(()) /// } /// ``` + /// + /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent #[must_use] pub fn set_handler(self, handler: HandleFn) -> ClientBuilder where @@ -169,6 +177,22 @@ where self } + /// Configures the auto-reconnection behavior for our bot. + /// + /// If this is `Some`, then it'll set the default reconnection delay for our + /// bot (how long it'll wait after being kicked before it tries + /// rejoining). if it's `None`, then auto-reconnecting will be disabled. + /// + /// If this function isn't called, then our client will reconnect after + /// [`DEFAULT_RECONNECT_DELAY`]. + /// + /// [`DEFAULT_RECONNECT_DELAY`]: azalea_client::auto_reconnect::DEFAULT_RECONNECT_DELAY + #[must_use] + pub fn reconnect_after(mut self, delay: impl Into>) -> Self { + self.swarm.reconnect_after = delay.into(); + self + } + /// Build this `ClientBuilder` into an actual [`Client`] and join the given /// server. If the client can't join, it'll keep retrying forever until it /// can. diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs index 4fd5120a..57a12608 100644 --- a/azalea/src/swarm/mod.rs +++ b/azalea/src/swarm/mod.rs @@ -19,9 +19,13 @@ use std::{ }; use azalea_client::{ - Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts, chat::ChatPacket, + Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts, + auto_reconnect::{AutoReconnectDelay, DEFAULT_RECONNECT_DELAY}, + chat::ChatPacket, + join::ConnectOpts, start_ecs_runner, }; +use azalea_entity::LocalEntity; use azalea_protocol::{ServerAddress, resolver}; use azalea_world::InstanceContainer; use bevy_app::{App, PluginGroup, PluginGroupBuilder, Plugins, SubApp}; @@ -46,15 +50,15 @@ use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartEr pub struct Swarm { pub ecs_lock: Arc>, - bots: Arc>>, - // the address is public and mutable so plugins can change it pub resolved_address: Arc>, pub address: Arc>, pub instance_container: Arc>, + /// This is used internally to make the client handler function work. bots_tx: mpsc::UnboundedSender<(Option, Client)>, + /// This is used internally to make the swarm handler function work. swarm_tx: mpsc::UnboundedSender, } @@ -92,7 +96,11 @@ where /// None to have every bot connect at the same time. None is different than /// a duration of 0, since if a duration is present the bots will wait for /// the previous one to be ready. - pub(crate) join_delay: Option, + pub(crate) join_delay: Option, + + /// The default reconnection delay for our bots. This will change the value + /// of the `AutoReconnectDelay` resource. + pub(crate) reconnect_after: Option, } impl SwarmBuilder { /// Start creating the swarm. @@ -144,6 +152,7 @@ impl SwarmBuilder { handler: None, swarm_handler: None, join_delay: None, + reconnect_after: Some(DEFAULT_RECONNECT_DELAY), } } } @@ -157,6 +166,12 @@ where /// /// Currently you can have up to one handler. /// + /// Note that if you're creating clients directly from the ECS using + /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then + /// the handler function won't be called for that client. This also applies + /// to [`SwarmBuilder::set_swarm_handler`]. This shouldn't be a concern for + /// most bots, though. + /// /// ``` /// # use azalea::{prelude::*, swarm::prelude::*}; /// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle); @@ -178,6 +193,8 @@ where /// # Ok(()) /// # } /// ``` + /// + /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent #[must_use] pub fn set_handler(self, handler: HandleFn) -> SwarmBuilder where @@ -205,6 +222,12 @@ where /// /// Currently you can have up to one swarm handler. /// + /// Note that if you're creating clients directly from the ECS using + /// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then + /// this handler function won't be called for that client. This also applies + /// to [`SwarmBuilder::set_handler`]. This shouldn't be a concern for + /// most bots, though. + /// /// ``` /// # use azalea::{prelude::*, swarm::prelude::*}; /// # let swarm_builder = SwarmBuilder::new().set_handler(handle); @@ -227,6 +250,8 @@ where /// Ok(()) /// } /// ``` + /// + /// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent #[must_use] pub fn set_swarm_handler( self, @@ -246,6 +271,7 @@ where Box::pin(handler(swarm, event, state)) })), join_delay: self.join_delay, + reconnect_after: self.reconnect_after, } } } @@ -341,11 +367,25 @@ where /// field, however, the bots will wait for the previous one to have /// connected and *then* they'll wait the given duration. #[must_use] - pub fn join_delay(mut self, delay: std::time::Duration) -> Self { + pub fn join_delay(mut self, delay: Duration) -> Self { self.join_delay = Some(delay); self } + /// Configures the auto-reconnection behavior for our bots. + /// + /// If this is `Some`, then it'll set the default reconnection delay for our + /// bots (how long they'll wait after being kicked before they try + /// rejoining). if it's `None`, then auto-reconnecting will be disabled. + /// + /// If this function isn't called, then our clients will reconnect after + /// [`DEFAULT_RECONNECT_DELAY`]. + #[must_use] + pub fn reconnect_after(mut self, delay: impl Into>) -> Self { + self.reconnect_after = delay.into(); + self + } + /// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given /// server. /// @@ -406,7 +446,6 @@ where let swarm = Swarm { ecs_lock: ecs_lock.clone(), - bots: Arc::new(Mutex::new(HashMap::new())), resolved_address: Arc::new(RwLock::new(resolved_address)), address: Arc::new(RwLock::new(address)), @@ -422,6 +461,13 @@ where let mut ecs = ecs_lock.lock(); ecs.insert_resource(swarm.clone()); ecs.insert_resource(self.swarm_state.clone()); + if let Some(reconnect_after) = self.reconnect_after { + ecs.insert_resource(AutoReconnectDelay { + delay: reconnect_after, + }); + } else { + ecs.remove_resource::(); + } ecs.run_schedule(main_schedule_label); ecs.clear_trackers(); } @@ -556,8 +602,12 @@ pub enum SwarmEvent { Init, /// A bot got disconnected from the server. /// - /// You can implement an auto-reconnect by calling [`Swarm::add_with_opts`] - /// with the account and options from this event. + /// If you'd like to implement special auto-reconnect behavior beyond what's + /// built-in, you can disable that with [`SwarmBuilder::reconnect_delay`] + /// and then call [`Swarm::add_with_opts`] with the account and options + /// from this event. + /// + /// [`SwarmBuilder::reconnect_delay`]: crate::swarm::SwarmBuilder::reconnect_after Disconnect(Box, JoinOpts), /// At least one bot received a chat message. Chat(ChatPacket), @@ -664,15 +714,18 @@ impl Swarm { let resolved_address = join_opts .custom_resolved_address .unwrap_or_else(|| *self.resolved_address.read()); + let proxy = join_opts.proxy.clone(); let (tx, rx) = mpsc::unbounded_channel(); let bot = Client::start_client(StartClientOpts { ecs_lock: self.ecs_lock.clone(), - account, - address: &address, - resolved_address: &resolved_address, - proxy: join_opts.proxy.clone(), + account: account.clone(), + connect_opts: ConnectOpts { + address, + resolved_address, + proxy, + }, event_sender: Some(tx), }) .await?; @@ -682,31 +735,25 @@ impl Swarm { ecs.entity_mut(bot.entity).insert(state); } - self.bots.lock().insert(bot.entity, bot.clone()); - - let cloned_bots = self.bots.clone(); - let cloned_bots_tx = self.bots_tx.clone(); let cloned_bot = bot.clone(); let swarm_tx = self.swarm_tx.clone(); + let bots_tx = self.bots_tx.clone(); + let join_opts = join_opts.clone(); tokio::spawn(Self::event_copying_task( - rx, - cloned_bots, - cloned_bots_tx, - cloned_bot, - swarm_tx, - join_opts, + rx, swarm_tx, bots_tx, cloned_bot, join_opts, )); Ok(bot) } + /// Copy the events from a client's receiver into bots_tx, until the bot is + /// removed from the ECS. async fn event_copying_task( mut rx: mpsc::UnboundedReceiver, - cloned_bots: Arc>>, - cloned_bots_tx: mpsc::UnboundedSender<(Option, Client)>, - cloned_bot: Client, swarm_tx: mpsc::UnboundedSender, + bots_tx: mpsc::UnboundedSender<(Option, Client)>, + bot: Client, join_opts: JoinOpts, ) { while let Some(event) = rx.recv().await { @@ -740,21 +787,33 @@ impl Swarm { } } + if let Event::Disconnect(_) = event { + debug!( + "sending SwarmEvent::Disconnect due to receiving an Event::Disconnect from client {}", + bot.entity + ); + let account = bot + .get_component::() + .expect("bot is missing required Account component"); + swarm_tx + .send(SwarmEvent::Disconnect(Box::new(account), join_opts.clone())) + .unwrap(); + } + // we can't handle events here (since we can't copy the handler), // they're handled above in SwarmBuilder::start - if let Err(e) = cloned_bots_tx.send((Some(event), cloned_bot.clone())) { - error!("Error sending event to swarm: {e}"); + if let Err(e) = bots_tx.send((Some(event), bot.clone())) { + error!( + "Error sending event to swarm, aborting event_copying_task for {}: {e}", + bot.entity + ); + break; } } - debug!("client sender ended, removing from cloned_bots and sending SwarmEvent::Disconnect"); - - cloned_bots.lock().remove(&cloned_bot.entity); - let account = cloned_bot - .get_component::() - .expect("bot is missing required Account component"); - swarm_tx - .send(SwarmEvent::Disconnect(Box::new(account), join_opts)) - .unwrap(); + debug!( + "client sender ended for {}, this won't trigger SwarmEvent::Disconnect unless the client already sent its own disconnect event", + bot.entity + ); } /// Add a new account to the swarm, retrying if it couldn't join. This will @@ -807,6 +866,17 @@ impl Swarm { } } } + + /// Get an array of ECS [`Entity`]s for all [`LocalEntity`]s in our world. + /// This will include clients that were disconnected without being removed + /// from the ECS. + /// + /// [`LocalEntity`]: azalea_entity::LocalEntity + pub fn client_entities(&self) -> Box<[Entity]> { + let mut ecs = self.ecs_lock.lock(); + let mut query = ecs.query_filtered::>(); + query.iter(&ecs).collect::>() + } } impl IntoIterator for Swarm { @@ -827,11 +897,12 @@ impl IntoIterator for Swarm { /// # } /// ``` fn into_iter(self) -> Self::IntoIter { - self.bots - .lock() - .clone() - .into_values() - .collect::>() + let client_entities = self.client_entities(); + + client_entities + .into_iter() + .map(|entity| Client::new(entity, self.ecs_lock.clone())) + .collect::>() .into_iter() } }