1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 14:26:04 +00:00

Move login state to the ECS (#213)

* use packet handlers code for login custom_query

* initial broken implementation for ecs-only login

* fixes

* run Update schedule 60 times per second and delete code related to run_schedule_sender

* fix tests

* fix online-mode

* reply to query packets in a separate system and make it easier for plugins to disable individual replies

* remove unused imports
This commit is contained in:
mat 2025-04-17 16:16:51 -05:00 committed by GitHub
parent 1989f4ec97
commit 3f60bdadac
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
38 changed files with 1322 additions and 1242 deletions

14
Cargo.lock generated
View file

@ -160,6 +160,19 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
] ]
[[package]]
name = "async-compat"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bab94bde396a3f7b4962e396fdad640e241ed797d4d8d77fc8c237d14c58fc0"
dependencies = [
"futures-core",
"futures-io",
"once_cell",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "async-executor" name = "async-executor"
version = "1.13.1" version = "1.13.1"
@ -336,6 +349,7 @@ name = "azalea-client"
version = "0.12.0+mc1.21.5" version = "0.12.0+mc1.21.5"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-compat",
"azalea-auth", "azalea-auth",
"azalea-block", "azalea-block",
"azalea-buf", "azalea-buf",

View file

@ -81,6 +81,7 @@ indexmap = "2.9.0"
paste = "1.0.15" paste = "1.0.15"
compact_str = "0.9.0" compact_str = "0.9.0"
crc32fast = "1.4.2" crc32fast = "1.4.2"
async-compat = "0.2.4"
# --- Profile Settings --- # --- Profile Settings ---

View file

@ -12,7 +12,10 @@ azalea-crypto = { path = "../azalea-crypto", version = "0.12.0" }
base64.workspace = true base64.workspace = true
chrono = { workspace = true, features = ["serde"] } chrono = { workspace = true, features = ["serde"] }
md-5.workspace = true md-5.workspace = true
reqwest = { workspace = true, features = ["json", "rustls-tls"] } reqwest = { workspace = true, default-features = false, features = [
"json",
"rustls-tls",
] }
rsa.workspace = true rsa.workspace = true
serde = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true serde_json.workspace = true

View file

@ -10,6 +10,7 @@ pub struct GameProfile {
pub uuid: Uuid, pub uuid: Uuid,
/// The username of the player. /// The username of the player.
pub name: String, pub name: String,
// this is an arc to make GameProfile cheaper to clone when the properties are big
pub properties: Arc<HashMap<String, ProfilePropertyValue>>, pub properties: Arc<HashMap<String, ProfilePropertyValue>>,
} }

View file

@ -39,7 +39,7 @@ impl AzaleaWriteVar for i32 {
let mut buffer = [0]; let mut buffer = [0];
let mut value = *self; let mut value = *self;
if value == 0 { if value == 0 {
buf.write_all(&buffer).unwrap(); buf.write_all(&buffer)?;
} }
while value != 0 { while value != 0 {
buffer[0] = (value & 0b0111_1111) as u8; buffer[0] = (value & 0b0111_1111) as u8;

View file

@ -7,6 +7,7 @@ license.workspace = true
repository.workspace = true repository.workspace = true
[dependencies] [dependencies]
async-compat.workspace = true
azalea-auth = { path = "../azalea-auth", version = "0.12.0" } azalea-auth = { path = "../azalea-auth", version = "0.12.0" }
azalea-block = { path = "../azalea-block", version = "0.12.0" } azalea-block = { path = "../azalea-block", version = "0.12.0" }
azalea-buf = { path = "../azalea-buf", version = "0.12.0" } azalea-buf = { path = "../azalea-buf", version = "0.12.0" }

View file

@ -15,7 +15,7 @@ use uuid::Uuid;
/// To join a server using this account, use [`Client::join`] or /// To join a server using this account, use [`Client::join`] or
/// [`azalea::ClientBuilder`]. /// [`azalea::ClientBuilder`].
/// ///
/// Note that this is also a component that our clients have. /// This is also an ECS component that is present on our client entities.
/// ///
/// # Examples /// # Examples
/// ///

View file

@ -8,84 +8,69 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use azalea_auth::{game_profile::GameProfile, sessionserver::ClientSessionServerError}; use azalea_auth::game_profile::GameProfile;
use azalea_chat::FormattedText; use azalea_chat::FormattedText;
use azalea_core::{ use azalea_core::{
data_registry::ResolvableDataRegistry, position::Vec3, resource_location::ResourceLocation, data_registry::ResolvableDataRegistry, position::Vec3, resource_location::ResourceLocation,
tick::GameTick, tick::GameTick,
}; };
use azalea_entity::{ use azalea_entity::{
EntityPlugin, EntityUpdateSet, EyeHeight, LocalEntity, Position, EntityUpdateSet, EyeHeight, LocalEntity, Position,
indexing::{EntityIdIndex, EntityUuidIndex}, indexing::{EntityIdIndex, EntityUuidIndex},
metadata::Health, metadata::Health,
}; };
use azalea_physics::PhysicsPlugin;
use azalea_protocol::{ use azalea_protocol::{
ServerAddress, ServerAddress,
common::client_information::ClientInformation, common::client_information::ClientInformation,
connect::{Connection, ConnectionError, Proxy}, connect::{Connection, ConnectionError, Proxy},
packets::{ packets::{
self, ClientIntention, ConnectionProtocol, PROTOCOL_VERSION, Packet, self, ClientIntention, ConnectionProtocol, PROTOCOL_VERSION, Packet,
config::{ClientboundConfigPacket, ServerboundConfigPacket}, game::{self, ServerboundGamePacket},
game::ServerboundGamePacket, handshake::s_intention::ServerboundIntention,
handshake::{ login::s_hello::ServerboundHello,
ClientboundHandshakePacket, ServerboundHandshakePacket,
s_intention::ServerboundIntention,
},
login::{
ClientboundLoginPacket, s_hello::ServerboundHello, s_key::ServerboundKey,
s_login_acknowledged::ServerboundLoginAcknowledged,
},
}, },
resolver, resolver,
}; };
use azalea_world::{Instance, InstanceContainer, InstanceName, MinecraftEntityId, PartialInstance}; use azalea_world::{Instance, InstanceContainer, InstanceName, MinecraftEntityId, PartialInstance};
use bevy_app::{App, Plugin, PluginGroup, PluginGroupBuilder, PluginsState, Update}; use bevy_app::{App, Plugin, PluginsState, Update};
use bevy_ecs::{ use bevy_ecs::{
bundle::Bundle, bundle::Bundle,
component::Component, component::Component,
entity::Entity, entity::Entity,
schedule::{InternedScheduleLabel, IntoSystemConfigs, LogLevel, ScheduleBuildSettings}, schedule::{InternedScheduleLabel, IntoSystemConfigs, LogLevel, ScheduleBuildSettings},
system::Resource, system::{Commands, Resource},
world::World, world::World,
}; };
use bevy_time::TimePlugin;
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use simdnbt::owned::NbtCompound; use simdnbt::owned::NbtCompound;
use thiserror::Error; use thiserror::Error;
use tokio::{ use tokio::{
sync::mpsc::{self, error::TrySendError}, sync::mpsc::{self},
time, time,
}; };
use tracing::{debug, error, info}; use tracing::{debug, error, info, warn};
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
Account, PlayerInfo, Account, DefaultPlugins, PlayerInfo,
attack::{self, AttackPlugin}, attack::{self},
brand::BrandPlugin, chunks::ChunkBatchInfo,
chat::ChatPlugin, connection::RawConnection,
chunks::{ChunkBatchInfo, ChunksPlugin}, disconnect::DisconnectEvent,
disconnect::{DisconnectEvent, DisconnectPlugin}, events::{Event, LocalPlayerEvents},
events::{Event, EventsPlugin, LocalPlayerEvents}, interact::CurrentSequenceNumber,
interact::{CurrentSequenceNumber, InteractPlugin}, inventory::Inventory,
inventory::{Inventory, InventoryPlugin},
local_player::{ local_player::{
GameProfileComponent, Hunger, InstanceHolder, PermissionLevel, PlayerAbilities, TabList, GameProfileComponent, Hunger, InstanceHolder, PermissionLevel, PlayerAbilities, TabList,
}, },
mining::{self, MiningPlugin}, mining::{self},
movement::{LastSentLookDirection, MovementPlugin, PhysicsState}, movement::{LastSentLookDirection, PhysicsState},
packet::{ packet::{
PacketPlugin, as_system,
login::{self, InLoginState, LoginSendPacketQueue}, game::SendPacketEvent,
login::{InLoginState, SendLoginPacketEvent},
}, },
player::retroactively_add_game_profile_component, player::retroactively_add_game_profile_component,
pong::PongPlugin,
raw_connection::RawConnection,
respawn::RespawnPlugin,
task_pool::TaskPoolPlugin,
tick_broadcast::TickBroadcastPlugin,
tick_end::TickEndPlugin,
}; };
/// `Client` has the things that a user interacting with the library will want. /// `Client` has the things that a user interacting with the library will want.
@ -99,15 +84,6 @@ use crate::{
/// [`azalea::ClientBuilder`]: https://docs.rs/azalea/latest/azalea/struct.ClientBuilder.html /// [`azalea::ClientBuilder`]: https://docs.rs/azalea/latest/azalea/struct.ClientBuilder.html
#[derive(Clone)] #[derive(Clone)]
pub struct Client { pub struct Client {
/// The [`GameProfile`] for our client. This contains your username, UUID,
/// and skin data.
///
/// This is immutable; the server cannot change it. To get the username and
/// skin the server chose for you, get your player from the [`TabList`]
/// component.
///
/// This as also available from the ECS as [`GameProfileComponent`].
pub profile: GameProfile,
/// The entity for this client in the ECS. /// The entity for this client in the ECS.
pub entity: Entity, pub entity: Entity,
@ -115,9 +91,6 @@ pub struct Client {
/// directly. Note that if you're using a shared world (i.e. a swarm), this /// directly. Note that if you're using a shared world (i.e. a swarm), this
/// will contain all entities in all worlds. /// will contain all entities in all worlds.
pub ecs: Arc<Mutex<World>>, pub ecs: Arc<Mutex<World>>,
/// Use this to force the client to run the schedule outside of a tick.
pub run_schedule_sender: mpsc::Sender<()>,
} }
/// An error that happened while joining the server. /// An error that happened while joining the server.
@ -131,6 +104,8 @@ pub enum JoinError {
ReadPacket(#[from] Box<azalea_protocol::read::ReadPacketError>), ReadPacket(#[from] Box<azalea_protocol::read::ReadPacketError>),
#[error("{0}")] #[error("{0}")]
Io(#[from] io::Error), Io(#[from] io::Error),
#[error("Failed to encrypt the challenge from the server for {0:?}")]
EncryptionError(packets::login::ClientboundHello),
#[error("{0}")] #[error("{0}")]
SessionServer(#[from] azalea_auth::sessionserver::ClientSessionServerError), SessionServer(#[from] azalea_auth::sessionserver::ClientSessionServerError),
#[error("The given address could not be parsed into a ServerAddress")] #[error("The given address could not be parsed into a ServerAddress")]
@ -147,7 +122,6 @@ pub struct StartClientOpts<'a> {
pub address: &'a ServerAddress, pub address: &'a ServerAddress,
pub resolved_address: &'a SocketAddr, pub resolved_address: &'a SocketAddr,
pub proxy: Option<Proxy>, pub proxy: Option<Proxy>,
pub run_schedule_sender: mpsc::Sender<()>,
pub event_sender: Option<mpsc::UnboundedSender<Event>>, pub event_sender: Option<mpsc::UnboundedSender<Event>>,
} }
@ -158,13 +132,10 @@ impl<'a> StartClientOpts<'a> {
resolved_address: &'a SocketAddr, resolved_address: &'a SocketAddr,
event_sender: Option<mpsc::UnboundedSender<Event>>, event_sender: Option<mpsc::UnboundedSender<Event>>,
) -> StartClientOpts<'a> { ) -> StartClientOpts<'a> {
// An event that causes the schedule to run. This is only used internally.
let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1);
let mut app = App::new(); let mut app = App::new();
app.add_plugins(DefaultPlugins); app.add_plugins(DefaultPlugins);
let ecs_lock = start_ecs_runner(app, run_schedule_receiver, run_schedule_sender.clone()); let ecs_lock = start_ecs_runner(app);
Self { Self {
ecs_lock, ecs_lock,
@ -172,7 +143,6 @@ impl<'a> StartClientOpts<'a> {
address, address,
resolved_address, resolved_address,
proxy: None, proxy: None,
run_schedule_sender,
event_sender, event_sender,
} }
} }
@ -188,20 +158,12 @@ impl Client {
/// World, and schedule runner function. /// World, and schedule runner function.
/// You should only use this if you want to change these fields from the /// You should only use this if you want to change these fields from the
/// defaults, otherwise use [`Client::join`]. /// defaults, otherwise use [`Client::join`].
pub fn new( pub fn new(entity: Entity, ecs: Arc<Mutex<World>>) -> Self {
profile: GameProfile,
entity: Entity,
ecs: Arc<Mutex<World>>,
run_schedule_sender: mpsc::Sender<()>,
) -> Self {
Self { Self {
profile,
// default our id to 0, it'll be set later // default our id to 0, it'll be set later
entity, entity,
ecs, ecs,
run_schedule_sender,
} }
} }
@ -268,7 +230,6 @@ impl Client {
address, address,
resolved_address, resolved_address,
proxy, proxy,
run_schedule_sender,
event_sender, event_sender,
}: StartClientOpts<'_>, }: StartClientOpts<'_>,
) -> Result<Self, JoinError> { ) -> Result<Self, JoinError> {
@ -291,92 +252,31 @@ impl Client {
entity entity
}; };
// add the Account to the entity now so plugins can access it earlier let mut entity_mut = ecs.entity_mut(entity);
ecs.entity_mut(entity).insert(account.to_owned()); entity_mut.insert((
InLoginState,
// add the Account to the entity now so plugins can access it earlier
account.to_owned(),
// localentity is always present for our clients, even if we're not actually logged
// in
LocalEntity,
));
if let Some(event_sender) = event_sender {
// this is optional so we don't leak memory in case the user doesn't want to
// handle receiving packets
entity_mut.insert(LocalPlayerEvents(event_sender));
}
entity entity
}; };
let conn = if let Some(proxy) = proxy { let mut conn = if let Some(proxy) = proxy {
Connection::new_with_proxy(resolved_address, proxy).await? Connection::new_with_proxy(resolved_address, proxy).await?
} else { } else {
Connection::new(resolved_address).await? Connection::new(resolved_address).await?
}; };
let (conn, game_profile) = debug!("Created connection to {resolved_address:?}");
Self::handshake(ecs_lock.clone(), entity, conn, account, address).await?;
// note that we send the proper packets in
// crate::configuration::handle_in_configuration_state
let (read_conn, write_conn) = conn.into_split();
let (read_conn, write_conn) = (read_conn.raw, write_conn.raw);
// we did the handshake, so now we're connected to the server
let mut ecs = ecs_lock.lock();
// we got the ConfigurationConnection, so the client is now connected :)
let client = Client::new(
game_profile.clone(),
entity,
ecs_lock.clone(),
run_schedule_sender.clone(),
);
let instance = Instance::default();
let instance_holder = crate::local_player::InstanceHolder::new(
entity,
// default to an empty world, it'll be set correctly later when we
// get the login packet
Arc::new(RwLock::new(instance)),
);
let mut entity = ecs.entity_mut(entity);
entity.insert((
// these stay when we switch to the game state
LocalPlayerBundle {
raw_connection: RawConnection::new(
run_schedule_sender,
ConnectionProtocol::Configuration,
read_conn,
write_conn,
),
game_profile: GameProfileComponent(game_profile),
client_information: crate::ClientInformation::default(),
instance_holder,
metadata: azalea_entity::metadata::PlayerMetadataBundle::default(),
},
InConfigState,
// this component is never removed
LocalEntity,
));
if let Some(event_sender) = event_sender {
// this is optional so we don't leak memory in case the user
entity.insert(LocalPlayerEvents(event_sender));
}
Ok(client)
}
/// Do a handshake with the server and get to the game state from the
/// initial handshake state.
///
/// This will also automatically refresh the account's access token if
/// it's expired.
pub async fn handshake(
ecs_lock: Arc<Mutex<World>>,
entity: Entity,
mut conn: Connection<ClientboundHandshakePacket, ServerboundHandshakePacket>,
account: &Account,
address: &ServerAddress,
) -> Result<
(
Connection<ClientboundConfigPacket, ServerboundConfigPacket>,
GameProfile,
),
JoinError,
> {
// handshake
conn.write(ServerboundIntention { conn.write(ServerboundIntention {
protocol_version: PROTOCOL_VERSION, protocol_version: PROTOCOL_VERSION,
hostname: address.host.clone(), hostname: address.host.clone(),
@ -384,147 +284,63 @@ impl Client {
intention: ClientIntention::Login, intention: ClientIntention::Login,
}) })
.await?; .await?;
let mut conn = conn.login(); let conn = conn.login();
// this makes it so plugins can send an `SendLoginPacketEvent` event to the ecs let (read_conn, write_conn) = conn.into_split();
// and we'll send it to the server let (read_conn, write_conn) = (read_conn.raw, write_conn.raw);
let (ecs_packets_tx, mut ecs_packets_rx) = mpsc::unbounded_channel();
ecs_lock.lock().entity_mut(entity).insert((
LoginSendPacketQueue { tx: ecs_packets_tx },
crate::packet::login::IgnoreQueryIds::default(),
InLoginState,
));
// login // insert the client into the ecs so it finishes logging in
conn.write(ServerboundHello { {
name: account.username.clone(), let mut ecs = ecs_lock.lock();
// TODO: pretty sure this should generate an offline-mode uuid instead of just
// Uuid::default()
profile_id: account.uuid.unwrap_or_default(),
})
.await?;
let (conn, profile) = loop { let instance = Instance::default();
let packet = tokio::select! { let instance_holder = crate::local_player::InstanceHolder::new(
packet = conn.read() => packet?,
Some(packet) = ecs_packets_rx.recv() => {
// write this packet to the server
conn.write(packet).await?;
continue;
}
};
ecs_lock.lock().send_event(login::LoginPacketEvent {
entity, entity,
packet: Arc::new(packet.clone()), // default to an empty world, it'll be set correctly later when we
}); // get the login packet
Arc::new(RwLock::new(instance)),
);
match packet { let mut entity = ecs.entity_mut(entity);
ClientboundLoginPacket::Hello(p) => { entity.insert((
debug!("Got encryption request"); // these stay when we switch to the game state
let Ok(e) = azalea_crypto::encrypt(&p.public_key, &p.challenge) else { LocalPlayerBundle {
error!("Failed to encrypt the challenge from the server for {p:?}"); raw_connection: RawConnection::new(
continue; read_conn,
}; write_conn,
ConnectionProtocol::Login,
),
client_information: crate::ClientInformation::default(),
instance_holder,
metadata: azalea_entity::metadata::PlayerMetadataBundle::default(),
},
));
}
if let Some(access_token) = &account.access_token { as_system::<Commands>(&mut ecs_lock.lock(), |mut commands| {
// keep track of the number of times we tried commands.entity(entity).insert((InLoginState,));
// authenticating so we can give up after too many commands.trigger(SendLoginPacketEvent::new(
let mut attempts: usize = 1; entity,
ServerboundHello {
name: account.username.clone(),
// TODO: pretty sure this should generate an offline-mode uuid instead of just
// Uuid::default()
profile_id: account.uuid.unwrap_or_default(),
},
))
});
while let Err(e) = { let client = Client::new(entity, ecs_lock.clone());
let access_token = access_token.lock().clone(); Ok(client)
conn.authenticate(
&access_token,
&account
.uuid
.expect("Uuid must be present if access token is present."),
e.secret_key,
&p,
)
.await
} {
if attempts >= 2 {
// if this is the second attempt and we failed
// both times, give up
return Err(e.into());
}
if matches!(
e,
ClientSessionServerError::InvalidSession
| ClientSessionServerError::ForbiddenOperation
) {
// uh oh, we got an invalid session and have
// to reauthenticate now
account.refresh().await?;
} else {
return Err(e.into());
}
attempts += 1;
}
}
conn.write(ServerboundKey {
key_bytes: e.encrypted_public_key,
encrypted_challenge: e.encrypted_challenge,
})
.await?;
conn.set_encryption_key(e.secret_key);
}
ClientboundLoginPacket::LoginCompression(p) => {
debug!("Got compression request {:?}", p.compression_threshold);
conn.set_compression_threshold(p.compression_threshold);
}
ClientboundLoginPacket::LoginFinished(p) => {
debug!(
"Got profile {:?}. handshake is finished and we're now switching to the configuration state",
p.game_profile
);
conn.write(ServerboundLoginAcknowledged {}).await?;
break (conn.config(), p.game_profile);
}
ClientboundLoginPacket::LoginDisconnect(p) => {
debug!("Got disconnect {:?}", p);
return Err(JoinError::Disconnect { reason: p.reason });
}
ClientboundLoginPacket::CustomQuery(p) => {
debug!("Got custom query {:?}", p);
// replying to custom query is done in
// packet::login::process_packet_events
}
ClientboundLoginPacket::CookieRequest(p) => {
debug!("Got cookie request {:?}", p);
conn.write(packets::login::ServerboundCookieResponse {
key: p.key,
// cookies aren't implemented
payload: None,
})
.await?;
}
}
};
ecs_lock
.lock()
.entity_mut(entity)
.remove::<login::IgnoreQueryIds>()
.remove::<LoginSendPacketQueue>()
.remove::<InLoginState>();
Ok((conn, profile))
} }
/// Write a packet directly to the server. /// Write a packet directly to the server.
pub fn write_packet( pub fn write_packet(&self, packet: impl Packet<ServerboundGamePacket>) {
&self,
packet: impl Packet<ServerboundGamePacket>,
) -> Result<(), crate::raw_connection::WritePacketError> {
let packet = packet.into_variant(); let packet = packet.into_variant();
self.raw_connection_mut(&mut self.ecs.lock()) self.ecs
.write_packet(packet) .lock()
.commands()
.trigger(SendPacketEvent::new(self.entity, packet));
} }
/// Disconnect this client from the server by ending all tasks. /// Disconnect this client from the server by ending all tasks.
@ -687,14 +503,11 @@ impl Client {
/// view_distance: 2, /// view_distance: 2,
/// ..Default::default() /// ..Default::default()
/// }) /// })
/// .await?; /// .await;
/// # Ok(()) /// # Ok(())
/// # } /// # }
/// ``` /// ```
pub async fn set_client_information( pub async fn set_client_information(&self, client_information: ClientInformation) {
&self,
client_information: ClientInformation,
) -> Result<(), crate::raw_connection::WritePacketError> {
{ {
let mut ecs = self.ecs.lock(); let mut ecs = self.ecs.lock();
let mut client_information_mut = self.query::<&mut ClientInformation>(&mut ecs); let mut client_information_mut = self.query::<&mut ClientInformation>(&mut ecs);
@ -706,10 +519,10 @@ impl Client {
"Sending client information (already logged in): {:?}", "Sending client information (already logged in): {:?}",
client_information client_information
); );
self.write_packet(azalea_protocol::packets::game::s_client_information::ServerboundClientInformation { information: client_information.clone() })?; self.write_packet(game::s_client_information::ServerboundClientInformation {
client_information,
});
} }
Ok(())
} }
} }
@ -757,14 +570,14 @@ impl Client {
/// This is a shortcut for /// This is a shortcut for
/// `bot.component::<GameProfileComponent>().name.to_owned()`. /// `bot.component::<GameProfileComponent>().name.to_owned()`.
pub fn username(&self) -> String { pub fn username(&self) -> String {
self.component::<GameProfileComponent>().name.to_owned() self.profile().name.to_owned()
} }
/// Get the Minecraft UUID of this client. /// Get the Minecraft UUID of this client.
/// ///
/// This is a shortcut for `bot.component::<GameProfileComponent>().uuid`. /// This is a shortcut for `bot.component::<GameProfileComponent>().uuid`.
pub fn uuid(&self) -> Uuid { pub fn uuid(&self) -> Uuid {
self.component::<GameProfileComponent>().uuid self.profile().uuid
} }
/// Get a map of player UUIDs to their information in the tab list. /// Get a map of player UUIDs to their information in the tab list.
@ -774,6 +587,19 @@ impl Client {
(*self.component::<TabList>()).clone() (*self.component::<TabList>()).clone()
} }
/// Returns the [`GameProfile`] for our client. This contains your username,
/// UUID, and skin data.
///
/// These values are set by the server upon login, which means they might
/// not match up with your actual game profile. Also, note that the username
/// and skin that gets displayed in-game will actually be the ones from
/// the tab list, which you can get from [`Self::tab_list`].
///
/// This as also available from the ECS as [`GameProfileComponent`].
pub fn profile(&self) -> GameProfile {
(*self.component::<GameProfileComponent>()).clone()
}
/// A convenience function to get the Minecraft Uuid of a player by their /// A convenience function to get the Minecraft Uuid of a player by their
/// username, if they're present in the tab list. /// username, if they're present in the tab list.
/// ///
@ -854,15 +680,14 @@ impl Client {
} }
} }
/// The bundle of components that's shared when we're either in the /// A bundle of components that's inserted right when we switch to the `login`
/// `configuration` or `game` state. /// state and stay present on our clients until we disconnect.
/// ///
/// For the components that are only present in the `game` state, see /// For the components that are only present in the `game` state, see
/// [`JoinedClientBundle`]. /// [`JoinedClientBundle`].
#[derive(Bundle)] #[derive(Bundle)]
pub struct LocalPlayerBundle { pub struct LocalPlayerBundle {
pub raw_connection: RawConnection, pub raw_connection: RawConnection,
pub game_profile: GameProfileComponent,
pub client_information: ClientInformation, pub client_information: ClientInformation,
pub instance_holder: InstanceHolder, pub instance_holder: InstanceHolder,
@ -922,11 +747,7 @@ impl Plugin for AzaleaPlugin {
/// You can create your app with `App::new()`, but don't forget to add /// You can create your app with `App::new()`, but don't forget to add
/// [`DefaultPlugins`]. /// [`DefaultPlugins`].
#[doc(hidden)] #[doc(hidden)]
pub fn start_ecs_runner( pub fn start_ecs_runner(mut app: App) -> Arc<Mutex<World>> {
mut app: App,
run_schedule_receiver: mpsc::Receiver<()>,
run_schedule_sender: mpsc::Sender<()>,
) -> Arc<Mutex<World>> {
// this block is based on Bevy's default runner: // this block is based on Bevy's default runner:
// https://github.com/bevyengine/bevy/blob/390877cdae7a17095a75c8f9f1b4241fe5047e83/crates/bevy_app/src/schedule_runner.rs#L77-L85 // https://github.com/bevyengine/bevy/blob/390877cdae7a17095a75c8f9f1b4241fe5047e83/crates/bevy_app/src/schedule_runner.rs#L77-L85
if app.plugins_state() != PluginsState::Cleaned { if app.plugins_state() != PluginsState::Cleaned {
@ -949,35 +770,54 @@ pub fn start_ecs_runner(
tokio::spawn(run_schedule_loop( tokio::spawn(run_schedule_loop(
ecs.clone(), ecs.clone(),
*app.main().update_schedule.as_ref().unwrap(), *app.main().update_schedule.as_ref().unwrap(),
run_schedule_receiver,
)); ));
tokio::spawn(tick_run_schedule_loop(run_schedule_sender));
ecs ecs
} }
async fn run_schedule_loop( async fn run_schedule_loop(ecs: Arc<Mutex<World>>, outer_schedule_label: InternedScheduleLabel) {
ecs: Arc<Mutex<World>>, let mut last_update: Option<Instant> = None;
outer_schedule_label: InternedScheduleLabel,
mut run_schedule_receiver: mpsc::Receiver<()>,
) {
let mut last_tick: Option<Instant> = None; let mut last_tick: Option<Instant> = None;
// azalea runs the Update schedule at most 60 times per second to simulate
// framerate. unlike vanilla though, we also only handle packets during Updates
// due to everything running in ecs systems.
const UPDATE_DURATION_TARGET: Duration = Duration::from_micros(1_000_000 / 60);
// minecraft runs at 20 tps
const GAME_TICK_DURATION_TARGET: Duration = Duration::from_micros(1_000_000 / 20);
loop { loop {
// whenever we get an event from run_schedule_receiver, run the schedule // sleep until the next update if necessary
run_schedule_receiver.recv().await; let now = Instant::now();
if let Some(last_update) = last_update {
let elapsed = now.duration_since(last_update);
if elapsed < UPDATE_DURATION_TARGET {
time::sleep(UPDATE_DURATION_TARGET - elapsed).await;
}
}
last_update = Some(now);
let mut ecs = ecs.lock(); let mut ecs = ecs.lock();
// if last tick is None or more than 50ms ago, run the GameTick schedule // if last tick is None or more than 50ms ago, run the GameTick schedule
ecs.run_schedule(outer_schedule_label); ecs.run_schedule(outer_schedule_label);
if last_tick if last_tick
.map(|last_tick| last_tick.elapsed() > Duration::from_millis(50)) .map(|last_tick| last_tick.elapsed() > GAME_TICK_DURATION_TARGET)
.unwrap_or(true) .unwrap_or(true)
{ {
if let Some(last_tick) = &mut last_tick { if let Some(last_tick) = &mut last_tick {
*last_tick += Duration::from_millis(50); *last_tick += GAME_TICK_DURATION_TARGET;
// if we're more than 10 ticks behind, set last_tick to now.
// vanilla doesn't do it in exactly the same way but it shouldn't really matter
if (now - *last_tick) > GAME_TICK_DURATION_TARGET * 10 {
warn!(
"GameTick is more than 10 ticks behind, skipping ticks so we don't have to burst too much"
);
*last_tick = now;
}
} else { } else {
last_tick = Some(Instant::now()); last_tick = Some(now);
} }
ecs.run_schedule(GameTick); ecs.run_schedule(GameTick);
} }
@ -986,23 +826,6 @@ async fn run_schedule_loop(
} }
} }
/// Send an event to run the schedule every 50 milliseconds. It will stop when
/// the receiver is dropped.
pub async fn tick_run_schedule_loop(run_schedule_sender: mpsc::Sender<()>) {
let mut game_tick_interval = time::interval(Duration::from_millis(50));
// TODO: Minecraft bursts up to 10 ticks and then skips, we should too
game_tick_interval.set_missed_tick_behavior(time::MissedTickBehavior::Burst);
loop {
game_tick_interval.tick().await;
if let Err(TrySendError::Closed(())) = run_schedule_sender.try_send(()) {
error!("tick_run_schedule_loop failed because run_schedule_sender was closed");
// the sender is closed so end the task
return;
}
}
}
pub struct AmbiguityLoggerPlugin; pub struct AmbiguityLoggerPlugin;
impl Plugin for AmbiguityLoggerPlugin { impl Plugin for AmbiguityLoggerPlugin {
fn build(&self, app: &mut App) { fn build(&self, app: &mut App) {
@ -1020,40 +843,3 @@ impl Plugin for AmbiguityLoggerPlugin {
}); });
} }
} }
/// This plugin group will add all the default plugins necessary for Azalea to
/// work.
pub struct DefaultPlugins;
impl PluginGroup for DefaultPlugins {
fn build(self) -> PluginGroupBuilder {
#[allow(unused_mut)]
let mut group = PluginGroupBuilder::start::<Self>()
.add(AmbiguityLoggerPlugin)
.add(TimePlugin)
.add(PacketPlugin)
.add(AzaleaPlugin)
.add(EntityPlugin)
.add(PhysicsPlugin)
.add(EventsPlugin)
.add(TaskPoolPlugin::default())
.add(InventoryPlugin)
.add(ChatPlugin)
.add(DisconnectPlugin)
.add(MovementPlugin)
.add(InteractPlugin)
.add(RespawnPlugin)
.add(MiningPlugin)
.add(AttackPlugin)
.add(ChunksPlugin)
.add(TickEndPlugin)
.add(BrandPlugin)
.add(TickBroadcastPlugin)
.add(PongPlugin);
#[cfg(feature = "log")]
{
group = group.add(bevy_log::LogPlugin::default());
}
group
}
}

View file

@ -15,7 +15,6 @@ mod local_player;
pub mod ping; pub mod ping;
mod player; mod player;
mod plugins; mod plugins;
pub mod raw_connection;
#[doc(hidden)] #[doc(hidden)]
pub mod test_simulation; pub mod test_simulation;
@ -23,8 +22,8 @@ pub mod test_simulation;
pub use account::{Account, AccountOpts}; pub use account::{Account, AccountOpts};
pub use azalea_protocol::common::client_information::ClientInformation; pub use azalea_protocol::common::client_information::ClientInformation;
pub use client::{ pub use client::{
Client, DefaultPlugins, InConfigState, InGameState, JoinError, JoinedClientBundle, Client, InConfigState, InGameState, JoinError, JoinedClientBundle, LocalPlayerBundle,
LocalPlayerBundle, StartClientOpts, start_ecs_runner, StartClientOpts, start_ecs_runner,
}; };
pub use events::Event; pub use events::Event;
pub use local_player::{GameProfileComponent, Hunger, InstanceHolder, TabList}; pub use local_player::{GameProfileComponent, Hunger, InstanceHolder, TabList};

View file

@ -152,7 +152,6 @@ impl Client {
content: message.to_string(), content: message.to_string(),
kind: ChatKind::Message, kind: ChatKind::Message,
}); });
let _ = self.run_schedule_sender.try_send(());
} }
/// Send a command packet to the server. The `command` argument should not /// Send a command packet to the server. The `command` argument should not
@ -166,7 +165,6 @@ impl Client {
content: command.to_string(), content: command.to_string(),
kind: ChatKind::Command, kind: ChatKind::Command,
}); });
let _ = self.run_schedule_sender.try_send(());
} }
/// Send a message in chat. /// Send a message in chat.
@ -183,7 +181,6 @@ impl Client {
entity: self.entity, entity: self.entity,
content: content.to_string(), content: content.to_string(),
}); });
let _ = self.run_schedule_sender.try_send(());
} }
} }

View file

@ -0,0 +1,369 @@
use std::{fmt::Debug, io::Cursor, mem, sync::Arc};
use azalea_crypto::Aes128CfbEnc;
use azalea_protocol::{
connect::{RawReadConnection, RawWriteConnection},
packets::{
ConnectionProtocol, Packet, ProtocolPacket, config::ClientboundConfigPacket,
game::ClientboundGamePacket, login::ClientboundLoginPacket,
},
read::{ReadPacketError, deserialize_packet},
write::serialize_packet,
};
use bevy_app::prelude::*;
use bevy_ecs::prelude::*;
use bevy_tasks::{IoTaskPool, futures_lite::future};
use thiserror::Error;
use tokio::{
io::AsyncWriteExt,
net::tcp::OwnedWriteHalf,
sync::mpsc::{self},
};
use tracing::{debug, error, info, trace};
use super::packet::{
config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent,
};
use crate::packet::{config, game, login};
pub struct ConnectionPlugin;
impl Plugin for ConnectionPlugin {
fn build(&self, app: &mut App) {
app.add_systems(PreUpdate, (read_packets, poll_all_writer_tasks).chain());
}
}
pub fn read_packets(ecs: &mut World) {
// receive_game_packet_events: EventWriter<ReceiveGamePacketEvent>,
let mut entity_and_conn_query = ecs.query::<(Entity, &mut RawConnection)>();
let mut conn_query = ecs.query::<&mut RawConnection>();
let mut entities_handling_packets = Vec::new();
let mut entities_with_injected_packets = Vec::new();
for (entity, mut raw_conn) in entity_and_conn_query.iter_mut(ecs) {
if !raw_conn.injected_clientbound_packets.is_empty() {
entities_with_injected_packets.push((
entity,
mem::take(&mut raw_conn.injected_clientbound_packets),
));
}
if raw_conn.network.is_none() {
// no network connection, don't bother with the normal packet handling
continue;
}
entities_handling_packets.push(entity);
}
let mut queued_packet_events = QueuedPacketEvents::default();
// handle injected packets, see the comment on
// RawConnection::injected_clientbound_packets for more info
for (entity, raw_packets) in entities_with_injected_packets {
for raw_packet in raw_packets {
let conn = conn_query.get(ecs, entity).unwrap();
let state = conn.state;
trace!("Received injected packet with bytes: {raw_packet:?}");
if let Err(e) =
handle_raw_packet(ecs, &raw_packet, entity, state, &mut queued_packet_events)
{
error!("Error reading injected packet: {e}");
}
}
}
for entity in entities_handling_packets {
loop {
let mut conn = conn_query.get_mut(ecs, entity).unwrap();
let net_conn = conn.net_conn().unwrap();
let read_res = net_conn.reader.try_read();
let state = conn.state;
match read_res {
Ok(Some(raw_packet)) => {
let raw_packet = Arc::<[u8]>::from(raw_packet);
if let Err(e) = handle_raw_packet(
ecs,
&raw_packet,
entity,
state,
&mut queued_packet_events,
) {
error!("Error reading packet: {e}");
}
}
Ok(None) => {
// no packets available
break;
}
Err(err) => {
log_for_error(&err);
if matches!(
&*err,
ReadPacketError::IoError { .. } | ReadPacketError::ConnectionClosed
) {
info!("Server closed connection");
// ungraceful disconnect :(
conn.network = None;
// setting this will make us send a DisconnectEvent
conn.is_alive = false;
}
break;
}
}
}
}
queued_packet_events.send_events(ecs);
}
fn poll_all_writer_tasks(mut conn_query: Query<&mut RawConnection>) {
for mut conn in conn_query.iter_mut() {
if let Some(net_conn) = &mut conn.network {
// this needs to be done at some point every update to make sure packets are
// actually sent to the network
net_conn.poll_writer();
}
}
}
#[derive(Default)]
pub struct QueuedPacketEvents {
login: Vec<ReceiveLoginPacketEvent>,
config: Vec<ReceiveConfigPacketEvent>,
game: Vec<ReceiveGamePacketEvent>,
}
impl QueuedPacketEvents {
fn send_events(&mut self, ecs: &mut World) {
ecs.send_event_batch(self.login.drain(..));
ecs.send_event_batch(self.config.drain(..));
ecs.send_event_batch(self.game.drain(..));
}
}
fn log_for_error(error: &ReadPacketError) {
if !matches!(*error, ReadPacketError::ConnectionClosed) {
error!("Error reading packet from Client: {error:?}");
}
}
/// The client's connection to the server.
#[derive(Component)]
pub struct RawConnection {
/// The network connection to the server.
///
/// This isn't guaranteed to be present, for example during the main packet
/// handlers or at all times during tests.
///
/// You shouldn't rely on this. Instead, use the events for sending packets
/// like [`SendPacketEvent`](crate::packet::game::SendPacketEvent) /
/// [`SendConfigPacketEvent`](crate::packet::config::SendConfigPacketEvent)
/// / [`SendLoginPacketEvent`](crate::packet::login::SendLoginPacketEvent).
///
/// To check if we haven't disconnected from the server, use
/// [`Self::is_alive`].
network: Option<NetworkConnection>,
pub state: ConnectionProtocol,
is_alive: bool,
/// This exists for internal testing purposes and probably shouldn't be used
/// for normal bots. It's basically a way to make our client think it
/// received a packet from the server without needing to interact with the
/// network.
pub injected_clientbound_packets: Vec<Box<[u8]>>,
}
impl RawConnection {
pub fn new(
reader: RawReadConnection,
writer: RawWriteConnection,
state: ConnectionProtocol,
) -> Self {
let task_pool = IoTaskPool::get();
let (network_packet_writer_tx, network_packet_writer_rx) =
mpsc::unbounded_channel::<Box<[u8]>>();
let writer_task =
task_pool.spawn(write_task(network_packet_writer_rx, writer.write_stream));
let mut conn = Self::new_networkless(state);
conn.network = Some(NetworkConnection {
reader,
enc_cipher: writer.enc_cipher,
network_packet_writer_tx,
writer_task,
});
conn
}
pub fn new_networkless(state: ConnectionProtocol) -> Self {
Self {
network: None,
state,
is_alive: true,
injected_clientbound_packets: Vec::new(),
}
}
pub fn is_alive(&self) -> bool {
self.is_alive
}
/// Write a packet to the server without emitting any events.
///
/// This is called by the handlers for [`SendPacketEvent`],
/// [`SendConfigPacketEvent`], and [`SendLoginPacketEvent`].
///
/// [`SendPacketEvent`]: crate::packet::game::SendPacketEvent
/// [`SendConfigPacketEvent`]: crate::packet::config::SendConfigPacketEvent
/// [`SendLoginPacketEvent`]: crate::packet::login::SendLoginPacketEvent
pub fn write<P: ProtocolPacket + Debug>(
&mut self,
packet: impl Packet<P>,
) -> Result<(), WritePacketError> {
if let Some(network) = &mut self.network {
network.write(packet)?;
} else {
debug!(
"tried to write packet to the network but there is no NetworkConnection. if you're trying to send a packet from the handler function, use self.write instead"
);
}
Ok(())
}
pub fn net_conn(&mut self) -> Option<&mut NetworkConnection> {
self.network.as_mut()
}
}
pub fn handle_raw_packet(
ecs: &mut World,
raw_packet: &[u8],
entity: Entity,
state: ConnectionProtocol,
queued_packet_events: &mut QueuedPacketEvents,
) -> Result<(), Box<ReadPacketError>> {
let stream = &mut Cursor::new(raw_packet);
match state {
ConnectionProtocol::Handshake => {
unreachable!()
}
ConnectionProtocol::Game => {
let packet = Arc::new(deserialize_packet::<ClientboundGamePacket>(stream)?);
trace!("Packet: {packet:?}");
game::process_packet(ecs, entity, packet.as_ref());
queued_packet_events
.game
.push(ReceiveGamePacketEvent { entity, packet });
}
ConnectionProtocol::Status => {
unreachable!()
}
ConnectionProtocol::Login => {
let packet = Arc::new(deserialize_packet::<ClientboundLoginPacket>(stream)?);
trace!("Packet: {packet:?}");
login::process_packet(ecs, entity, &packet);
queued_packet_events
.login
.push(ReceiveLoginPacketEvent { entity, packet });
}
ConnectionProtocol::Configuration => {
let packet = Arc::new(deserialize_packet::<ClientboundConfigPacket>(stream)?);
trace!("Packet: {packet:?}");
config::process_packet(ecs, entity, &packet);
queued_packet_events
.config
.push(ReceiveConfigPacketEvent { entity, packet });
}
};
Ok(())
}
pub struct NetworkConnection {
reader: RawReadConnection,
// compression threshold is in the RawReadConnection
pub enc_cipher: Option<Aes128CfbEnc>,
pub writer_task: bevy_tasks::Task<()>,
/// A queue of raw TCP packets to send. These will not be modified further,
/// they should already be serialized and encrypted and everything before
/// being added here.
network_packet_writer_tx: mpsc::UnboundedSender<Box<[u8]>>,
}
impl NetworkConnection {
pub fn write<P: ProtocolPacket + Debug>(
&mut self,
packet: impl Packet<P>,
) -> Result<(), WritePacketError> {
let packet = packet.into_variant();
let raw_packet = serialize_packet(&packet)?;
self.write_raw(&raw_packet)?;
Ok(())
}
pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> {
let network_packet = azalea_protocol::write::encode_to_network_packet(
raw_packet,
self.reader.compression_threshold,
&mut self.enc_cipher,
);
self.network_packet_writer_tx
.send(network_packet.into_boxed_slice())?;
Ok(())
}
pub fn poll_writer(&mut self) {
let poll_once_res = future::poll_once(&mut self.writer_task);
future::block_on(poll_once_res);
}
pub fn set_compression_threshold(&mut self, threshold: Option<u32>) {
trace!("Set compression threshold to {threshold:?}");
self.reader.compression_threshold = threshold;
}
/// Set the encryption key that is used to encrypt and decrypt packets. It's
/// the same for both reading and writing.
pub fn set_encryption_key(&mut self, key: [u8; 16]) {
trace!("Enabled protocol encryption");
let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
self.reader.dec_cipher = Some(dec_cipher);
self.enc_cipher = Some(enc_cipher);
}
}
async fn write_task(
mut network_packet_writer_rx: mpsc::UnboundedReceiver<Box<[u8]>>,
mut write_half: OwnedWriteHalf,
) {
while let Some(network_packet) = network_packet_writer_rx.recv().await {
if let Err(e) = write_half.write_all(&network_packet).await {
debug!("Error writing packet to server: {e}");
break;
};
}
trace!("write task is done");
}
#[derive(Error, Debug)]
pub enum WritePacketError {
#[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
WrongState {
expected: ConnectionProtocol,
got: ConnectionProtocol,
},
#[error(transparent)]
Encoding(#[from] azalea_protocol::write::PacketEncodeError),
#[error(transparent)]
SendError {
#[from]
#[backtrace]
source: mpsc::error::SendError<Box<[u8]>>,
},
}

View file

@ -16,8 +16,8 @@ use derive_more::Deref;
use tracing::trace; use tracing::trace;
use crate::{ use crate::{
InstanceHolder, client::JoinedClientBundle, events::LocalPlayerEvents, InstanceHolder, client::JoinedClientBundle, connection::RawConnection,
raw_connection::RawConnection, events::LocalPlayerEvents,
}; };
pub struct DisconnectPlugin; pub struct DisconnectPlugin;

View file

@ -27,7 +27,7 @@ use crate::{
chat::{ChatPacket, ChatReceivedEvent}, chat::{ChatPacket, ChatReceivedEvent},
disconnect::DisconnectEvent, disconnect::DisconnectEvent,
packet::game::{ packet::game::{
AddPlayerEvent, DeathEvent, KeepAliveEvent, ReceivePacketEvent, RemovePlayerEvent, AddPlayerEvent, DeathEvent, KeepAliveEvent, ReceiveGamePacketEvent, RemovePlayerEvent,
UpdatePlayerEvent, UpdatePlayerEvent,
}, },
}; };
@ -157,7 +157,7 @@ impl Plugin for EventsPlugin {
) )
.add_systems( .add_systems(
PreUpdate, PreUpdate,
init_listener.before(crate::packet::game::process_packet_events), init_listener.before(super::connection::read_packets),
) )
.add_systems(GameTick, tick_listener); .add_systems(GameTick, tick_listener);
} }
@ -217,7 +217,7 @@ pub fn tick_listener(query: Query<&LocalPlayerEvents, With<InstanceName>>) {
pub fn packet_listener( pub fn packet_listener(
query: Query<&LocalPlayerEvents>, query: Query<&LocalPlayerEvents>,
mut events: EventReader<ReceivePacketEvent>, mut events: EventReader<ReceiveGamePacketEvent>,
) { ) {
for event in events.read() { for event in events.read() {
if let Ok(local_player_events) = query.get(event.entity) { if let Ok(local_player_events) = query.get(event.entity) {

View file

@ -0,0 +1,152 @@
use azalea_auth::sessionserver::ClientSessionServerError;
use azalea_protocol::packets::login::{
ClientboundHello, ServerboundCustomQueryAnswer, ServerboundKey,
};
use bevy_app::prelude::*;
use bevy_ecs::prelude::*;
use bevy_tasks::{IoTaskPool, Task, futures_lite::future};
use tracing::{debug, error, trace};
use super::{
connection::RawConnection,
packet::login::{ReceiveCustomQueryEvent, ReceiveHelloEvent, SendLoginPacketEvent},
};
use crate::{Account, JoinError};
/// Some systems that run during the `login` state.
pub struct LoginPlugin;
impl Plugin for LoginPlugin {
fn build(&self, app: &mut App) {
app.add_observer(handle_receive_hello_event)
.add_systems(Update, (poll_auth_task, reply_to_custom_queries));
}
}
fn handle_receive_hello_event(trigger: Trigger<ReceiveHelloEvent>, mut commands: Commands) {
let task_pool = IoTaskPool::get();
let account = trigger.account.clone();
let packet = trigger.packet.clone();
let player = trigger.entity();
let task = task_pool.spawn(auth_with_account(account, packet));
commands.entity(player).insert(AuthTask(task));
}
fn poll_auth_task(
mut commands: Commands,
mut query: Query<(Entity, &mut AuthTask, &mut RawConnection)>,
) {
for (entity, mut auth_task, mut raw_conn) in query.iter_mut() {
if let Some(poll_res) = future::block_on(future::poll_once(&mut auth_task.0)) {
debug!("Finished auth");
commands.entity(entity).remove::<AuthTask>();
match poll_res {
Ok((packet, private_key)) => {
// we use this instead of SendLoginPacketEvent to ensure that it's sent right
// before encryption is enabled. i guess another option would be to make a
// Trigger+observer for set_encryption_key; the current implementation is
// simpler though.
if let Err(e) = raw_conn.write(packet) {
error!("Error sending key packet: {e:?}");
}
if let Some(net_conn) = raw_conn.net_conn() {
net_conn.set_encryption_key(private_key);
}
}
Err(err) => {
error!("Error during authentication: {err:?}");
}
}
}
}
}
type PrivateKey = [u8; 16];
#[derive(Component)]
pub struct AuthTask(Task<Result<(ServerboundKey, PrivateKey), JoinError>>);
pub async fn auth_with_account(
account: Account,
packet: ClientboundHello,
) -> Result<(ServerboundKey, PrivateKey), JoinError> {
let Ok(encrypt_res) = azalea_crypto::encrypt(&packet.public_key, &packet.challenge) else {
return Err(JoinError::EncryptionError(packet));
};
let key_packet = ServerboundKey {
key_bytes: encrypt_res.encrypted_public_key,
encrypted_challenge: encrypt_res.encrypted_challenge,
};
let private_key = encrypt_res.secret_key;
let Some(access_token) = &account.access_token else {
// offline mode account, no need to do auth
return Ok((key_packet, private_key));
};
// keep track of the number of times we tried authenticating so we can give up
// after too many
let mut attempts: usize = 1;
while let Err(err) = {
let access_token = access_token.lock().clone();
let uuid = &account
.uuid
.expect("Uuid must be present if access token is present.");
// this is necessary since reqwest usually depends on tokio and we're using
// `futures` here
async_compat::Compat::new(async {
azalea_auth::sessionserver::join(
&access_token,
&packet.public_key,
&private_key,
uuid,
&packet.server_id,
)
.await
})
.await
} {
if attempts >= 2 {
// if this is the second attempt and we failed
// both times, give up
return Err(err.into());
}
if matches!(
err,
ClientSessionServerError::InvalidSession | ClientSessionServerError::ForbiddenOperation
) {
// uh oh, we got an invalid session and have
// to reauthenticate now
account.refresh().await?;
} else {
return Err(err.into());
}
attempts += 1;
}
Ok((key_packet, private_key))
}
pub fn reply_to_custom_queries(
mut commands: Commands,
mut events: EventReader<ReceiveCustomQueryEvent>,
) {
for event in events.read() {
trace!("Maybe replying to custom query: {event:?}");
if event.disabled {
continue;
}
commands.trigger(SendLoginPacketEvent::new(
event.entity,
ServerboundCustomQueryAnswer {
transaction_id: event.packet.transaction_id,
data: None,
},
));
}
}

View file

@ -1,11 +1,15 @@
use bevy_app::{PluginGroup, PluginGroupBuilder};
pub mod attack; pub mod attack;
pub mod brand; pub mod brand;
pub mod chat; pub mod chat;
pub mod chunks; pub mod chunks;
pub mod connection;
pub mod disconnect; pub mod disconnect;
pub mod events; pub mod events;
pub mod interact; pub mod interact;
pub mod inventory; pub mod inventory;
pub mod login;
pub mod mining; pub mod mining;
pub mod movement; pub mod movement;
pub mod packet; pub mod packet;
@ -14,3 +18,42 @@ pub mod respawn;
pub mod task_pool; pub mod task_pool;
pub mod tick_broadcast; pub mod tick_broadcast;
pub mod tick_end; pub mod tick_end;
/// This plugin group will add all the default plugins necessary for Azalea to
/// work.
pub struct DefaultPlugins;
impl PluginGroup for DefaultPlugins {
fn build(self) -> PluginGroupBuilder {
#[allow(unused_mut)]
let mut group = PluginGroupBuilder::start::<Self>()
.add(crate::client::AmbiguityLoggerPlugin)
.add(bevy_time::TimePlugin)
.add(packet::PacketPlugin)
.add(crate::client::AzaleaPlugin)
.add(azalea_entity::EntityPlugin)
.add(azalea_physics::PhysicsPlugin)
.add(events::EventsPlugin)
.add(task_pool::TaskPoolPlugin::default())
.add(inventory::InventoryPlugin)
.add(chat::ChatPlugin)
.add(disconnect::DisconnectPlugin)
.add(movement::MovementPlugin)
.add(interact::InteractPlugin)
.add(respawn::RespawnPlugin)
.add(mining::MiningPlugin)
.add(attack::AttackPlugin)
.add(chunks::ChunksPlugin)
.add(tick_end::TickEndPlugin)
.add(brand::BrandPlugin)
.add(tick_broadcast::TickBroadcastPlugin)
.add(pong::PongPlugin)
.add(connection::ConnectionPlugin)
.add(login::LoginPlugin);
#[cfg(feature = "log")]
{
group = group.add(bevy_log::LogPlugin::default());
}
group
}
}

View file

@ -1,23 +1,20 @@
use std::io::Cursor; use std::sync::Arc;
use azalea_protocol::{ use azalea_protocol::packets::{
packets::{ Packet,
Packet, config::{ClientboundConfigPacket, ServerboundConfigPacket},
config::{ClientboundConfigPacket, ServerboundConfigPacket},
},
read::deserialize_packet,
}; };
use bevy_ecs::prelude::*; use bevy_ecs::prelude::*;
use tracing::{debug, error}; use tracing::{debug, error};
use crate::{InConfigState, raw_connection::RawConnection}; use crate::{InConfigState, connection::RawConnection};
#[derive(Event, Debug, Clone)] #[derive(Event, Debug, Clone)]
pub struct ReceiveConfigPacketEvent { pub struct ReceiveConfigPacketEvent {
/// The client entity that received the packet. /// The client entity that received the packet.
pub entity: Entity, pub entity: Entity,
/// The packet that was actually received. /// The packet that was actually received.
pub packet: ClientboundConfigPacket, pub packet: Arc<ClientboundConfigPacket>,
} }
/// An event for sending a packet to the server while we're in the /// An event for sending a packet to the server while we're in the
@ -39,7 +36,7 @@ pub fn handle_outgoing_packets_observer(
mut query: Query<(&mut RawConnection, Option<&InConfigState>)>, mut query: Query<(&mut RawConnection, Option<&InConfigState>)>,
) { ) {
let event = trigger.event(); let event = trigger.event();
if let Ok((raw_conn, in_configuration_state)) = query.get_mut(event.sent_by) { if let Ok((mut raw_conn, in_configuration_state)) = query.get_mut(event.sent_by) {
if in_configuration_state.is_none() { if in_configuration_state.is_none() {
error!( error!(
"Tried to send a configuration packet {:?} while not in configuration state", "Tried to send a configuration packet {:?} while not in configuration state",
@ -47,8 +44,8 @@ pub fn handle_outgoing_packets_observer(
); );
return; return;
} }
debug!("Sending packet: {:?}", event.packet); debug!("Sending config packet: {:?}", event.packet);
if let Err(e) = raw_conn.write_packet(event.packet.clone()) { if let Err(e) = raw_conn.write(event.packet.clone()) {
error!("Failed to send packet: {e}"); error!("Failed to send packet: {e}");
} }
} }
@ -64,61 +61,6 @@ pub fn handle_outgoing_packets(
} }
} }
pub fn emit_receive_config_packet_events(
query: Query<(Entity, &RawConnection), With<InConfigState>>,
mut packet_events: ResMut<Events<ReceiveConfigPacketEvent>>,
) {
// we manually clear and send the events at the beginning of each update
// since otherwise it'd cause issues with events in process_packet_events
// running twice
packet_events.clear();
for (player_entity, raw_conn) in &query {
let packets_lock = raw_conn.incoming_packet_queue();
let mut packets = packets_lock.lock();
if !packets.is_empty() {
let mut packets_read = 0;
for raw_packet in packets.iter() {
packets_read += 1;
let packet = match deserialize_packet::<ClientboundConfigPacket>(&mut Cursor::new(
raw_packet,
)) {
Ok(packet) => packet,
Err(err) => {
error!("failed to read packet: {err:?}");
debug!("packet bytes: {raw_packet:?}");
continue;
}
};
let should_interrupt = packet_interrupts(&packet);
packet_events.send(ReceiveConfigPacketEvent {
entity: player_entity,
packet,
});
if should_interrupt {
break;
}
}
packets.drain(0..packets_read);
}
}
}
/// Whether the given packet should make us stop deserializing the received
/// packets until next update.
///
/// This is used for packets that can switch the client state.
fn packet_interrupts(packet: &ClientboundConfigPacket) -> bool {
matches!(
packet,
ClientboundConfigPacket::FinishConfiguration(_)
| ClientboundConfigPacket::Disconnect(_)
| ClientboundConfigPacket::Transfer(_)
)
}
/// A Bevy trigger that's sent when our client receives a [`ClientboundPing`] /// A Bevy trigger that's sent when our client receives a [`ClientboundPing`]
/// packet in the config state. /// packet in the config state.
/// ///

View file

@ -1,65 +1,61 @@
mod events; mod events;
use std::io::Cursor;
use azalea_entity::LocalEntity; use azalea_entity::LocalEntity;
use azalea_protocol::packets::ConnectionProtocol; use azalea_protocol::packets::ConnectionProtocol;
use azalea_protocol::packets::config::*; use azalea_protocol::packets::config::*;
use azalea_protocol::read::ReadPacketError;
use azalea_protocol::read::deserialize_packet;
use bevy_ecs::prelude::*; use bevy_ecs::prelude::*;
use bevy_ecs::system::SystemState;
pub use events::*; pub use events::*;
use tracing::{debug, warn}; use tracing::{debug, warn};
use super::as_system; use super::as_system;
use crate::client::InConfigState; use crate::client::InConfigState;
use crate::connection::RawConnection;
use crate::disconnect::DisconnectEvent; use crate::disconnect::DisconnectEvent;
use crate::packet::game::KeepAliveEvent; use crate::packet::game::KeepAliveEvent;
use crate::packet::game::ResourcePackEvent; use crate::packet::game::ResourcePackEvent;
use crate::raw_connection::RawConnection;
use crate::{InstanceHolder, declare_packet_handlers}; use crate::{InstanceHolder, declare_packet_handlers};
pub fn process_packet_events(ecs: &mut World) { pub fn process_raw_packet(
let mut events_owned = Vec::new(); ecs: &mut World,
let mut system_state: SystemState<EventReader<ReceiveConfigPacketEvent>> = player: Entity,
SystemState::new(ecs); raw_packet: &[u8],
let mut events = system_state.get_mut(ecs); ) -> Result<(), Box<ReadPacketError>> {
for ReceiveConfigPacketEvent { let packet = deserialize_packet(&mut Cursor::new(raw_packet))?;
entity: player_entity, process_packet(ecs, player, &packet);
packet, Ok(())
} in events.read() }
{
// we do this so `ecs` isn't borrowed for the whole loop
events_owned.push((*player_entity, packet.clone()));
}
for (player_entity, packet) in events_owned {
let mut handler = ConfigPacketHandler {
player: player_entity,
ecs,
};
declare_packet_handlers!( pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundConfigPacket) {
ClientboundConfigPacket, let mut handler = ConfigPacketHandler { player, ecs };
packet,
handler, declare_packet_handlers!(
[ ClientboundConfigPacket,
cookie_request, packet,
custom_payload, handler,
disconnect, [
finish_configuration, cookie_request,
keep_alive, custom_payload,
ping, disconnect,
reset_chat, finish_configuration,
registry_data, keep_alive,
resource_pack_pop, ping,
resource_pack_push, reset_chat,
store_cookie, registry_data,
transfer, resource_pack_pop,
update_enabled_features, resource_pack_push,
update_tags, store_cookie,
select_known_packs, transfer,
custom_report_details, update_enabled_features,
server_links, update_tags,
] select_known_packs,
); custom_report_details,
} server_links,
]
);
} }
pub struct ConfigPacketHandler<'a> { pub struct ConfigPacketHandler<'a> {
@ -67,44 +63,45 @@ pub struct ConfigPacketHandler<'a> {
pub player: Entity, pub player: Entity,
} }
impl ConfigPacketHandler<'_> { impl ConfigPacketHandler<'_> {
pub fn registry_data(&mut self, p: ClientboundRegistryData) { pub fn registry_data(&mut self, p: &ClientboundRegistryData) {
as_system::<Query<&mut InstanceHolder>>(self.ecs, |mut query| { as_system::<Query<&mut InstanceHolder>>(self.ecs, |mut query| {
let instance_holder = query.get_mut(self.player).unwrap(); let instance_holder = query.get_mut(self.player).unwrap();
let mut instance = instance_holder.instance.write(); let mut instance = instance_holder.instance.write();
// add the new registry data // add the new registry data
instance.registries.append(p.registry_id, p.entries); instance
.registries
.append(p.registry_id.clone(), p.entries.clone());
}); });
} }
pub fn custom_payload(&mut self, p: ClientboundCustomPayload) { pub fn custom_payload(&mut self, p: &ClientboundCustomPayload) {
debug!("Got custom payload packet {p:?}"); debug!("Got custom payload packet {p:?}");
} }
pub fn disconnect(&mut self, p: ClientboundDisconnect) { pub fn disconnect(&mut self, p: &ClientboundDisconnect) {
warn!("Got disconnect packet {p:?}"); warn!("Got disconnect packet {p:?}");
as_system::<EventWriter<_>>(self.ecs, |mut events| { as_system::<EventWriter<_>>(self.ecs, |mut events| {
events.send(DisconnectEvent { events.send(DisconnectEvent {
entity: self.player, entity: self.player,
reason: Some(p.reason), reason: Some(p.reason.clone()),
}); });
}); });
} }
pub fn finish_configuration(&mut self, p: ClientboundFinishConfiguration) { pub fn finish_configuration(&mut self, _p: &ClientboundFinishConfiguration) {
debug!("got FinishConfiguration packet: {p:?}"); debug!("got FinishConfiguration packet");
as_system::<(Commands, Query<&mut RawConnection>)>( as_system::<(Commands, Query<&mut RawConnection>)>(
self.ecs, self.ecs,
|(mut commands, mut query)| { |(mut commands, mut query)| {
let mut raw_conn = query.get_mut(self.player).unwrap(); let mut raw_conn = query.get_mut(self.player).unwrap();
raw_conn commands.trigger(SendConfigPacketEvent::new(
.write_packet(ServerboundFinishConfiguration) self.player,
.expect( ServerboundFinishConfiguration,
"we should be in the right state and encoding this packet shouldn't fail", ));
); raw_conn.state = ConnectionProtocol::Game;
raw_conn.set_state(ConnectionProtocol::Game);
// these components are added now that we're going to be in the Game state // these components are added now that we're going to be in the Game state
commands commands
@ -120,34 +117,33 @@ impl ConfigPacketHandler<'_> {
); );
} }
pub fn keep_alive(&mut self, p: ClientboundKeepAlive) { pub fn keep_alive(&mut self, p: &ClientboundKeepAlive) {
debug!( debug!(
"Got keep alive packet (in configuration) {p:?} for {:?}", "Got keep alive packet (in configuration) {p:?} for {:?}",
self.player self.player
); );
as_system::<(Query<&RawConnection>, EventWriter<_>)>(self.ecs, |(query, mut events)| { as_system::<(Commands, EventWriter<_>)>(self.ecs, |(mut commands, mut events)| {
let raw_conn = query.get(self.player).unwrap();
events.send(KeepAliveEvent { events.send(KeepAliveEvent {
entity: self.player, entity: self.player,
id: p.id, id: p.id,
}); });
raw_conn commands.trigger(SendConfigPacketEvent::new(
.write_packet(ServerboundKeepAlive { id: p.id }) self.player,
.unwrap(); ServerboundKeepAlive { id: p.id },
));
}); });
} }
pub fn ping(&mut self, p: ClientboundPing) { pub fn ping(&mut self, p: &ClientboundPing) {
debug!("Got ping packet (in configuration) {p:?}"); debug!("Got ping packet (in configuration) {p:?}");
as_system::<Commands>(self.ecs, |mut commands| { as_system::<Commands>(self.ecs, |mut commands| {
commands.trigger_targets(ConfigPingEvent(p), self.player); commands.trigger_targets(ConfigPingEvent(p.clone()), self.player);
}); });
} }
pub fn resource_pack_push(&mut self, p: ClientboundResourcePackPush) { pub fn resource_pack_push(&mut self, p: &ClientboundResourcePackPush) {
debug!("Got resource pack push packet {p:?}"); debug!("Got resource pack push packet {p:?}");
as_system::<EventWriter<_>>(self.ecs, |mut events| { as_system::<EventWriter<_>>(self.ecs, |mut events| {
@ -162,66 +158,64 @@ impl ConfigPacketHandler<'_> {
}); });
} }
pub fn resource_pack_pop(&mut self, p: ClientboundResourcePackPop) { pub fn resource_pack_pop(&mut self, p: &ClientboundResourcePackPop) {
debug!("Got resource pack pop packet {p:?}"); debug!("Got resource pack pop packet {p:?}");
} }
pub fn update_enabled_features(&mut self, p: ClientboundUpdateEnabledFeatures) { pub fn update_enabled_features(&mut self, p: &ClientboundUpdateEnabledFeatures) {
debug!("Got update enabled features packet {p:?}"); debug!("Got update enabled features packet {p:?}");
} }
pub fn update_tags(&mut self, _p: ClientboundUpdateTags) { pub fn update_tags(&mut self, _p: &ClientboundUpdateTags) {
debug!("Got update tags packet"); debug!("Got update tags packet");
} }
pub fn cookie_request(&mut self, p: ClientboundCookieRequest) { pub fn cookie_request(&mut self, p: &ClientboundCookieRequest) {
debug!("Got cookie request packet {p:?}"); debug!("Got cookie request packet {p:?}");
as_system::<Query<&RawConnection>>(self.ecs, |query| { as_system::<Commands>(self.ecs, |mut commands| {
let raw_conn = query.get(self.player).unwrap(); commands.trigger(SendConfigPacketEvent::new(
self.player,
raw_conn ServerboundCookieResponse {
.write_packet(ServerboundCookieResponse { key: p.key.clone(),
key: p.key,
// cookies aren't implemented // cookies aren't implemented
payload: None, payload: None,
}) },
.unwrap(); ));
}); });
} }
pub fn reset_chat(&mut self, p: ClientboundResetChat) { pub fn reset_chat(&mut self, p: &ClientboundResetChat) {
debug!("Got reset chat packet {p:?}"); debug!("Got reset chat packet {p:?}");
} }
pub fn store_cookie(&mut self, p: ClientboundStoreCookie) { pub fn store_cookie(&mut self, p: &ClientboundStoreCookie) {
debug!("Got store cookie packet {p:?}"); debug!("Got store cookie packet {p:?}");
} }
pub fn transfer(&mut self, p: ClientboundTransfer) { pub fn transfer(&mut self, p: &ClientboundTransfer) {
debug!("Got transfer packet {p:?}"); debug!("Got transfer packet {p:?}");
} }
pub fn select_known_packs(&mut self, p: ClientboundSelectKnownPacks) { pub fn select_known_packs(&mut self, p: &ClientboundSelectKnownPacks) {
debug!("Got select known packs packet {p:?}"); debug!("Got select known packs packet {p:?}");
as_system::<Query<&RawConnection>>(self.ecs, |query| { as_system::<Commands>(self.ecs, |mut commands| {
let raw_conn = query.get(self.player).unwrap();
// resource pack management isn't implemented // resource pack management isn't implemented
raw_conn commands.trigger(SendConfigPacketEvent::new(
.write_packet(ServerboundSelectKnownPacks { self.player,
ServerboundSelectKnownPacks {
known_packs: vec![], known_packs: vec![],
}) },
.unwrap(); ));
}); });
} }
pub fn server_links(&mut self, p: ClientboundServerLinks) { pub fn server_links(&mut self, p: &ClientboundServerLinks) {
debug!("Got server links packet {p:?}"); debug!("Got server links packet {p:?}");
} }
pub fn custom_report_details(&mut self, p: ClientboundCustomReportDetails) { pub fn custom_report_details(&mut self, p: &ClientboundCustomReportDetails) {
debug!("Got custom report details packet {p:?}"); debug!("Got custom report details packet {p:?}");
} }
} }

View file

@ -1,33 +1,27 @@
use std::{ use std::sync::{Arc, Weak};
io::Cursor,
sync::{Arc, Weak},
};
use azalea_chat::FormattedText; use azalea_chat::FormattedText;
use azalea_core::resource_location::ResourceLocation; use azalea_core::resource_location::ResourceLocation;
use azalea_protocol::{ use azalea_protocol::packets::{
packets::{ Packet,
Packet, game::{ClientboundGamePacket, ClientboundPlayerCombatKill, ServerboundGamePacket},
game::{ClientboundGamePacket, ClientboundPlayerCombatKill, ServerboundGamePacket},
},
read::deserialize_packet,
}; };
use azalea_world::Instance; use azalea_world::Instance;
use bevy_ecs::prelude::*; use bevy_ecs::prelude::*;
use parking_lot::RwLock; use parking_lot::RwLock;
use tracing::{debug, error}; use tracing::error;
use uuid::Uuid; use uuid::Uuid;
use crate::{PlayerInfo, client::InGameState, raw_connection::RawConnection}; use crate::{PlayerInfo, client::InGameState, connection::RawConnection};
/// An event that's sent when we receive a packet. /// An event that's sent when we receive a packet.
/// ``` /// ```
/// # use azalea_client::packet::game::ReceivePacketEvent; /// # use azalea_client::packet::game::ReceiveGamePacketEvent;
/// # use azalea_protocol::packets::game::ClientboundGamePacket; /// # use azalea_protocol::packets::game::ClientboundGamePacket;
/// # use bevy_ecs::event::EventReader; /// # use bevy_ecs::event::EventReader;
/// ///
/// fn handle_packets(mut events: EventReader<ReceivePacketEvent>) { /// fn handle_packets(mut events: EventReader<ReceiveGamePacketEvent>) {
/// for ReceivePacketEvent { /// for ReceiveGamePacketEvent {
/// entity, /// entity,
/// packet, /// packet,
/// } in events.read() { /// } in events.read() {
@ -41,7 +35,7 @@ use crate::{PlayerInfo, client::InGameState, raw_connection::RawConnection};
/// } /// }
/// ``` /// ```
#[derive(Event, Debug, Clone)] #[derive(Event, Debug, Clone)]
pub struct ReceivePacketEvent { pub struct ReceiveGamePacketEvent {
/// The client entity that received the packet. /// The client entity that received the packet.
pub entity: Entity, pub entity: Entity,
/// The packet that was actually received. /// The packet that was actually received.
@ -67,7 +61,7 @@ pub fn handle_outgoing_packets_observer(
) { ) {
let event = trigger.event(); let event = trigger.event();
if let Ok((raw_connection, in_game_state)) = query.get_mut(event.sent_by) { if let Ok((mut raw_connection, in_game_state)) = query.get_mut(event.sent_by) {
if in_game_state.is_none() { if in_game_state.is_none() {
error!( error!(
"Tried to send a game packet {:?} while not in game state", "Tried to send a game packet {:?} while not in game state",
@ -76,8 +70,8 @@ pub fn handle_outgoing_packets_observer(
return; return;
} }
// debug!("Sending packet: {:?}", event.packet); // debug!("Sending game packet: {:?}", event.packet);
if let Err(e) = raw_connection.write_packet(event.packet.clone()) { if let Err(e) = raw_connection.write(event.packet.clone()) {
error!("Failed to send packet: {e}"); error!("Failed to send packet: {e}");
} }
} }
@ -91,61 +85,6 @@ pub fn handle_outgoing_packets(mut commands: Commands, mut events: EventReader<S
} }
} }
pub fn emit_receive_packet_events(
query: Query<(Entity, &RawConnection), With<InGameState>>,
mut packet_events: ResMut<Events<ReceivePacketEvent>>,
) {
// we manually clear and send the events at the beginning of each update
// since otherwise it'd cause issues with events in process_packet_events
// running twice
packet_events.clear();
for (player_entity, raw_connection) in &query {
let packets_lock = raw_connection.incoming_packet_queue();
let mut packets = packets_lock.lock();
if !packets.is_empty() {
let mut packets_read = 0;
for raw_packet in packets.iter() {
packets_read += 1;
let packet =
match deserialize_packet::<ClientboundGamePacket>(&mut Cursor::new(raw_packet))
{
Ok(packet) => packet,
Err(err) => {
error!("failed to read packet: {err:?}");
debug!("packet bytes: {raw_packet:?}");
continue;
}
};
let should_interrupt = packet_interrupts(&packet);
packet_events.send(ReceivePacketEvent {
entity: player_entity,
packet: Arc::new(packet),
});
if should_interrupt {
break;
}
}
packets.drain(0..packets_read);
}
}
}
/// Whether the given packet should make us stop deserializing the received
/// packets until next update.
///
/// This is used for packets that can switch the client state.
fn packet_interrupts(packet: &ClientboundGamePacket) -> bool {
matches!(
packet,
ClientboundGamePacket::StartConfiguration(_)
| ClientboundGamePacket::Disconnect(_)
| ClientboundGamePacket::Transfer(_)
)
}
/// A player joined the game (or more specifically, was added to the tab /// A player joined the game (or more specifically, was added to the tab
/// list of a local player). /// list of a local player).
#[derive(Event, Debug, Clone)] #[derive(Event, Debug, Clone)]

View file

@ -32,171 +32,150 @@ use crate::{
}, },
movement::{KnockbackEvent, KnockbackType}, movement::{KnockbackEvent, KnockbackType},
packet::as_system, packet::as_system,
raw_connection::RawConnection,
}; };
pub fn process_packet_events(ecs: &mut World) { pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundGamePacket) {
let mut events_owned = Vec::<(Entity, Arc<ClientboundGamePacket>)>::new(); let mut handler = GamePacketHandler { player, ecs };
{ // the order of these doesn't matter, that's decided by the protocol library
let mut system_state = SystemState::<EventReader<ReceivePacketEvent>>::new(ecs); declare_packet_handlers!(
let mut events = system_state.get_mut(ecs); ClientboundGamePacket,
for ReceivePacketEvent { packet,
entity: player_entity, handler,
packet, [
} in events.read() login,
{ set_chunk_cache_radius,
// we do this so `ecs` isn't borrowed for the whole loop chunk_batch_start,
events_owned.push((*player_entity, packet.clone())); chunk_batch_finished,
} custom_payload,
} change_difficulty,
commands,
for (player_entity, packet) in events_owned { player_abilities,
let mut handler = GamePacketHandler { set_cursor_item,
player: player_entity, update_tags,
ecs, disconnect,
}; update_recipes,
entity_event,
// the order of these doesn't matter, that's decided by the protocol library player_position,
declare_packet_handlers!( player_info_update,
ClientboundGamePacket, player_info_remove,
packet.as_ref(), set_chunk_cache_center,
handler, chunks_biomes,
[ light_update,
login, level_chunk_with_light,
set_chunk_cache_radius, add_entity,
chunk_batch_start, set_entity_data,
chunk_batch_finished, update_attributes,
custom_payload, set_entity_motion,
change_difficulty, set_entity_link,
commands, initialize_border,
player_abilities, set_time,
set_cursor_item, set_default_spawn_position,
update_tags, set_health,
disconnect, set_experience,
update_recipes, teleport_entity,
entity_event, update_advancements,
player_position, rotate_head,
player_info_update, move_entity_pos,
player_info_remove, move_entity_pos_rot,
set_chunk_cache_center, move_entity_rot,
chunks_biomes, keep_alive,
light_update, remove_entities,
level_chunk_with_light, player_chat,
add_entity, system_chat,
set_entity_data, disguised_chat,
update_attributes, sound,
set_entity_motion, level_event,
set_entity_link, block_update,
initialize_border, animate,
set_time, section_blocks_update,
set_default_spawn_position, game_event,
set_health, level_particles,
set_experience, server_data,
teleport_entity, set_equipment,
update_advancements, update_mob_effect,
rotate_head, award_stats,
move_entity_pos, block_changed_ack,
move_entity_pos_rot, block_destruction,
move_entity_rot, block_entity_data,
keep_alive, block_event,
remove_entities, boss_event,
player_chat, command_suggestions,
system_chat, container_set_content,
disguised_chat, container_set_data,
sound, container_set_slot,
level_event, container_close,
block_update, cooldown,
animate, custom_chat_completions,
section_blocks_update, delete_chat,
game_event, explode,
level_particles, forget_level_chunk,
server_data, horse_screen_open,
set_equipment, map_item_data,
update_mob_effect, merchant_offers,
award_stats, move_vehicle,
block_changed_ack, open_book,
block_destruction, open_screen,
block_entity_data, open_sign_editor,
block_event, ping,
boss_event, place_ghost_recipe,
command_suggestions, player_combat_end,
container_set_content, player_combat_enter,
container_set_data, player_combat_kill,
container_set_slot, player_look_at,
container_close, remove_mob_effect,
cooldown, resource_pack_push,
custom_chat_completions, resource_pack_pop,
delete_chat, respawn,
explode, start_configuration,
forget_level_chunk, entity_position_sync,
horse_screen_open, select_advancements_tab,
map_item_data, set_action_bar_text,
merchant_offers, set_border_center,
move_vehicle, set_border_lerp_size,
open_book, set_border_size,
open_screen, set_border_warning_delay,
open_sign_editor, set_border_warning_distance,
ping, set_camera,
place_ghost_recipe, set_display_objective,
player_combat_end, set_objective,
player_combat_enter, set_passengers,
player_combat_kill, set_player_team,
player_look_at, set_score,
remove_mob_effect, set_simulation_distance,
resource_pack_push, set_subtitle_text,
resource_pack_pop, set_title_text,
respawn, set_titles_animation,
start_configuration, clear_titles,
entity_position_sync, sound_entity,
select_advancements_tab, stop_sound,
set_action_bar_text, tab_list,
set_border_center, tag_query,
set_border_lerp_size, take_item_entity,
set_border_size, bundle_delimiter,
set_border_warning_delay, damage_event,
set_border_warning_distance, hurt_animation,
set_camera, ticking_state,
set_display_objective, ticking_step,
set_objective, reset_score,
set_passengers, cookie_request,
set_player_team, debug_sample,
set_score, pong_response,
set_simulation_distance, store_cookie,
set_subtitle_text, transfer,
set_title_text, move_minecart_along_track,
set_titles_animation, set_held_slot,
clear_titles, set_player_inventory,
sound_entity, projectile_power,
stop_sound, custom_report_details,
tab_list, server_links,
tag_query, player_rotation,
take_item_entity, recipe_book_add,
bundle_delimiter, recipe_book_remove,
damage_event, recipe_book_settings,
hurt_animation, test_instance_block_status,
ticking_state, ]
ticking_step, );
reset_score,
cookie_request,
debug_sample,
pong_response,
store_cookie,
transfer,
move_minecart_along_track,
set_held_slot,
set_player_inventory,
projectile_power,
custom_report_details,
server_links,
player_rotation,
recipe_book_add,
recipe_book_remove,
recipe_book_settings,
test_instance_block_status,
]
);
}
} }
pub struct GamePacketHandler<'a> { pub struct GamePacketHandler<'a> {
@ -342,7 +321,7 @@ impl GamePacketHandler<'_> {
client_information client_information
); );
commands.trigger(SendPacketEvent::new(self.player, commands.trigger(SendPacketEvent::new(self.player,
azalea_protocol::packets::game::s_client_information::ServerboundClientInformation { information: client_information.clone() }, azalea_protocol::packets::game::s_client_information::ServerboundClientInformation { client_information: client_information.clone() },
)); ));
}, },
); );
@ -1506,9 +1485,11 @@ impl GamePacketHandler<'_> {
pub fn start_configuration(&mut self, _p: &ClientboundStartConfiguration) { pub fn start_configuration(&mut self, _p: &ClientboundStartConfiguration) {
debug!("Got start configuration packet"); debug!("Got start configuration packet");
as_system::<(Query<&RawConnection>, Commands)>(self.ecs, |(query, mut commands)| { as_system::<Commands>(self.ecs, |mut commands| {
let raw_conn = query.get(self.player).unwrap(); commands.trigger(SendPacketEvent::new(
let _ = raw_conn.write_packet(ServerboundConfigurationAcknowledged); self.player,
ServerboundConfigurationAcknowledged,
));
commands commands
.entity(self.player) .entity(self.player)

View file

@ -1,114 +0,0 @@
// login packets aren't actually handled here because compression/encryption
// would make packet handling a lot messier
use std::{collections::HashSet, sync::Arc};
use azalea_protocol::packets::{
Packet,
login::{
ClientboundLoginPacket, ServerboundLoginPacket,
s_custom_query_answer::ServerboundCustomQueryAnswer,
},
};
use bevy_ecs::{prelude::*, system::SystemState};
use derive_more::{Deref, DerefMut};
use tokio::sync::mpsc;
use tracing::error;
// this struct is defined here anyways though so it's consistent with the other
// ones
/// An event that's sent when we receive a login packet from the server. Note
/// that if you want to handle this in a system, you must add
/// `.before(azalea::packet::login::process_packet_events)` to it
/// because that system clears the events.
#[derive(Event, Debug, Clone)]
pub struct LoginPacketEvent {
/// The client entity that received the packet.
pub entity: Entity,
/// The packet that was actually received.
pub packet: Arc<ClientboundLoginPacket>,
}
/// Event for sending a login packet to the server.
#[derive(Event)]
pub struct SendLoginPacketEvent {
pub entity: Entity,
pub packet: ServerboundLoginPacket,
}
impl SendLoginPacketEvent {
pub fn new(entity: Entity, packet: impl Packet<ServerboundLoginPacket>) -> Self {
let packet = packet.into_variant();
Self { entity, packet }
}
}
#[derive(Component)]
pub struct LoginSendPacketQueue {
pub tx: mpsc::UnboundedSender<ServerboundLoginPacket>,
}
/// A marker component for local players that are currently in the
/// `login` state.
#[derive(Component, Clone, Debug)]
pub struct InLoginState;
pub fn handle_send_packet_event(
mut send_packet_events: EventReader<SendLoginPacketEvent>,
mut query: Query<&mut LoginSendPacketQueue>,
) {
for event in send_packet_events.read() {
if let Ok(queue) = query.get_mut(event.entity) {
let _ = queue.tx.send(event.packet.clone());
} else {
error!("Sent SendPacketEvent for entity that doesn't have a LoginSendPacketQueue");
}
}
}
/// Plugins can add to this set if they want to handle a custom query packet
/// themselves. This component removed after the login state ends.
#[derive(Component, Default, Debug, Deref, DerefMut)]
pub struct IgnoreQueryIds(HashSet<u32>);
pub fn process_packet_events(ecs: &mut World) {
let mut events_owned = Vec::new();
let mut system_state: SystemState<ResMut<Events<LoginPacketEvent>>> = SystemState::new(ecs);
let mut events = system_state.get_mut(ecs);
for LoginPacketEvent {
entity: player_entity,
packet,
} in events.drain()
{
// we do this so `ecs` isn't borrowed for the whole loop
events_owned.push((player_entity, packet));
}
for (player_entity, packet) in events_owned {
#[allow(clippy::single_match)]
match packet.as_ref() {
ClientboundLoginPacket::CustomQuery(p) => {
let mut system_state: SystemState<(
EventWriter<SendLoginPacketEvent>,
Query<&IgnoreQueryIds>,
)> = SystemState::new(ecs);
let (mut send_packet_events, query) = system_state.get_mut(ecs);
let ignore_query_ids = query.get(player_entity).ok().map(|x| x.0.clone());
if let Some(ignore_query_ids) = ignore_query_ids {
if ignore_query_ids.contains(&p.transaction_id) {
continue;
}
}
send_packet_events.send(SendLoginPacketEvent::new(
player_entity,
ServerboundCustomQueryAnswer {
transaction_id: p.transaction_id,
data: None,
},
));
}
_ => {}
}
}
}

View file

@ -0,0 +1,86 @@
use std::sync::Arc;
use azalea_protocol::packets::{
Packet,
login::{
ClientboundCustomQuery, ClientboundHello, ClientboundLoginPacket, ServerboundLoginPacket,
},
};
use bevy_ecs::prelude::*;
use tracing::{debug, error};
use super::InLoginState;
use crate::{Account, connection::RawConnection};
#[derive(Event, Debug, Clone)]
pub struct ReceiveLoginPacketEvent {
/// The client entity that received the packet.
pub entity: Entity,
/// The packet that was actually received.
pub packet: Arc<ClientboundLoginPacket>,
}
#[derive(Event, Debug, Clone)]
pub struct ReceiveHelloEvent {
pub account: Account,
pub packet: ClientboundHello,
}
#[derive(Event, Debug, Clone)]
pub struct ReceiveCustomQueryEvent {
/// The client entity that received the packet.
pub entity: Entity,
pub packet: ClientboundCustomQuery,
/// A system can set this to `true` to make Azalea not reply to the query.
/// You must make sure you modify this before the
/// [`reply_to_custom_queries`] system runs.
///
/// [`reply_to_custom_queries`]: crate::login::reply_to_custom_queries
pub disabled: bool,
}
/// Event for sending a login packet to the server.
#[derive(Event, Debug, Clone)]
pub struct SendLoginPacketEvent {
pub sent_by: Entity,
pub packet: ServerboundLoginPacket,
}
impl SendLoginPacketEvent {
pub fn new(entity: Entity, packet: impl Packet<ServerboundLoginPacket>) -> Self {
let packet = packet.into_variant();
Self {
sent_by: entity,
packet,
}
}
}
pub fn handle_outgoing_packets_observer(
trigger: Trigger<SendLoginPacketEvent>,
mut query: Query<(&mut RawConnection, Option<&InLoginState>)>,
) {
let event = trigger.event();
if let Ok((mut raw_conn, in_login_state)) = query.get_mut(event.sent_by) {
if in_login_state.is_none() {
error!(
"Tried to send a login packet {:?} while not in login state",
event.packet
);
return;
}
debug!("Sending login packet: {:?}", event.packet);
if let Err(e) = raw_conn.write(event.packet.clone()) {
error!("Failed to send packet: {e}");
}
}
}
/// A system that converts [`SendLoginPacketEvent`] events into triggers so
/// they get received by [`handle_outgoing_packets_observer`].
pub fn handle_outgoing_packets(
mut commands: Commands,
mut events: EventReader<SendLoginPacketEvent>,
) {
for event in events.read() {
commands.trigger(event.clone());
}
}

View file

@ -0,0 +1,145 @@
// login packets aren't actually handled here because compression/encryption
// would make packet handling a lot messier
mod events;
use azalea_protocol::packets::{
ConnectionProtocol,
login::{
ClientboundCookieRequest, ClientboundCustomQuery, ClientboundHello,
ClientboundLoginCompression, ClientboundLoginDisconnect, ClientboundLoginFinished,
ClientboundLoginPacket, ServerboundCookieResponse, ServerboundLoginAcknowledged,
},
};
use bevy_ecs::prelude::*;
pub use events::*;
use tracing::{debug, error};
use super::as_system;
use crate::{
Account, GameProfileComponent, InConfigState, connection::RawConnection,
declare_packet_handlers, disconnect::DisconnectEvent,
};
pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundLoginPacket) {
let mut handler = LoginPacketHandler { player, ecs };
declare_packet_handlers!(
ClientboundLoginPacket,
packet,
handler,
[
hello,
login_disconnect,
login_finished,
login_compression,
custom_query,
cookie_request
]
);
}
/// A marker component for local players that are currently in the
/// `login` state.
#[derive(Component, Clone, Debug)]
pub struct InLoginState;
pub struct LoginPacketHandler<'a> {
pub ecs: &'a mut World,
pub player: Entity,
}
impl LoginPacketHandler<'_> {
pub fn hello(&mut self, p: &ClientboundHello) {
debug!("Got encryption request {p:?}");
as_system::<(Commands, Query<&Account>)>(self.ecs, |(mut commands, query)| {
let Ok(account) = query.get(self.player) else {
error!(
"Expected Account component to be present on player when receiving hello packet."
);
return;
};
commands.trigger_targets(
ReceiveHelloEvent {
account: account.clone(),
packet: p.clone(),
},
self.player,
);
});
}
pub fn login_disconnect(&mut self, p: &ClientboundLoginDisconnect) {
debug!("Got disconnect {:?}", p);
as_system::<EventWriter<_>>(self.ecs, |mut events| {
events.send(DisconnectEvent {
entity: self.player,
reason: Some(p.reason.clone()),
});
});
}
pub fn login_finished(&mut self, p: &ClientboundLoginFinished) {
debug!(
"Got profile {:?}. login is finished and we're now switching to the config state",
p.game_profile
);
as_system::<(Commands, Query<&mut RawConnection>)>(
self.ecs,
|(mut commands, mut query)| {
commands.trigger(SendLoginPacketEvent::new(
self.player,
ServerboundLoginAcknowledged,
));
commands
.entity(self.player)
.remove::<InLoginState>()
.insert(InConfigState)
.insert(GameProfileComponent(p.game_profile.clone()));
let mut conn = query
.get_mut(self.player)
.expect("RawConnection component should be present when receiving packets");
conn.state = ConnectionProtocol::Configuration;
},
);
}
pub fn login_compression(&mut self, p: &ClientboundLoginCompression) {
debug!("Got compression request {p:?}");
as_system::<Query<&mut RawConnection>>(self.ecs, |mut query| {
let mut conn = query
.get_mut(self.player)
.expect("RawConnection component should be present when receiving packets");
if let Some(net_conn) = &mut conn.net_conn() {
net_conn.set_compression_threshold(Some(p.compression_threshold as u32));
}
})
}
pub fn custom_query(&mut self, p: &ClientboundCustomQuery) {
debug!("Got custom query {p:?}");
as_system::<EventWriter<ReceiveCustomQueryEvent>>(self.ecs, |mut events| {
events.send(ReceiveCustomQueryEvent {
entity: self.player,
packet: p.clone(),
disabled: false,
});
});
}
pub fn cookie_request(&mut self, p: &ClientboundCookieRequest) {
debug!("Got cookie request {p:?}");
as_system::<Commands>(self.ecs, |mut commands| {
commands.trigger(SendLoginPacketEvent::new(
self.player,
ServerboundCookieResponse {
key: p.key.clone(),
// cookies aren't implemented
payload: None,
},
));
});
}
}

View file

@ -1,17 +1,11 @@
use azalea_entity::metadata::Health; use azalea_entity::metadata::Health;
use bevy_app::{App, First, Plugin, PreUpdate, Update}; use bevy_app::{App, Plugin, Update};
use bevy_ecs::{ use bevy_ecs::{
prelude::*, prelude::*,
system::{SystemParam, SystemState}, system::{SystemParam, SystemState},
}; };
use self::{ use self::game::DeathEvent;
game::{
AddPlayerEvent, DeathEvent, InstanceLoadedEvent, KeepAliveEvent, RemovePlayerEvent,
ResourcePackEvent, UpdatePlayerEvent,
},
login::{LoginPacketEvent, SendLoginPacketEvent},
};
use crate::{chat::ChatReceivedEvent, events::death_listener}; use crate::{chat::ChatReceivedEvent, events::death_listener};
pub mod config; pub mod config;
@ -36,50 +30,38 @@ pub fn death_event_on_0_health(
impl Plugin for PacketPlugin { impl Plugin for PacketPlugin {
fn build(&self, app: &mut App) { fn build(&self, app: &mut App) {
app.add_systems( app.add_observer(game::handle_outgoing_packets_observer)
First, .add_observer(config::handle_outgoing_packets_observer)
( .add_observer(login::handle_outgoing_packets_observer)
game::emit_receive_packet_events, .add_systems(
config::emit_receive_config_packet_events, Update,
),
)
.add_systems(
PreUpdate,
(
game::process_packet_events,
config::process_packet_events,
login::handle_send_packet_event,
login::process_packet_events,
),
)
.add_observer(game::handle_outgoing_packets_observer)
.add_observer(config::handle_outgoing_packets_observer)
.add_systems(
Update,
(
( (
config::handle_outgoing_packets, (
game::handle_outgoing_packets, config::handle_outgoing_packets,
) game::handle_outgoing_packets,
.chain(), login::handle_outgoing_packets,
death_event_on_0_health.before(death_listener), )
), .chain(),
) death_event_on_0_health.before(death_listener),
// we do this instead of add_event so we can handle the events ourselves ),
.init_resource::<Events<game::ReceivePacketEvent>>() )
.init_resource::<Events<config::ReceiveConfigPacketEvent>>() .add_event::<game::ReceiveGamePacketEvent>()
.add_event::<game::SendPacketEvent>() .add_event::<config::ReceiveConfigPacketEvent>()
.add_event::<config::SendConfigPacketEvent>() .add_event::<login::ReceiveLoginPacketEvent>()
.add_event::<AddPlayerEvent>() //
.add_event::<RemovePlayerEvent>() .add_event::<game::SendPacketEvent>()
.add_event::<UpdatePlayerEvent>() .add_event::<config::SendConfigPacketEvent>()
.add_event::<ChatReceivedEvent>() .add_event::<login::SendLoginPacketEvent>()
.add_event::<DeathEvent>() //
.add_event::<KeepAliveEvent>() .add_event::<game::AddPlayerEvent>()
.add_event::<ResourcePackEvent>() .add_event::<game::RemovePlayerEvent>()
.add_event::<InstanceLoadedEvent>() .add_event::<game::UpdatePlayerEvent>()
.add_event::<LoginPacketEvent>() .add_event::<ChatReceivedEvent>()
.add_event::<SendLoginPacketEvent>(); .add_event::<game::DeathEvent>()
.add_event::<game::KeepAliveEvent>()
.add_event::<game::ResourcePackEvent>()
.add_event::<game::InstanceLoadedEvent>()
.add_event::<login::ReceiveCustomQueryEvent>();
} }
} }

View file

@ -1,208 +0,0 @@
use std::fmt::Debug;
use std::sync::Arc;
use azalea_protocol::{
connect::{RawReadConnection, RawWriteConnection},
packets::{ConnectionProtocol, Packet, ProtocolPacket},
read::ReadPacketError,
write::serialize_packet,
};
use bevy_ecs::prelude::*;
use parking_lot::Mutex;
use thiserror::Error;
use tokio::sync::mpsc::{
self,
error::{SendError, TrySendError},
};
use tracing::error;
/// A component for clients that can read and write packets to the server. This
/// works with raw bytes, so you'll have to serialize/deserialize packets
/// yourself. It will do the compression and encryption for you though.
#[derive(Component)]
pub struct RawConnection {
pub reader: RawConnectionReader,
pub writer: RawConnectionWriter,
/// Packets sent to this will be sent to the server.
/// A task that reads packets from the server. The client is disconnected
/// when this task ends.
pub read_packets_task: tokio::task::JoinHandle<()>,
/// A task that writes packets from the server.
pub write_packets_task: tokio::task::JoinHandle<()>,
pub connection_protocol: ConnectionProtocol,
}
#[derive(Clone)]
pub struct RawConnectionReader {
pub incoming_packet_queue: Arc<Mutex<Vec<Box<[u8]>>>>,
pub run_schedule_sender: mpsc::Sender<()>,
}
#[derive(Clone)]
pub struct RawConnectionWriter {
pub outgoing_packets_sender: mpsc::UnboundedSender<Box<[u8]>>,
}
#[derive(Error, Debug)]
pub enum WritePacketError {
#[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
WrongState {
expected: ConnectionProtocol,
got: ConnectionProtocol,
},
#[error(transparent)]
Encoding(#[from] azalea_protocol::write::PacketEncodeError),
#[error(transparent)]
SendError {
#[from]
#[backtrace]
source: SendError<Box<[u8]>>,
},
}
impl RawConnection {
pub fn new(
run_schedule_sender: mpsc::Sender<()>,
connection_protocol: ConnectionProtocol,
raw_read_connection: RawReadConnection,
raw_write_connection: RawWriteConnection,
) -> Self {
let (outgoing_packets_sender, outgoing_packets_receiver) = mpsc::unbounded_channel();
let incoming_packet_queue = Arc::new(Mutex::new(Vec::new()));
let reader = RawConnectionReader {
incoming_packet_queue: incoming_packet_queue.clone(),
run_schedule_sender,
};
let writer = RawConnectionWriter {
outgoing_packets_sender,
};
let read_packets_task = tokio::spawn(reader.clone().read_task(raw_read_connection));
let write_packets_task = tokio::spawn(
writer
.clone()
.write_task(raw_write_connection, outgoing_packets_receiver),
);
Self {
reader,
writer,
read_packets_task,
write_packets_task,
connection_protocol,
}
}
pub fn write_raw_packet(&self, raw_packet: Box<[u8]>) -> Result<(), WritePacketError> {
self.writer.outgoing_packets_sender.send(raw_packet)?;
Ok(())
}
/// Write the packet with the given state to the server.
///
/// # Errors
///
/// Returns an error if the packet is not valid for the current state, or if
/// encoding it failed somehow (like it's too big or something).
pub fn write_packet<P: ProtocolPacket + Debug>(
&self,
packet: impl Packet<P>,
) -> Result<(), WritePacketError> {
let packet = packet.into_variant();
let raw_packet = serialize_packet(&packet)?;
self.write_raw_packet(raw_packet)?;
Ok(())
}
/// Returns whether the connection is still alive.
pub fn is_alive(&self) -> bool {
!self.read_packets_task.is_finished()
}
pub fn incoming_packet_queue(&self) -> Arc<Mutex<Vec<Box<[u8]>>>> {
self.reader.incoming_packet_queue.clone()
}
pub fn set_state(&mut self, connection_protocol: ConnectionProtocol) {
self.connection_protocol = connection_protocol;
}
}
impl RawConnectionReader {
/// Loop that reads from the connection and adds the packets to the queue +
/// runs the schedule.
pub async fn read_task(self, mut read_conn: RawReadConnection) {
fn log_for_error(error: &ReadPacketError) {
if !matches!(*error, ReadPacketError::ConnectionClosed) {
error!("Error reading packet from Client: {error:?}");
}
}
loop {
match read_conn.read().await {
Ok(raw_packet) => {
let mut incoming_packet_queue = self.incoming_packet_queue.lock();
incoming_packet_queue.push(raw_packet);
// this makes it so packets received at the same time are guaranteed to be
// handled in the same tick. this is also an attempt at making it so we can't
// receive any packets in the ticks/updates after being disconnected.
loop {
let raw_packet = match read_conn.try_read() {
Ok(p) => p,
Err(err) => {
log_for_error(&err);
return;
}
};
let Some(raw_packet) = raw_packet else { break };
incoming_packet_queue.push(raw_packet);
}
// tell the client to run all the systems
if self.run_schedule_sender.try_send(()) == Err(TrySendError::Closed(())) {
// the client was dropped
break;
}
}
Err(err) => {
log_for_error(&err);
return;
}
}
}
}
}
impl RawConnectionWriter {
/// Consume the [`ServerboundGamePacket`] queue and actually write the
/// packets to the server. It's like this so writing packets doesn't need to
/// be awaited.
///
/// [`ServerboundGamePacket`]: azalea_protocol::packets::game::ServerboundGamePacket
pub async fn write_task(
self,
mut write_conn: RawWriteConnection,
mut outgoing_packets_receiver: mpsc::UnboundedReceiver<Box<[u8]>>,
) {
while let Some(raw_packet) = outgoing_packets_receiver.recv().await {
if let Err(err) = write_conn.write(&raw_packet).await {
error!("Disconnecting because we couldn't write a packet: {err}.");
break;
};
}
// receiver is automatically closed when it's dropped
}
}
impl Drop for RawConnection {
/// Stop every active task when this `RawConnection` is dropped.
fn drop(&mut self) {
self.read_packets_task.abort();
self.write_packets_task.abort();
}
}

View file

@ -1,4 +1,4 @@
use std::{fmt::Debug, sync::Arc, time::Duration}; use std::{fmt::Debug, sync::Arc};
use azalea_auth::game_profile::GameProfile; use azalea_auth::game_profile::GameProfile;
use azalea_buf::AzaleaWrite; use azalea_buf::AzaleaWrite;
@ -21,16 +21,14 @@ use azalea_world::palette::{PalettedContainer, PalettedContainerKind};
use azalea_world::{Chunk, Instance, MinecraftEntityId, Section}; use azalea_world::{Chunk, Instance, MinecraftEntityId, Section};
use bevy_app::App; use bevy_app::App;
use bevy_ecs::{prelude::*, schedule::ExecutorKind}; use bevy_ecs::{prelude::*, schedule::ExecutorKind};
use parking_lot::{Mutex, RwLock}; use parking_lot::RwLock;
use simdnbt::owned::{NbtCompound, NbtTag}; use simdnbt::owned::{NbtCompound, NbtTag};
use tokio::task::JoinHandle;
use tokio::{sync::mpsc, time::sleep};
use uuid::Uuid; use uuid::Uuid;
use crate::connection::RawConnection;
use crate::disconnect::DisconnectEvent; use crate::disconnect::DisconnectEvent;
use crate::{ use crate::{
ClientInformation, GameProfileComponent, InConfigState, InstanceHolder, LocalPlayerBundle, ClientInformation, GameProfileComponent, InConfigState, InstanceHolder, LocalPlayerBundle,
raw_connection::{RawConnection, RawConnectionReader, RawConnectionWriter},
}; };
/// A way to simulate a client in a server, used for some internal tests. /// A way to simulate a client in a server, used for some internal tests.
@ -40,16 +38,13 @@ pub struct Simulation {
// the runtime needs to be kept around for the tasks to be considered alive // the runtime needs to be kept around for the tasks to be considered alive
pub rt: tokio::runtime::Runtime, pub rt: tokio::runtime::Runtime,
pub incoming_packet_queue: Arc<Mutex<Vec<Box<[u8]>>>>,
pub clear_outgoing_packets_receiver_task: JoinHandle<!>,
} }
impl Simulation { impl Simulation {
pub fn new(initial_connection_protocol: ConnectionProtocol) -> Self { pub fn new(initial_connection_protocol: ConnectionProtocol) -> Self {
let mut app = create_simulation_app(); let mut app = create_simulation_app();
let mut entity = app.world_mut().spawn_empty(); let mut entity = app.world_mut().spawn_empty();
let (player, clear_outgoing_packets_receiver_task, incoming_packet_queue, rt) = let (player, rt) =
create_local_player_bundle(entity.id(), ConnectionProtocol::Configuration); create_local_player_bundle(entity.id(), ConnectionProtocol::Configuration);
entity.insert(player); entity.insert(player);
@ -58,16 +53,16 @@ impl Simulation {
tick_app(&mut app); tick_app(&mut app);
// start in the config state // start in the config state
app.world_mut().entity_mut(entity).insert(InConfigState); app.world_mut().entity_mut(entity).insert((
InConfigState,
GameProfileComponent(GameProfile::new(
Uuid::from_u128(1234),
"azalea".to_string(),
)),
));
tick_app(&mut app); tick_app(&mut app);
let mut simulation = Self { let mut simulation = Self { app, entity, rt };
app,
entity,
rt,
incoming_packet_queue,
clear_outgoing_packets_receiver_task,
};
#[allow(clippy::single_match)] #[allow(clippy::single_match)]
match initial_connection_protocol { match initial_connection_protocol {
@ -95,9 +90,11 @@ impl Simulation {
simulation simulation
} }
pub fn receive_packet<P: ProtocolPacket + Debug>(&self, packet: impl Packet<P>) { pub fn receive_packet<P: ProtocolPacket + Debug>(&mut self, packet: impl Packet<P>) {
let buf = azalea_protocol::write::serialize_packet(&packet.into_variant()).unwrap(); let buf = azalea_protocol::write::serialize_packet(&packet.into_variant()).unwrap();
self.incoming_packet_queue.lock().push(buf); self.with_component_mut::<RawConnection>(|raw_conn| {
raw_conn.injected_clientbound_packets.push(buf);
});
} }
pub fn tick(&mut self) { pub fn tick(&mut self) {
@ -112,6 +109,14 @@ impl Simulation {
pub fn has_component<T: Component>(&self) -> bool { pub fn has_component<T: Component>(&self) -> bool {
self.app.world().get::<T>(self.entity).is_some() self.app.world().get::<T>(self.entity).is_some()
} }
pub fn with_component_mut<T: Component>(&mut self, f: impl FnOnce(&mut T)) {
f(&mut self
.app
.world_mut()
.entity_mut(self.entity)
.get_mut::<T>()
.unwrap());
}
pub fn resource<T: Resource + Clone>(&self) -> T { pub fn resource<T: Resource + Clone>(&self) -> T {
self.app.world().get_resource::<T>().unwrap().clone() self.app.world().get_resource::<T>().unwrap().clone()
} }
@ -143,70 +148,24 @@ impl Simulation {
fn create_local_player_bundle( fn create_local_player_bundle(
entity: Entity, entity: Entity,
connection_protocol: ConnectionProtocol, connection_protocol: ConnectionProtocol,
) -> ( ) -> (LocalPlayerBundle, tokio::runtime::Runtime) {
LocalPlayerBundle,
JoinHandle<!>,
Arc<Mutex<Vec<Box<[u8]>>>>,
tokio::runtime::Runtime,
) {
// unused since we'll trigger ticks ourselves // unused since we'll trigger ticks ourselves
let (run_schedule_sender, _run_schedule_receiver) = mpsc::channel(1);
let (outgoing_packets_sender, mut outgoing_packets_receiver) = mpsc::unbounded_channel();
let incoming_packet_queue = Arc::new(Mutex::new(Vec::new()));
let reader = RawConnectionReader {
incoming_packet_queue: incoming_packet_queue.clone(),
run_schedule_sender,
};
let writer = RawConnectionWriter {
outgoing_packets_sender,
};
let rt = tokio::runtime::Runtime::new().unwrap(); let rt = tokio::runtime::Runtime::new().unwrap();
// the tasks can't die since that would make us send a DisconnectEvent let raw_connection = RawConnection::new_networkless(connection_protocol);
let read_packets_task = rt.spawn(async {
loop {
sleep(Duration::from_secs(60)).await;
}
});
let write_packets_task = rt.spawn(async {
loop {
sleep(Duration::from_secs(60)).await;
}
});
let clear_outgoing_packets_receiver_task = rt.spawn(async move {
loop {
let _ = outgoing_packets_receiver.recv().await;
}
});
let raw_connection = RawConnection {
reader,
writer,
read_packets_task,
write_packets_task,
connection_protocol,
};
let instance = Instance::default(); let instance = Instance::default();
let instance_holder = InstanceHolder::new(entity, Arc::new(RwLock::new(instance))); let instance_holder = InstanceHolder::new(entity, Arc::new(RwLock::new(instance)));
let local_player_bundle = LocalPlayerBundle { let local_player_bundle = LocalPlayerBundle {
raw_connection, raw_connection,
game_profile: GameProfileComponent(GameProfile::new(Uuid::nil(), "azalea".to_owned())),
client_information: ClientInformation::default(), client_information: ClientInformation::default(),
instance_holder, instance_holder,
metadata: PlayerMetadataBundle::default(), metadata: PlayerMetadataBundle::default(),
}; };
( (local_player_bundle, rt)
local_player_bundle,
clear_outgoing_packets_receiver_task,
incoming_packet_queue,
rt,
)
} }
fn create_simulation_app() -> App { fn create_simulation_app() -> App {

View file

@ -12,6 +12,8 @@ use simdnbt::owned::{NbtCompound, NbtTag};
#[test] #[test]
fn test_change_dimension_to_nether_and_back() { fn test_change_dimension_to_nether_and_back() {
let _ = tracing_subscriber::fmt().try_init();
generic_test_change_dimension_to_nether_and_back(true); generic_test_change_dimension_to_nether_and_back(true);
generic_test_change_dimension_to_nether_and_back(false); generic_test_change_dimension_to_nether_and_back(false);
} }

View file

@ -344,8 +344,9 @@ impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> { impl Connection<ClientboundLoginPacket, ServerboundLoginPacket> {
/// Set our compression threshold, i.e. the maximum size that a packet is /// Set our compression threshold, i.e. the maximum size that a packet is
/// allowed to be without getting compressed. If you set it to less than 0 /// allowed to be without getting compressed. Setting it to 0 means every
/// then compression gets disabled. /// packet will be compressed. If you set it to less than 0,
/// then compression is disabled.
pub fn set_compression_threshold(&mut self, threshold: i32) { pub fn set_compression_threshold(&mut self, threshold: i32) {
// if you pass a threshold of less than 0, compression is disabled // if you pass a threshold of less than 0, compression is disabled
if threshold >= 0 { if threshold >= 0 {

View file

@ -5,5 +5,5 @@ use crate::common::client_information::ClientInformation;
#[derive(Clone, Debug, AzBuf, ServerboundGamePacket)] #[derive(Clone, Debug, AzBuf, ServerboundGamePacket)]
pub struct ServerboundClientInformation { pub struct ServerboundClientInformation {
pub information: ClientInformation, pub client_information: ClientInformation,
} }

View file

@ -1,9 +0,0 @@
use azalea_buf::{AzBuf, UnsizedByteArray};
use azalea_protocol_macros::ServerboundLoginPacket;
#[derive(Clone, Debug, AzBuf, ServerboundLoginPacket)]
pub struct ServerboundCustomQuery {
#[var]
pub transaction_id: u32,
pub data: Option<UnsizedByteArray>,
}

View file

@ -285,6 +285,8 @@ where
buffer.get_mut().extend_from_slice(&bytes); buffer.get_mut().extend_from_slice(&bytes);
} }
} }
/// Read a packet from the stream, then if necessary decrypt it, decompress
/// it, and split it.
pub fn try_read_raw_packet<R>( pub fn try_read_raw_packet<R>(
stream: &mut R, stream: &mut R,
buffer: &mut Cursor<Vec<u8>>, buffer: &mut Cursor<Vec<u8>>,

View file

@ -54,6 +54,15 @@ pub async fn write_raw_packet<W>(
where where
W: AsyncWrite + Unpin + Send, W: AsyncWrite + Unpin + Send,
{ {
let network_packet = encode_to_network_packet(raw_packet, compression_threshold, cipher);
stream.write_all(&network_packet).await
}
pub fn encode_to_network_packet(
raw_packet: &[u8],
compression_threshold: Option<u32>,
cipher: &mut Option<Aes128CfbEnc>,
) -> Vec<u8> {
trace!("Writing raw packet: {raw_packet:?}"); trace!("Writing raw packet: {raw_packet:?}");
let mut raw_packet = raw_packet.to_vec(); let mut raw_packet = raw_packet.to_vec();
if let Some(threshold) = compression_threshold { if let Some(threshold) = compression_threshold {
@ -64,7 +73,7 @@ where
if let Some(cipher) = cipher { if let Some(cipher) = cipher {
azalea_crypto::encrypt_packet(cipher, &mut raw_packet); azalea_crypto::encrypt_packet(cipher, &mut raw_packet);
} }
stream.write_all(&raw_packet).await raw_packet
} }
pub fn compression_encoder( pub fn compression_encoder(

View file

@ -20,7 +20,7 @@ pub struct State {}
async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> { async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> {
if let Event::Chat(m) = event { if let Event::Chat(m) = event {
if let (Some(sender), content) = m.split_sender_and_content() { if let (Some(sender), content) = m.split_sender_and_content() {
if sender == bot.profile.name { if sender == bot.username() {
return Ok(()); // ignore our own messages return Ok(()); // ignore our own messages
} }
bot.chat(&content); bot.chat(&content);

View file

@ -28,7 +28,7 @@ struct State {
async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> { async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> {
if let Event::Chat(m) = event { if let Event::Chat(m) = event {
if m.sender() == Some(bot.profile.name.clone()) { if m.sender() == Some(bot.username()) {
return Ok(()); return Ok(());
}; };
if m.content() != "go" { if m.content() != "go" {

View file

@ -25,6 +25,12 @@ pub fn register(commands: &mut CommandDispatcher<Mutex<CommandSource>>) {
1 1
})); }));
commands.register(literal("disconnect").executes(|ctx: &Ctx| {
let source = ctx.source.lock();
source.bot.disconnect();
1
}));
commands.register(literal("whereami").executes(|ctx: &Ctx| { commands.register(literal("whereami").executes(|ctx: &Ctx| {
let mut source = ctx.source.lock(); let mut source = ctx.source.lock();
let Some(entity) = source.entity() else { let Some(entity) = source.entity() else {
@ -248,7 +254,7 @@ pub fn register(commands: &mut CommandDispatcher<Mutex<CommandSource>>) {
} }
} }
"bevy_ecs::event::collections::Events<azalea_client::packet::game::ReceivePacketEvent>" => { "bevy_ecs::event::collections::Events<azalea_client::packet::game::ReceivePacketEvent>" => {
let events = ecs.resource::<Events<game::ReceivePacketEvent>>(); let events = ecs.resource::<Events<game::ReceiveGamePacketEvent>>();
writeln!(report, "- Event count: {}", events.len()).unwrap(); writeln!(report, "- Event count: {}", events.len()).unwrap();
} }
"bevy_ecs::event::collections::Events<azalea_client::chunks::ReceiveChunkEvent>" => { "bevy_ecs::event::collections::Events<azalea_client::chunks::ReceiveChunkEvent>" => {

View file

@ -134,7 +134,7 @@ async fn handle(bot: Client, event: azalea::Event, state: State) -> anyhow::Resu
view_distance: 32, view_distance: 32,
..Default::default() ..Default::default()
}) })
.await?; .await;
if swarm.args.pathfinder_debug_particles { if swarm.args.pathfinder_debug_particles {
bot.ecs bot.ecs
.lock() .lock()

View file

@ -24,7 +24,7 @@ async fn main() {
async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> { async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> {
match event { match event {
Event::Chat(m) => { Event::Chat(m) => {
if m.sender() == Some(bot.profile.name) { if m.sender() == Some(bot.username()) {
return Ok(()); return Ok(());
}; };
if m.content() == "go" { if m.content() == "go" {

View file

@ -1,7 +1,7 @@
use std::fmt::Debug; use std::fmt::Debug;
use std::fmt::Formatter; use std::fmt::Formatter;
use azalea_client::packet::game::ReceivePacketEvent; use azalea_client::packet::game::ReceiveGamePacketEvent;
use azalea_client::{ use azalea_client::{
Client, Client,
inventory::{CloseContainerEvent, ContainerClickEvent, Inventory}, inventory::{CloseContainerEvent, ContainerClickEvent, Inventory},
@ -234,7 +234,10 @@ impl ContainerHandle {
#[derive(Component, Debug)] #[derive(Component, Debug)]
pub struct WaitingForInventoryOpen; pub struct WaitingForInventoryOpen;
fn handle_menu_opened_event(mut commands: Commands, mut events: EventReader<ReceivePacketEvent>) { fn handle_menu_opened_event(
mut commands: Commands,
mut events: EventReader<ReceiveGamePacketEvent>,
) {
for event in events.read() { for event in events.read() {
if let ClientboundGamePacket::ContainerSetContent { .. } = event.packet.as_ref() { if let ClientboundGamePacket::ContainerSetContent { .. } = event.packet.as_ref() {
commands commands

View file

@ -55,8 +55,6 @@ pub struct Swarm {
bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>, bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
swarm_tx: mpsc::UnboundedSender<SwarmEvent>, swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
run_schedule_sender: mpsc::Sender<()>,
} }
/// Create a new [`Swarm`]. /// Create a new [`Swarm`].
@ -396,12 +394,9 @@ where
swarm_tx.send(SwarmEvent::Init).unwrap(); swarm_tx.send(SwarmEvent::Init).unwrap();
let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1);
let main_schedule_label = self.app.main().update_schedule.unwrap(); let main_schedule_label = self.app.main().update_schedule.unwrap();
let ecs_lock = let ecs_lock = start_ecs_runner(self.app);
start_ecs_runner(self.app, run_schedule_receiver, run_schedule_sender.clone());
let swarm = Swarm { let swarm = Swarm {
ecs_lock: ecs_lock.clone(), ecs_lock: ecs_lock.clone(),
@ -414,8 +409,6 @@ where
bots_tx, bots_tx,
swarm_tx: swarm_tx.clone(), swarm_tx: swarm_tx.clone(),
run_schedule_sender,
}; };
// run the main schedule so the startup systems run // run the main schedule so the startup systems run
@ -495,7 +488,8 @@ where
let Some(first_bot_state) = first_bot.query::<Option<&S>>(&mut ecs).cloned() else { let Some(first_bot_state) = first_bot.query::<Option<&S>>(&mut ecs).cloned() else {
error!( error!(
"the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.", "the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
first_bot.profile.name, first_bot.entity first_bot.username(),
first_bot.entity
); );
continue; continue;
}; };
@ -513,7 +507,8 @@ where
let Some(state) = bot.query::<Option<&S>>(&mut ecs).cloned() else { let Some(state) = bot.query::<Option<&S>>(&mut ecs).cloned() else {
error!( error!(
"one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.", "one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
bot.profile.name, bot.entity bot.username(),
bot.entity
); );
continue; continue;
}; };
@ -665,7 +660,6 @@ impl Swarm {
address: &address, address: &address,
resolved_address: &resolved_address, resolved_address: &resolved_address,
proxy: join_opts.proxy.clone(), proxy: join_opts.proxy.clone(),
run_schedule_sender: self.run_schedule_sender.clone(),
event_sender: Some(tx), event_sender: Some(tx),
}) })
.await?; .await?;