1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 14:26:04 +00:00

add AutoReconnectPlugin

This commit is contained in:
mat 2025-04-20 09:55:59 -09:30
parent 8045b4eda2
commit aa6d11bc0a
7 changed files with 227 additions and 59 deletions

View file

@ -58,7 +58,7 @@ use crate::{
events::Event,
interact::CurrentSequenceNumber,
inventory::Inventory,
join::{StartJoinCallback, StartJoinServerEvent},
join::{ConnectOpts, StartJoinCallback, StartJoinServerEvent},
local_player::{
GameProfileComponent, Hunger, InstanceHolder, PermissionLevel, PlayerAbilities, TabList,
},
@ -114,9 +114,7 @@ pub enum JoinError {
pub struct StartClientOpts<'a> {
pub ecs_lock: Arc<Mutex<World>>,
pub account: &'a Account,
pub address: &'a ServerAddress,
pub resolved_address: &'a SocketAddr,
pub proxy: Option<Proxy>,
pub connect_opts: ConnectOpts,
pub event_sender: Option<mpsc::UnboundedSender<Event>>,
}
@ -124,7 +122,7 @@ impl<'a> StartClientOpts<'a> {
pub fn new(
account: &'a Account,
address: &'a ServerAddress,
resolved_address: &'a SocketAddr,
resolved_address: SocketAddr,
event_sender: Option<mpsc::UnboundedSender<Event>>,
) -> StartClientOpts<'a> {
let mut app = App::new();
@ -135,15 +133,17 @@ impl<'a> StartClientOpts<'a> {
Self {
ecs_lock,
account,
address,
resolved_address,
proxy: None,
connect_opts: ConnectOpts {
address: address.clone(),
resolved_address,
proxy: None,
},
event_sender,
}
}
pub fn proxy(mut self, proxy: Proxy) -> Self {
self.proxy = Some(proxy);
self.connect_opts.proxy = Some(proxy);
self
}
}
@ -193,7 +193,7 @@ impl Client {
let client = Self::start_client(StartClientOpts::new(
account,
&address,
&resolved_address,
resolved_address,
Some(tx),
))
.await?;
@ -210,7 +210,7 @@ impl Client {
let (tx, rx) = mpsc::unbounded_channel();
let client = Self::start_client(
StartClientOpts::new(account, &address, &resolved_address, Some(tx)).proxy(proxy),
StartClientOpts::new(account, &address, resolved_address, Some(tx)).proxy(proxy),
)
.await?;
Ok((client, rx))
@ -222,9 +222,7 @@ impl Client {
StartClientOpts {
ecs_lock,
account,
address,
resolved_address,
proxy,
connect_opts,
event_sender,
}: StartClientOpts<'_>,
) -> Result<Self, JoinError> {
@ -237,9 +235,7 @@ impl Client {
let mut ecs = ecs_lock.lock();
ecs.send_event(StartJoinServerEvent {
account: account.clone(),
address: address.clone(),
resolved_address: *resolved_address,
proxy,
connect_opts,
event_sender: event_sender.clone(),
start_join_callback_tx: Some(StartJoinCallback(start_join_callback_tx)),
});

View file

@ -0,0 +1,111 @@
//! Auto-reconnect to the server when the client is kicked.
//!
//! This is enabled by default.
use std::time::{Duration, Instant};
use bevy_app::prelude::*;
use bevy_ecs::prelude::*;
use super::{
disconnect::DisconnectEvent,
events::LocalPlayerEvents,
join::{ConnectOpts, StartJoinServerEvent},
};
use crate::Account;
pub struct AutoReconnectPlugin;
impl Plugin for AutoReconnectPlugin {
fn build(&self, app: &mut App) {
app.insert_resource(AutoReconnectDelay::new(Duration::from_secs(5)))
.add_systems(
Update,
(start_rejoin_on_disconnect, rejoin_after_delay)
.chain()
.before(super::join::handle_start_join_server_event),
);
}
}
pub fn start_rejoin_on_disconnect(
mut commands: Commands,
mut events: EventReader<DisconnectEvent>,
auto_reconnect_delay_res: Option<Res<AutoReconnectDelay>>,
auto_reconnect_delay_query: Query<&AutoReconnectDelay>,
) {
for event in events.read() {
let delay = if let Ok(c) = auto_reconnect_delay_query.get(event.entity) {
c.delay
} else if let Some(r) = &auto_reconnect_delay_res {
r.delay
} else {
// no auto reconnect
continue;
};
let reconnect_after = Instant::now() + delay;
commands
.entity(event.entity)
.insert(InternalReconnectAfter {
instant: reconnect_after,
});
}
}
pub fn rejoin_after_delay(
mut commands: Commands,
mut join_events: EventWriter<StartJoinServerEvent>,
query: Query<(
Entity,
&InternalReconnectAfter,
&Account,
&ConnectOpts,
Option<&LocalPlayerEvents>,
)>,
) {
for (entity, reconnect_after, account, connect_opts, local_player_events) in query.iter() {
if Instant::now() >= reconnect_after.instant {
// don't keep trying to reconnect
commands.entity(entity).remove::<InternalReconnectAfter>();
// our Entity will be reused since the account has the same uuid
join_events.send(StartJoinServerEvent {
account: account.clone(),
connect_opts: connect_opts.clone(),
// not actually necessary since we're reusing the same entity and LocalPlayerEvents
// isn't removed, but this is more readable and just in case it's changed in the
// future
event_sender: local_player_events.map(|e| e.0.clone()),
start_join_callback_tx: None,
});
}
}
}
/// A resource *and* component that indicates how long to wait before
/// reconnecting when we're kicked.
///
/// Initially, it's a resource in the ECS set to 5 seconds. You can modify
/// the resource to update the global reconnect delay, or insert it as a
/// component to set the individual delay for a single client.
///
/// You can also remove this resource from the ECS to disable the default
/// auto-reconnecting behavior. Inserting the resource/component again will not
/// make clients that were already disconnected automatically reconnect.
#[derive(Resource, Component, Debug, Clone)]
pub struct AutoReconnectDelay {
pub delay: Duration,
}
impl AutoReconnectDelay {
pub fn new(delay: Duration) -> Self {
Self { delay }
}
}
/// This is inserted when we're disconnected and indicates when we'll reconnect.
///
/// This is set based on [`AutoReconnectDelay`].
#[derive(Component, Debug, Clone)]
pub struct InternalReconnectAfter {
pub instant: Instant,
}

View file

@ -15,10 +15,7 @@ use bevy_ecs::{
use derive_more::Deref;
use tracing::trace;
use crate::{
InstanceHolder, client::JoinedClientBundle, connection::RawConnection,
events::LocalPlayerEvents,
};
use crate::{InstanceHolder, client::JoinedClientBundle, connection::RawConnection};
pub struct DisconnectPlugin;
impl Plugin for DisconnectPlugin {
@ -35,7 +32,16 @@ impl Plugin for DisconnectPlugin {
}
}
/// An event sent when a client is getting disconnected.
/// An event sent when a client got disconnected from the server.
///
/// If the client was kicked with a reason, that reason will be present in the
/// [`reason`](DisconnectEvent::reason) field.
///
/// This event won't be sent if creating the initial connection to the server
/// failed, for that see [`ConnectionFailedEvent`].
///
/// [`ConnectionFailedEvent`]: crate::join::ConnectionFailedEvent
#[derive(Event)]
pub struct DisconnectEvent {
pub entity: Entity,
@ -59,9 +65,7 @@ pub fn remove_components_from_disconnected_players(
.remove::<PlayerMetadataBundle>()
.remove::<InLoadedChunk>()
// this makes it close the tcp connection
.remove::<RawConnection>()
// swarm detects when this tx gets dropped to fire SwarmEvent::Disconnect
.remove::<LocalPlayerEvents>();
.remove::<RawConnection>();
// note that we don't remove the client from the ECS, so if they decide
// to reconnect they'll keep their state

View file

@ -1,4 +1,4 @@
use std::{net::SocketAddr, sync::Arc};
use std::{io, net::SocketAddr, sync::Arc};
use azalea_entity::{LocalEntity, indexing::EntityUuidIndex};
use azalea_protocol::{
@ -29,24 +29,53 @@ use crate::{
pub struct JoinPlugin;
impl Plugin for JoinPlugin {
fn build(&self, app: &mut App) {
app.add_event::<StartJoinServerEvent>().add_systems(
Update,
(handle_start_join_server_event, poll_create_connection_task),
);
app.add_event::<StartJoinServerEvent>()
.add_event::<ConnectionFailedEvent>()
.add_systems(
Update,
(
handle_start_join_server_event,
poll_create_connection_task,
handle_connection_failed_events,
)
.chain(),
);
}
}
/// An event to make a client join the server and be added to our swarm.
#[derive(Event, Debug)]
pub struct StartJoinServerEvent {
pub account: Account,
pub address: ServerAddress,
pub resolved_address: SocketAddr,
pub proxy: Option<Proxy>,
pub connect_opts: ConnectOpts,
pub event_sender: Option<mpsc::UnboundedSender<crate::Event>>,
pub start_join_callback_tx: Option<StartJoinCallback>,
}
/// Options for how the connection to the server will be made. These are
/// persisted on reconnects.
///
/// This is inserted as a component on clients to make auto-reconnecting work.
#[derive(Debug, Clone, Component)]
pub struct ConnectOpts {
pub address: ServerAddress,
pub resolved_address: SocketAddr,
pub proxy: Option<Proxy>,
}
/// An event that's sent when creating the TCP connection and sending the first
/// packet fails.
///
/// This isn't sent if we're kicked later, see [`DisconnectEvent`].
///
/// [`DisconnectEvent`]: crate::disconnect::DisconnectEvent
#[derive(Event)]
pub struct ConnectionFailedEvent {
pub entity: Entity,
pub error: ConnectionError,
}
// this is mpsc instead of oneshot so it can be cloned (since it's sent in an
// event)
#[derive(Component, Debug, Clone)]
@ -77,6 +106,8 @@ pub fn handle_start_join_server_event(
// localentity is always present for our clients, even if we're not actually logged
// in
LocalEntity,
// ConnectOpts is inserted as a component here
event.connect_opts.clone(),
// we don't insert InLoginState until we actually create the connection. note that
// there's no InHandshakeState component since we switch off of the handshake state
// immediately when the connection is created
@ -92,11 +123,9 @@ pub fn handle_start_join_server_event(
}
let task_pool = IoTaskPool::get();
let resolved_addr = event.resolved_address;
let address = event.address.clone();
let proxy = event.proxy.clone();
let connect_opts = event.connect_opts.clone();
let task = task_pool.spawn(async_compat::Compat::new(
create_conn_and_send_intention_packet(resolved_addr, address, proxy),
create_conn_and_send_intention_packet(connect_opts),
));
entity_mut.insert(CreateConnectionTask(task));
@ -104,20 +133,18 @@ pub fn handle_start_join_server_event(
}
async fn create_conn_and_send_intention_packet(
resolved_addr: SocketAddr,
address: ServerAddress,
proxy: Option<Proxy>,
opts: ConnectOpts,
) -> Result<LoginConn, ConnectionError> {
let mut conn = if let Some(proxy) = proxy {
Connection::new_with_proxy(&resolved_addr, proxy).await?
let mut conn = if let Some(proxy) = opts.proxy {
Connection::new_with_proxy(&opts.resolved_address, proxy).await?
} else {
Connection::new(&resolved_addr).await?
Connection::new(&opts.resolved_address).await?
};
conn.write(ServerboundIntention {
protocol_version: PROTOCOL_VERSION,
hostname: address.host.clone(),
port: address.port,
hostname: opts.address.host.clone(),
port: opts.address.port,
intention: ClientIntention::Login,
})
.await?;
@ -140,6 +167,7 @@ pub fn poll_create_connection_task(
&Account,
Option<&StartJoinCallback>,
)>,
mut connection_failed_events: EventWriter<ConnectionFailedEvent>,
) {
for (entity, mut task, account, mut start_join_callback) in query.iter_mut() {
if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) {
@ -147,11 +175,9 @@ pub fn poll_create_connection_task(
entity_mut.remove::<CreateConnectionTask>();
let conn = match poll_res {
Ok(conn) => conn,
Err(err) => {
warn!("failed to create connection: {err}");
if let Some(cb) = start_join_callback.take() {
let _ = cb.0.send(Err(err.into()));
}
Err(error) => {
warn!("failed to create connection: {error}");
connection_failed_events.send(ConnectionFailedEvent { entity, error });
return;
}
};
@ -196,3 +222,22 @@ pub fn poll_create_connection_task(
}
}
}
pub fn handle_connection_failed_events(
mut events: EventReader<ConnectionFailedEvent>,
query: Query<&StartJoinCallback>,
) {
for event in events.read() {
let Ok(start_join_callback) = query.get(event.entity) else {
// the StartJoinCallback isn't required to be present, so this is fine
continue;
};
// io::Error isn't clonable, so we create a new one based on the `kind` and
// `to_string`,
let ConnectionError::Io(err) = &event.error;
let cloned_err = ConnectionError::Io(io::Error::new(err.kind(), err.to_string()));
let _ = start_join_callback.0.send(Err(cloned_err.into()));
}
}

View file

@ -1,6 +1,7 @@
use bevy_app::{PluginGroup, PluginGroupBuilder};
pub mod attack;
pub mod auto_reconnect;
pub mod brand;
pub mod chat;
pub mod chunks;
@ -51,7 +52,8 @@ impl PluginGroup for DefaultPlugins {
.add(pong::PongPlugin)
.add(connection::ConnectionPlugin)
.add(login::LoginPlugin)
.add(join::JoinPlugin);
.add(join::JoinPlugin)
.add(auto_reconnect::AutoReconnectPlugin);
#[cfg(feature = "log")]
{
group = group.add(bevy_log::LogPlugin::default());

View file

@ -196,10 +196,6 @@ async fn swarm_handle(swarm: Swarm, event: SwarmEvent, _state: SwarmState) -> an
match &event {
SwarmEvent::Disconnect(account, join_opts) => {
println!("bot got kicked! {}", account.username);
tokio::time::sleep(Duration::from_secs(5)).await;
swarm
.add_and_retry_forever_with_opts(account, State::default(), join_opts)
.await;
}
SwarmEvent::Chat(chat) => {
if chat.message().to_string() == "The particle was not visible for anybody" {

View file

@ -19,7 +19,7 @@ use std::{
use azalea_client::{
Account, Client, DefaultPlugins, Event, JoinError, StartClientOpts, chat::ChatPacket,
start_ecs_runner,
join::ConnectOpts, start_ecs_runner,
};
use azalea_protocol::{ServerAddress, resolver};
use azalea_world::InstanceContainer;
@ -654,15 +654,18 @@ impl Swarm {
let resolved_address = join_opts
.custom_resolved_address
.unwrap_or_else(|| *self.resolved_address.read());
let proxy = join_opts.proxy.clone();
let (tx, rx) = mpsc::unbounded_channel();
let bot = Client::start_client(StartClientOpts {
ecs_lock: self.ecs_lock.clone(),
account,
address: &address,
resolved_address: &resolved_address,
proxy: join_opts.proxy.clone(),
connect_opts: ConnectOpts {
address,
resolved_address,
proxy,
},
event_sender: Some(tx),
})
.await?;
@ -730,10 +733,21 @@ impl Swarm {
}
}
if let Event::Disconnect(_) = event {
//
}
// we can't handle events here (since we can't copy the handler),
// they're handled above in SwarmBuilder::start
if let Err(e) = cloned_bots_tx.send((Some(event), cloned_bot.clone())) {
error!("Error sending event to swarm: {e}");
let account = cloned_bot
.get_component::<Account>()
.expect("bot is missing required Account component");
swarm_tx
.send(SwarmEvent::Disconnect(Box::new(account), join_opts.clone()))
.unwrap();
}
}
debug!("client sender ended, removing from cloned_bots and sending SwarmEvent::Disconnect");