1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 14:26:04 +00:00

Add AutoReconnectPlugin (#221)

* add AutoReconnectPlugin

* merge main

* start simplifying swarm internals

* fix Swarm::into_iter, handler functions, DisconnectEvent, and add some more docs

* add ClientBuilder/SwarmBuilder::reconnect_after

* fix a doctest

* reword SwarmEvent::Disconnect doc

* better behavior when we try to join twice

* reconnect on ConnectionFailedEvent too

* autoreconnect is less breaking now
This commit is contained in:
mat 2025-05-02 15:55:58 -05:00 committed by GitHub
parent 52e34de95c
commit 9a40b65bc1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 423 additions and 120 deletions

View file

@ -17,6 +17,7 @@ write down most non-trivial breaking changes.
- azalea and azalea-client now have a `packet-event` feature, which can be disabled for efficiency if you're not using `Event::Packet`.
- `StartJoinServerEvent` can now be used to join servers exclusively from the ECS without a Tokio runtime.
- `FormattedText::to_html` and `FormattedText::to_custom_format`.
- Add auto-reconnecting which is enabled by default.
### Changed

View file

@ -8,4 +8,4 @@ pub mod style;
pub mod text_component;
pub mod translatable_component;
pub use component::{FormattedText, DEFAULT_STYLE};
pub use component::{DEFAULT_STYLE, FormattedText};

View file

@ -54,7 +54,7 @@ use crate::{
events::Event,
interact::CurrentSequenceNumber,
inventory::Inventory,
join::{StartJoinCallback, StartJoinServerEvent},
join::{ConnectOpts, StartJoinCallback, StartJoinServerEvent},
local_player::{
GameProfileComponent, Hunger, InstanceHolder, PermissionLevel, PlayerAbilities, TabList,
},
@ -107,22 +107,20 @@ pub enum JoinError {
Disconnect { reason: FormattedText },
}
pub struct StartClientOpts<'a> {
pub struct StartClientOpts {
pub ecs_lock: Arc<Mutex<World>>,
pub account: &'a Account,
pub address: &'a ServerAddress,
pub resolved_address: &'a SocketAddr,
pub proxy: Option<Proxy>,
pub account: Account,
pub connect_opts: ConnectOpts,
pub event_sender: Option<mpsc::UnboundedSender<Event>>,
}
impl<'a> StartClientOpts<'a> {
impl StartClientOpts {
pub fn new(
account: &'a Account,
address: &'a ServerAddress,
resolved_address: &'a SocketAddr,
account: Account,
address: ServerAddress,
resolved_address: SocketAddr,
event_sender: Option<mpsc::UnboundedSender<Event>>,
) -> StartClientOpts<'a> {
) -> StartClientOpts {
let mut app = App::new();
app.add_plugins(DefaultPlugins);
@ -132,15 +130,17 @@ impl<'a> StartClientOpts<'a> {
Self {
ecs_lock,
account,
address,
resolved_address,
proxy: None,
connect_opts: ConnectOpts {
address,
resolved_address,
proxy: None,
},
event_sender,
}
}
pub fn proxy(mut self, proxy: Proxy) -> Self {
self.proxy = Some(proxy);
self.connect_opts.proxy = Some(proxy);
self
}
}
@ -173,14 +173,14 @@ impl Client {
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let account = Account::offline("bot");
/// let (client, rx) = Client::join(&account, "localhost").await?;
/// let (client, rx) = Client::join(account, "localhost").await?;
/// client.chat("Hello, world!");
/// client.disconnect();
/// Ok(())
/// }
/// ```
pub async fn join(
account: &Account,
account: Account,
address: impl TryInto<ServerAddress>,
) -> Result<(Self, mpsc::UnboundedReceiver<Event>), JoinError> {
let address: ServerAddress = address.try_into().map_err(|_| JoinError::InvalidAddress)?;
@ -189,8 +189,8 @@ impl Client {
let client = Self::start_client(StartClientOpts::new(
account,
&address,
&resolved_address,
address,
resolved_address,
Some(tx),
))
.await?;
@ -198,7 +198,7 @@ impl Client {
}
pub async fn join_with_proxy(
account: &Account,
account: Account,
address: impl TryInto<ServerAddress>,
proxy: Proxy,
) -> Result<(Self, mpsc::UnboundedReceiver<Event>), JoinError> {
@ -207,7 +207,7 @@ impl Client {
let (tx, rx) = mpsc::unbounded_channel();
let client = Self::start_client(
StartClientOpts::new(account, &address, &resolved_address, Some(tx)).proxy(proxy),
StartClientOpts::new(account, address, resolved_address, Some(tx)).proxy(proxy),
)
.await?;
Ok((client, rx))
@ -219,28 +219,21 @@ impl Client {
StartClientOpts {
ecs_lock,
account,
address,
resolved_address,
proxy,
connect_opts,
event_sender,
}: StartClientOpts<'_>,
}: StartClientOpts,
) -> Result<Self, JoinError> {
// send a StartJoinServerEvent
let (start_join_callback_tx, mut start_join_callback_rx) =
mpsc::unbounded_channel::<Result<Entity, JoinError>>();
{
let mut ecs = ecs_lock.lock();
ecs.send_event(StartJoinServerEvent {
account: account.clone(),
address: address.clone(),
resolved_address: *resolved_address,
proxy,
event_sender: event_sender.clone(),
start_join_callback_tx: Some(StartJoinCallback(start_join_callback_tx)),
});
}
ecs_lock.lock().send_event(StartJoinServerEvent {
account,
connect_opts,
event_sender,
start_join_callback_tx: Some(StartJoinCallback(start_join_callback_tx)),
});
let entity = start_join_callback_rx.recv().await.expect(
"StartJoinCallback should not be dropped before sending a message, this is a bug in Azalea",

View file

@ -0,0 +1,138 @@
//! Auto-reconnect to the server when the client is kicked.
//!
//! See [`AutoReconnectPlugin`] for more information.
use std::time::{Duration, Instant};
use bevy_app::prelude::*;
use bevy_ecs::prelude::*;
use super::{
disconnect::DisconnectEvent,
events::LocalPlayerEvents,
join::{ConnectOpts, ConnectionFailedEvent, StartJoinServerEvent},
};
use crate::Account;
/// The default delay that Azalea will use for reconnecting our clients. See
/// [`AutoReconnectPlugin`] for more information.
pub const DEFAULT_RECONNECT_DELAY: Duration = Duration::from_secs(5);
/// A default plugin that makes clients automatically rejoin the server when
/// they're disconnected. The reconnect delay is configurable globally or
/// per-client with the [`AutoReconnectDelay`] resource/component. Auto
/// reconnecting can be disabled by removing the resource from the ECS.
///
/// The delay defaults to [`DEFAULT_RECONNECT_DELAY`].
pub struct AutoReconnectPlugin;
impl Plugin for AutoReconnectPlugin {
fn build(&self, app: &mut App) {
app.insert_resource(AutoReconnectDelay::new(DEFAULT_RECONNECT_DELAY))
.add_systems(
Update,
(start_rejoin_on_disconnect, rejoin_after_delay)
.chain()
.before(super::join::handle_start_join_server_event),
);
}
}
pub fn start_rejoin_on_disconnect(
mut commands: Commands,
mut disconnect_events: EventReader<DisconnectEvent>,
mut connection_failed_events: EventReader<ConnectionFailedEvent>,
auto_reconnect_delay_res: Option<Res<AutoReconnectDelay>>,
auto_reconnect_delay_query: Query<&AutoReconnectDelay>,
) {
for entity in disconnect_events
.read()
.map(|e| e.entity)
.chain(connection_failed_events.read().map(|e| e.entity))
{
let Some(delay) = get_delay(
&auto_reconnect_delay_res,
auto_reconnect_delay_query,
entity,
) else {
// no auto reconnect
continue;
};
let reconnect_after = Instant::now() + delay;
commands.entity(entity).insert(InternalReconnectAfter {
instant: reconnect_after,
});
}
}
fn get_delay(
auto_reconnect_delay_res: &Option<Res<AutoReconnectDelay>>,
auto_reconnect_delay_query: Query<&AutoReconnectDelay>,
entity: Entity,
) -> Option<Duration> {
if let Ok(c) = auto_reconnect_delay_query.get(entity) {
Some(c.delay)
} else if let Some(r) = &auto_reconnect_delay_res {
Some(r.delay)
} else {
None
}
}
pub fn rejoin_after_delay(
mut commands: Commands,
mut join_events: EventWriter<StartJoinServerEvent>,
query: Query<(
Entity,
&InternalReconnectAfter,
&Account,
&ConnectOpts,
Option<&LocalPlayerEvents>,
)>,
) {
for (entity, reconnect_after, account, connect_opts, local_player_events) in query.iter() {
if Instant::now() >= reconnect_after.instant {
// don't keep trying to reconnect
commands.entity(entity).remove::<InternalReconnectAfter>();
// our Entity will be reused since the account has the same uuid
join_events.write(StartJoinServerEvent {
account: account.clone(),
connect_opts: connect_opts.clone(),
// not actually necessary since we're reusing the same entity and LocalPlayerEvents
// isn't removed, but this is more readable and just in case it's changed in the
// future
event_sender: local_player_events.map(|e| e.0.clone()),
start_join_callback_tx: None,
});
}
}
}
/// A resource *and* component that indicates how long to wait before
/// reconnecting when we're kicked.
///
/// Initially, it's a resource in the ECS set to 5 seconds. You can modify
/// the resource to update the global reconnect delay, or insert it as a
/// component to set the individual delay for a single client.
///
/// You can also remove this resource from the ECS to disable the default
/// auto-reconnecting behavior. Inserting the resource/component again will not
/// make clients that were already disconnected automatically reconnect.
#[derive(Resource, Component, Debug, Clone)]
pub struct AutoReconnectDelay {
pub delay: Duration,
}
impl AutoReconnectDelay {
pub fn new(delay: Duration) -> Self {
Self { delay }
}
}
/// This is inserted when we're disconnected and indicates when we'll reconnect.
///
/// This is set based on [`AutoReconnectDelay`].
#[derive(Component, Debug, Clone)]
pub struct InternalReconnectAfter {
pub instant: Instant,
}

View file

@ -7,10 +7,7 @@ use bevy_ecs::prelude::*;
use derive_more::Deref;
use tracing::info;
use crate::{
InstanceHolder, client::JoinedClientBundle, connection::RawConnection,
events::LocalPlayerEvents,
};
use crate::{InstanceHolder, client::JoinedClientBundle, connection::RawConnection};
pub struct DisconnectPlugin;
impl Plugin for DisconnectPlugin {
@ -19,15 +16,28 @@ impl Plugin for DisconnectPlugin {
PostUpdate,
(
update_read_packets_task_running_component,
disconnect_on_connection_dead,
remove_components_from_disconnected_players,
// this happens after `remove_components_from_disconnected_players` since that
// system removes `IsConnectionAlive`, which ensures that
// `DisconnectEvent` won't get called again from
// `disconnect_on_connection_dead`
disconnect_on_connection_dead,
)
.chain(),
);
}
}
/// An event sent when a client is getting disconnected.
/// An event sent when a client got disconnected from the server.
///
/// If the client was kicked with a reason, that reason will be present in the
/// [`reason`](DisconnectEvent::reason) field.
///
/// This event won't be sent if creating the initial connection to the server
/// failed, for that see [`ConnectionFailedEvent`].
///
/// [`ConnectionFailedEvent`]: crate::join::ConnectionFailedEvent
#[derive(Event)]
pub struct DisconnectEvent {
pub entity: Entity,
@ -59,8 +69,8 @@ pub fn remove_components_from_disconnected_players(
.remove::<InLoadedChunk>()
// this makes it close the tcp connection
.remove::<RawConnection>()
// swarm detects when this tx gets dropped to fire SwarmEvent::Disconnect
.remove::<LocalPlayerEvents>();
// this makes it not send DisconnectEvent again
.remove::<IsConnectionAlive>();
// note that we don't remove the client from the ECS, so if they decide
// to reconnect they'll keep their state

View file

@ -1,4 +1,4 @@
use std::{net::SocketAddr, sync::Arc};
use std::{io, net::SocketAddr, sync::Arc};
use azalea_entity::{LocalEntity, indexing::EntityUuidIndex};
use azalea_protocol::{
@ -29,24 +29,56 @@ use crate::{
pub struct JoinPlugin;
impl Plugin for JoinPlugin {
fn build(&self, app: &mut App) {
app.add_event::<StartJoinServerEvent>().add_systems(
Update,
(handle_start_join_server_event, poll_create_connection_task),
);
app.add_event::<StartJoinServerEvent>()
.add_event::<ConnectionFailedEvent>()
.add_systems(
Update,
(
handle_start_join_server_event.before(super::login::poll_auth_task),
poll_create_connection_task,
handle_connection_failed_events,
)
.chain(),
);
}
}
/// An event to make a client join the server and be added to our swarm.
///
/// This won't do anything if a client with the Account UUID is already
/// connected to the server.
#[derive(Event, Debug)]
pub struct StartJoinServerEvent {
pub account: Account,
pub address: ServerAddress,
pub resolved_address: SocketAddr,
pub proxy: Option<Proxy>,
pub connect_opts: ConnectOpts,
pub event_sender: Option<mpsc::UnboundedSender<crate::Event>>,
pub start_join_callback_tx: Option<StartJoinCallback>,
}
/// Options for how the connection to the server will be made. These are
/// persisted on reconnects.
///
/// This is inserted as a component on clients to make auto-reconnecting work.
#[derive(Debug, Clone, Component)]
pub struct ConnectOpts {
pub address: ServerAddress,
pub resolved_address: SocketAddr,
pub proxy: Option<Proxy>,
}
/// An event that's sent when creating the TCP connection and sending the first
/// packet fails.
///
/// This isn't sent if we're kicked later, see [`DisconnectEvent`].
///
/// [`DisconnectEvent`]: crate::disconnect::DisconnectEvent
#[derive(Event)]
pub struct ConnectionFailedEvent {
pub entity: Entity,
pub error: ConnectionError,
}
// this is mpsc instead of oneshot so it can be cloned (since it's sent in an
// event)
#[derive(Component, Debug, Clone)]
@ -56,11 +88,30 @@ pub fn handle_start_join_server_event(
mut commands: Commands,
mut events: EventReader<StartJoinServerEvent>,
mut entity_uuid_index: ResMut<EntityUuidIndex>,
connection_query: Query<&RawConnection>,
) {
for event in events.read() {
let uuid = event.account.uuid_or_offline();
let entity = if let Some(entity) = entity_uuid_index.get(&uuid) {
debug!("Reusing entity {entity:?} for client");
// check if it's already connected
if let Ok(conn) = connection_query.get(entity) {
if conn.is_alive() {
if let Some(start_join_callback_tx) = &event.start_join_callback_tx {
warn!(
"Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event but replying with Ok."
);
let _ = start_join_callback_tx.0.send(Ok(entity));
} else {
warn!(
"Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event."
);
}
return;
}
}
entity
} else {
let entity = commands.spawn_empty().id();
@ -71,12 +122,15 @@ pub fn handle_start_join_server_event(
};
let mut entity_mut = commands.entity(entity);
entity_mut.insert((
// add the Account to the entity now so plugins can access it earlier
event.account.to_owned(),
// localentity is always present for our clients, even if we're not actually logged
// in
LocalEntity,
// ConnectOpts is inserted as a component here
event.connect_opts.clone(),
// we don't insert InLoginState until we actually create the connection. note that
// there's no InHandshakeState component since we switch off of the handshake state
// immediately when the connection is created
@ -92,11 +146,9 @@ pub fn handle_start_join_server_event(
}
let task_pool = IoTaskPool::get();
let resolved_addr = event.resolved_address;
let address = event.address.clone();
let proxy = event.proxy.clone();
let connect_opts = event.connect_opts.clone();
let task = task_pool.spawn(async_compat::Compat::new(
create_conn_and_send_intention_packet(resolved_addr, address, proxy),
create_conn_and_send_intention_packet(connect_opts),
));
entity_mut.insert(CreateConnectionTask(task));
@ -104,20 +156,18 @@ pub fn handle_start_join_server_event(
}
async fn create_conn_and_send_intention_packet(
resolved_addr: SocketAddr,
address: ServerAddress,
proxy: Option<Proxy>,
opts: ConnectOpts,
) -> Result<LoginConn, ConnectionError> {
let mut conn = if let Some(proxy) = proxy {
Connection::new_with_proxy(&resolved_addr, proxy).await?
let mut conn = if let Some(proxy) = opts.proxy {
Connection::new_with_proxy(&opts.resolved_address, proxy).await?
} else {
Connection::new(&resolved_addr).await?
Connection::new(&opts.resolved_address).await?
};
conn.write(ServerboundIntention {
protocol_version: PROTOCOL_VERSION,
hostname: address.host.clone(),
port: address.port,
hostname: opts.address.host.clone(),
port: opts.address.port,
intention: ClientIntention::Login,
})
.await?;
@ -140,6 +190,7 @@ pub fn poll_create_connection_task(
&Account,
Option<&StartJoinCallback>,
)>,
mut connection_failed_events: EventWriter<ConnectionFailedEvent>,
) {
for (entity, mut task, account, mut start_join_callback) in query.iter_mut() {
if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) {
@ -147,11 +198,9 @@ pub fn poll_create_connection_task(
entity_mut.remove::<CreateConnectionTask>();
let conn = match poll_res {
Ok(conn) => conn,
Err(err) => {
warn!("failed to create connection: {err}");
if let Some(cb) = start_join_callback.take() {
let _ = cb.0.send(Err(err.into()));
}
Err(error) => {
warn!("failed to create connection: {error}");
connection_failed_events.write(ConnectionFailedEvent { entity, error });
return;
}
};
@ -196,3 +245,22 @@ pub fn poll_create_connection_task(
}
}
}
pub fn handle_connection_failed_events(
mut events: EventReader<ConnectionFailedEvent>,
query: Query<&StartJoinCallback>,
) {
for event in events.read() {
let Ok(start_join_callback) = query.get(event.entity) else {
// the StartJoinCallback isn't required to be present, so this is fine
continue;
};
// io::Error isn't clonable, so we create a new one based on the `kind` and
// `to_string`,
let ConnectionError::Io(err) = &event.error;
let cloned_err = ConnectionError::Io(io::Error::new(err.kind(), err.to_string()));
let _ = start_join_callback.0.send(Err(cloned_err.into()));
}
}

View file

@ -33,7 +33,7 @@ fn handle_receive_hello_event(trigger: Trigger<ReceiveHelloEvent>, mut commands:
commands.entity(player).insert(AuthTask(task));
}
fn poll_auth_task(
pub fn poll_auth_task(
mut commands: Commands,
mut query: Query<(Entity, &mut AuthTask, &mut RawConnection)>,
) {

View file

@ -1,6 +1,7 @@
use bevy_app::{PluginGroup, PluginGroupBuilder};
pub mod attack;
pub mod auto_reconnect;
pub mod brand;
pub mod chat;
pub mod chunks;
@ -51,7 +52,8 @@ impl PluginGroup for DefaultPlugins {
.add(pong::PongPlugin)
.add(connection::ConnectionPlugin)
.add(login::LoginPlugin)
.add(join::JoinPlugin);
.add(join::JoinPlugin)
.add(auto_reconnect::AutoReconnectPlugin);
#[cfg(feature = "log")]
{
group = group.add(bevy_log::LogPlugin::default());

View file

@ -192,14 +192,10 @@ async fn handle(bot: Client, event: azalea::Event, state: State) -> anyhow::Resu
Ok(())
}
async fn swarm_handle(swarm: Swarm, event: SwarmEvent, _state: SwarmState) -> anyhow::Result<()> {
async fn swarm_handle(_swarm: Swarm, event: SwarmEvent, _state: SwarmState) -> anyhow::Result<()> {
match &event {
SwarmEvent::Disconnect(account, join_opts) => {
SwarmEvent::Disconnect(account, _join_opts) => {
println!("bot got kicked! {}", account.username);
tokio::time::sleep(Duration::from_secs(5)).await;
swarm
.add_and_retry_forever_with_opts(account, State::default(), join_opts)
.await;
}
SwarmEvent::Chat(chat) => {
if chat.message().to_string() == "The particle was not visible for anybody" {

View file

@ -14,6 +14,7 @@ pub mod prelude;
pub mod swarm;
use std::net::SocketAddr;
use std::time::Duration;
use app::Plugins;
pub use azalea_auth as auth;
@ -126,7 +127,12 @@ impl ClientBuilder<NoState, ()> {
/// Set the function that's called every time a bot receives an [`Event`].
/// This is the way to handle normal per-bot events.
///
/// Currently you can have up to one client handler.
/// Currently, you can have up to one client handler.
///
/// Note that if you're creating clients directly from the ECS using
/// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
/// the handler function won't be called for that client. This shouldn't be
/// a concern for most bots, though.
///
/// ```
/// # use azalea::prelude::*;
@ -139,6 +145,8 @@ impl ClientBuilder<NoState, ()> {
/// Ok(())
/// }
/// ```
///
/// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
#[must_use]
pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> ClientBuilder<S, R>
where
@ -169,6 +177,22 @@ where
self
}
/// Configures the auto-reconnection behavior for our bot.
///
/// If this is `Some`, then it'll set the default reconnection delay for our
/// bot (how long it'll wait after being kicked before it tries
/// rejoining). if it's `None`, then auto-reconnecting will be disabled.
///
/// If this function isn't called, then our client will reconnect after
/// [`DEFAULT_RECONNECT_DELAY`].
///
/// [`DEFAULT_RECONNECT_DELAY`]: azalea_client::auto_reconnect::DEFAULT_RECONNECT_DELAY
#[must_use]
pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self {
self.swarm.reconnect_after = delay.into();
self
}
/// Build this `ClientBuilder` into an actual [`Client`] and join the given
/// server. If the client can't join, it'll keep retrying forever until it
/// can.

View file

@ -19,9 +19,13 @@ use std::{
};
use azalea_client::{
Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts, chat::ChatPacket,
Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts,
auto_reconnect::{AutoReconnectDelay, DEFAULT_RECONNECT_DELAY},
chat::ChatPacket,
join::ConnectOpts,
start_ecs_runner,
};
use azalea_entity::LocalEntity;
use azalea_protocol::{ServerAddress, resolver};
use azalea_world::InstanceContainer;
use bevy_app::{App, PluginGroup, PluginGroupBuilder, Plugins, SubApp};
@ -46,15 +50,15 @@ use crate::{BoxHandleFn, DefaultBotPlugins, HandleFn, JoinOpts, NoState, StartEr
pub struct Swarm {
pub ecs_lock: Arc<Mutex<World>>,
bots: Arc<Mutex<HashMap<Entity, Client>>>,
// the address is public and mutable so plugins can change it
pub resolved_address: Arc<RwLock<SocketAddr>>,
pub address: Arc<RwLock<ServerAddress>>,
pub instance_container: Arc<RwLock<InstanceContainer>>,
/// This is used internally to make the client handler function work.
bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
/// This is used internally to make the swarm handler function work.
swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
}
@ -92,7 +96,11 @@ where
/// None to have every bot connect at the same time. None is different than
/// a duration of 0, since if a duration is present the bots will wait for
/// the previous one to be ready.
pub(crate) join_delay: Option<std::time::Duration>,
pub(crate) join_delay: Option<Duration>,
/// The default reconnection delay for our bots. This will change the value
/// of the `AutoReconnectDelay` resource.
pub(crate) reconnect_after: Option<Duration>,
}
impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
/// Start creating the swarm.
@ -144,6 +152,7 @@ impl SwarmBuilder<NoState, NoSwarmState, (), ()> {
handler: None,
swarm_handler: None,
join_delay: None,
reconnect_after: Some(DEFAULT_RECONNECT_DELAY),
}
}
}
@ -157,6 +166,12 @@ where
///
/// Currently you can have up to one handler.
///
/// Note that if you're creating clients directly from the ECS using
/// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
/// the handler function won't be called for that client. This also applies
/// to [`SwarmBuilder::set_swarm_handler`]. This shouldn't be a concern for
/// most bots, though.
///
/// ```
/// # use azalea::{prelude::*, swarm::prelude::*};
/// # let swarm_builder = SwarmBuilder::new().set_swarm_handler(swarm_handle);
@ -178,6 +193,8 @@ where
/// # Ok(())
/// # }
/// ```
///
/// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
#[must_use]
pub fn set_handler<S, Fut, R>(self, handler: HandleFn<S, Fut>) -> SwarmBuilder<S, SS, R, SR>
where
@ -205,6 +222,12 @@ where
///
/// Currently you can have up to one swarm handler.
///
/// Note that if you're creating clients directly from the ECS using
/// [`StartJoinServerEvent`] and the client wasn't already in the ECS, then
/// this handler function won't be called for that client. This also applies
/// to [`SwarmBuilder::set_handler`]. This shouldn't be a concern for
/// most bots, though.
///
/// ```
/// # use azalea::{prelude::*, swarm::prelude::*};
/// # let swarm_builder = SwarmBuilder::new().set_handler(handle);
@ -227,6 +250,8 @@ where
/// Ok(())
/// }
/// ```
///
/// [`StartJoinServerEvent`]: azalea_client::join::StartJoinServerEvent
#[must_use]
pub fn set_swarm_handler<SS, Fut, SR>(
self,
@ -246,6 +271,7 @@ where
Box::pin(handler(swarm, event, state))
})),
join_delay: self.join_delay,
reconnect_after: self.reconnect_after,
}
}
}
@ -341,11 +367,25 @@ where
/// field, however, the bots will wait for the previous one to have
/// connected and *then* they'll wait the given duration.
#[must_use]
pub fn join_delay(mut self, delay: std::time::Duration) -> Self {
pub fn join_delay(mut self, delay: Duration) -> Self {
self.join_delay = Some(delay);
self
}
/// Configures the auto-reconnection behavior for our bots.
///
/// If this is `Some`, then it'll set the default reconnection delay for our
/// bots (how long they'll wait after being kicked before they try
/// rejoining). if it's `None`, then auto-reconnecting will be disabled.
///
/// If this function isn't called, then our clients will reconnect after
/// [`DEFAULT_RECONNECT_DELAY`].
#[must_use]
pub fn reconnect_after(mut self, delay: impl Into<Option<Duration>>) -> Self {
self.reconnect_after = delay.into();
self
}
/// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
/// server.
///
@ -406,7 +446,6 @@ where
let swarm = Swarm {
ecs_lock: ecs_lock.clone(),
bots: Arc::new(Mutex::new(HashMap::new())),
resolved_address: Arc::new(RwLock::new(resolved_address)),
address: Arc::new(RwLock::new(address)),
@ -422,6 +461,13 @@ where
let mut ecs = ecs_lock.lock();
ecs.insert_resource(swarm.clone());
ecs.insert_resource(self.swarm_state.clone());
if let Some(reconnect_after) = self.reconnect_after {
ecs.insert_resource(AutoReconnectDelay {
delay: reconnect_after,
});
} else {
ecs.remove_resource::<AutoReconnectDelay>();
}
ecs.run_schedule(main_schedule_label);
ecs.clear_trackers();
}
@ -556,8 +602,12 @@ pub enum SwarmEvent {
Init,
/// A bot got disconnected from the server.
///
/// You can implement an auto-reconnect by calling [`Swarm::add_with_opts`]
/// with the account and options from this event.
/// If you'd like to implement special auto-reconnect behavior beyond what's
/// built-in, you can disable that with [`SwarmBuilder::reconnect_delay`]
/// and then call [`Swarm::add_with_opts`] with the account and options
/// from this event.
///
/// [`SwarmBuilder::reconnect_delay`]: crate::swarm::SwarmBuilder::reconnect_after
Disconnect(Box<Account>, JoinOpts),
/// At least one bot received a chat message.
Chat(ChatPacket),
@ -664,15 +714,18 @@ impl Swarm {
let resolved_address = join_opts
.custom_resolved_address
.unwrap_or_else(|| *self.resolved_address.read());
let proxy = join_opts.proxy.clone();
let (tx, rx) = mpsc::unbounded_channel();
let bot = Client::start_client(StartClientOpts {
ecs_lock: self.ecs_lock.clone(),
account,
address: &address,
resolved_address: &resolved_address,
proxy: join_opts.proxy.clone(),
account: account.clone(),
connect_opts: ConnectOpts {
address,
resolved_address,
proxy,
},
event_sender: Some(tx),
})
.await?;
@ -682,31 +735,25 @@ impl Swarm {
ecs.entity_mut(bot.entity).insert(state);
}
self.bots.lock().insert(bot.entity, bot.clone());
let cloned_bots = self.bots.clone();
let cloned_bots_tx = self.bots_tx.clone();
let cloned_bot = bot.clone();
let swarm_tx = self.swarm_tx.clone();
let bots_tx = self.bots_tx.clone();
let join_opts = join_opts.clone();
tokio::spawn(Self::event_copying_task(
rx,
cloned_bots,
cloned_bots_tx,
cloned_bot,
swarm_tx,
join_opts,
rx, swarm_tx, bots_tx, cloned_bot, join_opts,
));
Ok(bot)
}
/// Copy the events from a client's receiver into bots_tx, until the bot is
/// removed from the ECS.
async fn event_copying_task(
mut rx: mpsc::UnboundedReceiver<Event>,
cloned_bots: Arc<Mutex<HashMap<Entity, Client>>>,
cloned_bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
cloned_bot: Client,
swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
bot: Client,
join_opts: JoinOpts,
) {
while let Some(event) = rx.recv().await {
@ -740,21 +787,33 @@ impl Swarm {
}
}
if let Event::Disconnect(_) = event {
debug!(
"sending SwarmEvent::Disconnect due to receiving an Event::Disconnect from client {}",
bot.entity
);
let account = bot
.get_component::<Account>()
.expect("bot is missing required Account component");
swarm_tx
.send(SwarmEvent::Disconnect(Box::new(account), join_opts.clone()))
.unwrap();
}
// we can't handle events here (since we can't copy the handler),
// they're handled above in SwarmBuilder::start
if let Err(e) = cloned_bots_tx.send((Some(event), cloned_bot.clone())) {
error!("Error sending event to swarm: {e}");
if let Err(e) = bots_tx.send((Some(event), bot.clone())) {
error!(
"Error sending event to swarm, aborting event_copying_task for {}: {e}",
bot.entity
);
break;
}
}
debug!("client sender ended, removing from cloned_bots and sending SwarmEvent::Disconnect");
cloned_bots.lock().remove(&cloned_bot.entity);
let account = cloned_bot
.get_component::<Account>()
.expect("bot is missing required Account component");
swarm_tx
.send(SwarmEvent::Disconnect(Box::new(account), join_opts))
.unwrap();
debug!(
"client sender ended for {}, this won't trigger SwarmEvent::Disconnect unless the client already sent its own disconnect event",
bot.entity
);
}
/// Add a new account to the swarm, retrying if it couldn't join. This will
@ -807,6 +866,17 @@ impl Swarm {
}
}
}
/// Get an array of ECS [`Entity`]s for all [`LocalEntity`]s in our world.
/// This will include clients that were disconnected without being removed
/// from the ECS.
///
/// [`LocalEntity`]: azalea_entity::LocalEntity
pub fn client_entities(&self) -> Box<[Entity]> {
let mut ecs = self.ecs_lock.lock();
let mut query = ecs.query_filtered::<Entity, With<LocalEntity>>();
query.iter(&ecs).collect::<Box<[Entity]>>()
}
}
impl IntoIterator for Swarm {
@ -827,11 +897,12 @@ impl IntoIterator for Swarm {
/// # }
/// ```
fn into_iter(self) -> Self::IntoIter {
self.bots
.lock()
.clone()
.into_values()
.collect::<Vec<_>>()
let client_entities = self.client_entities();
client_entities
.into_iter()
.map(|entity| Client::new(entity, self.ecs_lock.clone()))
.collect::<Box<[Client]>>()
.into_iter()
}
}