1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 14:26:04 +00:00

simplify some join logic so the Entity is returned even on connection error

This commit is contained in:
mat 2025-06-02 04:12:07 -01:00
parent 3d121722d7
commit 8da179b221
4 changed files with 50 additions and 96 deletions

View file

@ -1,7 +1,7 @@
use std::{
collections::HashMap,
fmt::Debug,
io, mem,
mem,
net::SocketAddr,
sync::Arc,
thread,
@ -9,7 +9,6 @@ use std::{
};
use azalea_auth::game_profile::GameProfile;
use azalea_chat::FormattedText;
use azalea_core::{
data_registry::ResolvableDataRegistry, position::Vec3, resource_location::ResourceLocation,
tick::GameTick,
@ -22,9 +21,9 @@ use azalea_entity::{
use azalea_protocol::{
ServerAddress,
common::client_information::ClientInformation,
connect::{ConnectionError, Proxy},
connect::Proxy,
packets::{
self, Packet,
Packet,
game::{self, ServerboundGamePacket},
},
resolver,
@ -54,7 +53,7 @@ use crate::{
events::Event,
interact::CurrentSequenceNumber,
inventory::Inventory,
join::{ConnectOpts, StartJoinCallback, StartJoinServerEvent},
join::{ConnectOpts, StartJoinServerEvent},
local_player::{
GameProfileComponent, Hunger, InstanceHolder, PermissionLevel, PlayerAbilities, TabList,
},
@ -89,22 +88,8 @@ pub struct Client {
pub enum JoinError {
#[error("{0}")]
Resolver(#[from] resolver::ResolverError),
#[error("{0}")]
Connection(#[from] ConnectionError),
#[error("{0}")]
ReadPacket(#[from] Box<azalea_protocol::read::ReadPacketError>),
#[error("{0}")]
Io(#[from] io::Error),
#[error("Failed to encrypt the challenge from the server for {0:?}")]
EncryptionError(packets::login::ClientboundHello),
#[error("{0}")]
SessionServer(#[from] azalea_auth::sessionserver::ClientSessionServerError),
#[error("The given address could not be parsed into a ServerAddress")]
InvalidAddress,
#[error("Couldn't refresh access token: {0}")]
Auth(#[from] azalea_auth::AuthError),
#[error("Disconnected: {reason}")]
Disconnect { reason: FormattedText },
}
pub struct StartClientOpts {
@ -193,7 +178,7 @@ impl Client {
resolved_address,
Some(tx),
))
.await?;
.await;
Ok((client, rx))
}
@ -209,7 +194,7 @@ impl Client {
let client = Self::start_client(
StartClientOpts::new(account, address, resolved_address, Some(tx)).proxy(proxy),
)
.await?;
.await;
Ok((client, rx))
}
@ -222,25 +207,24 @@ impl Client {
connect_opts,
event_sender,
}: StartClientOpts,
) -> Result<Self, JoinError> {
) -> Self {
// send a StartJoinServerEvent
let (start_join_callback_tx, mut start_join_callback_rx) =
mpsc::unbounded_channel::<Result<Entity, JoinError>>();
mpsc::unbounded_channel::<Entity>();
ecs_lock.lock().send_event(StartJoinServerEvent {
account,
connect_opts,
event_sender,
start_join_callback_tx: Some(StartJoinCallback(start_join_callback_tx)),
start_join_callback_tx: Some(start_join_callback_tx),
});
let entity = start_join_callback_rx.recv().await.expect(
"StartJoinCallback should not be dropped before sending a message, this is a bug in Azalea",
)?;
"start_join_callback should not be dropped before sending a message, this is a bug in Azalea",
);
let client = Client::new(entity, ecs_lock.clone());
Ok(client)
Client::new(entity, ecs_lock)
}
/// Write a packet directly to the server.

View file

@ -1,4 +1,4 @@
use std::{io, net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, sync::Arc};
use azalea_entity::{LocalEntity, indexing::EntityUuidIndex};
use azalea_protocol::{
@ -20,7 +20,7 @@ use tracing::{debug, warn};
use super::events::LocalPlayerEvents;
use crate::{
Account, JoinError, LocalPlayerBundle,
Account, LocalPlayerBundle,
connection::RawConnection,
packet::login::{InLoginState, SendLoginPacketEvent},
};
@ -36,7 +36,6 @@ impl Plugin for JoinPlugin {
(
handle_start_join_server_event.before(super::login::poll_auth_task),
poll_create_connection_task,
handle_connection_failed_events,
)
.chain(),
);
@ -53,7 +52,8 @@ pub struct StartJoinServerEvent {
pub connect_opts: ConnectOpts,
pub event_sender: Option<mpsc::UnboundedSender<crate::Event>>,
pub start_join_callback_tx: Option<StartJoinCallback>,
// this is mpsc instead of oneshot so it can be cloned (since it's sent in an event)
pub start_join_callback_tx: Option<mpsc::UnboundedSender<Entity>>,
}
/// Options for how the connection to the server will be made. These are
@ -79,11 +79,6 @@ pub struct ConnectionFailedEvent {
pub error: ConnectionError,
}
// this is mpsc instead of oneshot so it can be cloned (since it's sent in an
// event)
#[derive(Component, Debug, Clone)]
pub struct StartJoinCallback(pub mpsc::UnboundedSender<Result<Entity, JoinError>>);
pub fn handle_start_join_server_event(
mut commands: Commands,
mut events: EventReader<StartJoinServerEvent>,
@ -103,7 +98,7 @@ pub fn handle_start_join_server_event(
warn!(
"Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event but replying with Ok."
);
let _ = start_join_callback_tx.0.send(Ok(entity));
let _ = start_join_callback_tx.send(entity);
} else {
warn!(
"Received StartJoinServerEvent for {entity:?} but it's already connected. Ignoring the event."
@ -121,6 +116,10 @@ pub fn handle_start_join_server_event(
entity
};
if let Some(start_join_callback) = &event.start_join_callback_tx {
let _ = start_join_callback.send(entity);
}
let mut entity_mut = commands.entity(entity);
entity_mut.insert((
@ -141,9 +140,6 @@ pub fn handle_start_join_server_event(
// handle receiving packets
entity_mut.insert(LocalPlayerEvents(event_sender.clone()));
}
if let Some(start_join_callback) = &event.start_join_callback_tx {
entity_mut.insert(start_join_callback.clone());
}
let task_pool = IoTaskPool::get();
let connect_opts = event.connect_opts.clone();
@ -184,15 +180,10 @@ pub struct CreateConnectionTask(pub Task<Result<LoginConn, ConnectionError>>);
pub fn poll_create_connection_task(
mut commands: Commands,
mut query: Query<(
Entity,
&mut CreateConnectionTask,
&Account,
Option<&StartJoinCallback>,
)>,
mut query: Query<(Entity, &mut CreateConnectionTask, &Account)>,
mut connection_failed_events: EventWriter<ConnectionFailedEvent>,
) {
for (entity, mut task, account, mut start_join_callback) in query.iter_mut() {
for (entity, mut task, account) in query.iter_mut() {
if let Some(poll_res) = future::block_on(future::poll_once(&mut task.0)) {
let mut entity_mut = commands.entity(entity);
entity_mut.remove::<CreateConnectionTask>();
@ -238,29 +229,6 @@ pub fn poll_create_connection_task(
profile_id: account.uuid_or_offline(),
},
));
if let Some(cb) = start_join_callback.take() {
let _ = cb.0.send(Ok(entity));
}
}
}
}
pub fn handle_connection_failed_events(
mut events: EventReader<ConnectionFailedEvent>,
query: Query<&StartJoinCallback>,
) {
for event in events.read() {
let Ok(start_join_callback) = query.get(event.entity) else {
// the StartJoinCallback isn't required to be present, so this is fine
continue;
};
// io::Error isn't clonable, so we create a new one based on the `kind` and
// `to_string`,
let ConnectionError::Io(err) = &event.error;
let cloned_err = ConnectionError::Io(io::Error::new(err.kind(), err.to_string()));
let _ = start_join_callback.0.send(Err(cloned_err.into()));
}
}

View file

@ -5,13 +5,14 @@ use azalea_protocol::packets::login::{
use bevy_app::prelude::*;
use bevy_ecs::prelude::*;
use bevy_tasks::{IoTaskPool, Task, futures_lite::future};
use thiserror::Error;
use tracing::{debug, error, trace};
use super::{
connection::RawConnection,
packet::login::{ReceiveCustomQueryEvent, ReceiveHelloEvent, SendLoginPacketEvent},
};
use crate::{Account, JoinError};
use crate::Account;
/// Some systems that run during the `login` state.
pub struct LoginPlugin;
@ -73,14 +74,24 @@ pub fn poll_auth_task(
type PrivateKey = [u8; 16];
#[derive(Component)]
pub struct AuthTask(Task<Result<(ServerboundKey, PrivateKey), JoinError>>);
pub struct AuthTask(Task<Result<(ServerboundKey, PrivateKey), AuthWithAccountError>>);
#[derive(Debug, Error)]
pub enum AuthWithAccountError {
#[error("Failed to encrypt the challenge from the server for {0:?}")]
Encryption(ClientboundHello),
#[error("{0}")]
SessionServer(#[from] ClientSessionServerError),
#[error("Couldn't refresh access token: {0}")]
Auth(#[from] azalea_auth::AuthError),
}
pub async fn auth_with_account(
account: Account,
packet: ClientboundHello,
) -> Result<(ServerboundKey, PrivateKey), JoinError> {
) -> Result<(ServerboundKey, PrivateKey), AuthWithAccountError> {
let Ok(encrypt_res) = azalea_crypto::encrypt(&packet.public_key, &packet.challenge) else {
return Err(JoinError::EncryptionError(packet));
return Err(AuthWithAccountError::Encryption(packet));
};
let key_packet = ServerboundKey {
key_bytes: encrypt_res.encrypted_public_key,

View file

@ -687,7 +687,7 @@ impl Swarm {
///
/// # Errors
///
/// Returns an `Err` if the bot could not do a handshake successfully.
/// Returns an error if the server's address could not be resolved.
pub async fn add<S: Component + Clone>(
&self,
account: &Account,
@ -702,7 +702,7 @@ impl Swarm {
///
/// # Errors
///
/// Returns an `Err` if the bot could not do a handshake successfully.
/// Returns an error if the server's address could not be resolved.
pub async fn add_with_opts<S: Component + Clone>(
&self,
account: &Account,
@ -725,7 +725,7 @@ impl Swarm {
let (tx, rx) = mpsc::unbounded_channel();
let bot = Client::start_client(StartClientOpts {
let client = Client::start_client(StartClientOpts {
ecs_lock: self.ecs_lock.clone(),
account: account.clone(),
connect_opts: ConnectOpts {
@ -735,14 +735,14 @@ impl Swarm {
},
event_sender: Some(tx),
})
.await?;
.await;
// add the state to the client
{
let mut ecs = self.ecs_lock.lock();
ecs.entity_mut(bot.entity).insert(state);
ecs.entity_mut(client.entity).insert(state);
}
let cloned_bot = bot.clone();
let cloned_bot = client.clone();
let swarm_tx = self.swarm_tx.clone();
let bots_tx = self.bots_tx.clone();
@ -751,7 +751,7 @@ impl Swarm {
rx, swarm_tx, bots_tx, cloned_bot, join_opts,
));
Ok(bot)
Ok(client)
}
/// Copy the events from a client's receiver into bots_tx, until the bot is
@ -767,7 +767,9 @@ impl Swarm {
if rx.len() > 1_000 {
static WARNED_1_000: AtomicBool = AtomicBool::new(false);
if !WARNED_1_000.swap(true, atomic::Ordering::Relaxed) {
warn!("the client's Event channel has more than 1000 items!")
warn!(
"the client's Event channel has more than 1,000 items! this is probably fine but if you're concerned about it, maybe consider disabling the packet-event feature in azalea to reduce the number of events?"
)
}
if rx.len() > 10_000 {
@ -786,7 +788,7 @@ impl Swarm {
static WARNED_1_000_000: AtomicBool = AtomicBool::new(false);
if !WARNED_1_000_000.swap(true, atomic::Ordering::Relaxed) {
warn!(
"the client's Event channel has more than 1,000,000 items!!!! i sincerely hope no one ever sees this warning"
"the client's Event channel has more than 1,000,000 items!!!! your code is almost certainly leaking memory"
)
}
}
@ -860,18 +862,7 @@ impl Swarm {
.min(Duration::from_secs(15));
let username = account.username.clone();
match &e {
JoinError::Disconnect { reason } => {
error!(
"Error joining as {username}, server says: \"{reason}\". Waiting {delay:?} and trying again."
);
}
_ => {
error!(
"Error joining as {username}: {e}. Waiting {delay:?} and trying again."
);
}
}
error!("Error joining as {username}: {e}. Waiting {delay:?} and trying again.");
sleep(delay).await;
}