1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 14:26:04 +00:00

initial broken implementation for ecs-only login

This commit is contained in:
mat 2025-04-11 21:45:40 +01:00
parent 2ef9860b82
commit 16811560d8
26 changed files with 1085 additions and 1120 deletions

View file

@ -10,6 +10,7 @@ pub struct GameProfile {
pub uuid: Uuid,
/// The username of the player.
pub name: String,
// this is an arc to make GameProfile cheaper to clone when the properties are big
pub properties: Arc<HashMap<String, ProfilePropertyValue>>,
}

View file

@ -39,7 +39,7 @@ impl AzaleaWriteVar for i32 {
let mut buffer = [0];
let mut value = *self;
if value == 0 {
buf.write_all(&buffer).unwrap();
buf.write_all(&buffer)?;
}
while value != 0 {
buffer[0] = (value & 0b0111_1111) as u8;

View file

@ -15,7 +15,7 @@ use uuid::Uuid;
/// To join a server using this account, use [`Client::join`] or
/// [`azalea::ClientBuilder`].
///
/// Note that this is also a component that our clients have.
/// This is also an ECS component that is present on our client entities.
///
/// # Examples
///

View file

@ -8,48 +8,42 @@ use std::{
time::{Duration, Instant},
};
use azalea_auth::{game_profile::GameProfile, sessionserver::ClientSessionServerError};
use azalea_auth::game_profile::GameProfile;
use azalea_chat::FormattedText;
use azalea_core::{
data_registry::ResolvableDataRegistry, position::Vec3, resource_location::ResourceLocation,
tick::GameTick,
};
use azalea_entity::{
EntityPlugin, EntityUpdateSet, EyeHeight, LocalEntity, Position,
EntityUpdateSet, EyeHeight, LocalEntity, Position,
indexing::{EntityIdIndex, EntityUuidIndex},
metadata::Health,
};
use azalea_physics::PhysicsPlugin;
use azalea_protocol::{
ServerAddress,
common::client_information::ClientInformation,
connect::{Connection, ConnectionError, Proxy},
packets::{
self, ClientIntention, ConnectionProtocol, PROTOCOL_VERSION, Packet,
config::{ClientboundConfigPacket, ServerboundConfigPacket},
game::ServerboundGamePacket,
game::{self, ServerboundGamePacket},
handshake::{
ClientboundHandshakePacket, ServerboundHandshakePacket,
s_intention::ServerboundIntention,
},
login::{
ClientboundLoginPacket, s_hello::ServerboundHello, s_key::ServerboundKey,
s_login_acknowledged::ServerboundLoginAcknowledged,
},
login::{ClientboundLoginPacket, ServerboundLoginPacket, s_hello::ServerboundHello},
},
resolver,
};
use azalea_world::{Instance, InstanceContainer, InstanceName, MinecraftEntityId, PartialInstance};
use bevy_app::{App, Plugin, PluginGroup, PluginGroupBuilder, PluginsState, Update};
use bevy_app::{App, Plugin, PluginsState, Update};
use bevy_ecs::{
bundle::Bundle,
component::Component,
entity::Entity,
schedule::{InternedScheduleLabel, IntoSystemConfigs, LogLevel, ScheduleBuildSettings},
system::{ResMut, Resource},
system::{Commands, ResMut, Resource},
world::World,
};
use bevy_time::TimePlugin;
use derive_more::Deref;
use parking_lot::{Mutex, RwLock};
use simdnbt::owned::NbtCompound;
@ -65,30 +59,25 @@ use tracing::{debug, error, info};
use uuid::Uuid;
use crate::{
Account, PlayerInfo,
attack::{self, AttackPlugin},
brand::BrandPlugin,
chat::ChatPlugin,
chunks::{ChunkBatchInfo, ChunksPlugin},
disconnect::{DisconnectEvent, DisconnectPlugin},
events::{Event, EventsPlugin, LocalPlayerEvents},
interact::{CurrentSequenceNumber, InteractPlugin},
inventory::{Inventory, InventoryPlugin},
Account, DefaultPlugins, PlayerInfo,
attack::{self},
chunks::ChunkBatchInfo,
connection::RawConnection,
disconnect::DisconnectEvent,
events::{Event, LocalPlayerEvents},
interact::CurrentSequenceNumber,
inventory::Inventory,
local_player::{
GameProfileComponent, Hunger, InstanceHolder, PermissionLevel, PlayerAbilities, TabList,
},
mining::{self, MiningPlugin},
movement::{LastSentLookDirection, MovementPlugin, PhysicsState},
mining::{self},
movement::{LastSentLookDirection, PhysicsState},
packet::{
PacketPlugin,
login::{self, InLoginState, LoginSendPacketQueue},
as_system,
game::SendPacketEvent,
login::{InLoginState, SendLoginPacketEvent},
},
player::retroactively_add_game_profile_component,
pong::PongPlugin,
raw_connection::RawConnection,
respawn::RespawnPlugin,
task_pool::TaskPoolPlugin,
tick_end::TickEndPlugin,
};
/// `Client` has the things that a user interacting with the library will want.
@ -102,15 +91,6 @@ use crate::{
/// [`azalea::ClientBuilder`]: https://docs.rs/azalea/latest/azalea/struct.ClientBuilder.html
#[derive(Clone)]
pub struct Client {
/// The [`GameProfile`] for our client. This contains your username, UUID,
/// and skin data.
///
/// This is immutable; the server cannot change it. To get the username and
/// skin the server chose for you, get your player from the [`TabList`]
/// component.
///
/// This as also available from the ECS as [`GameProfileComponent`].
pub profile: GameProfile,
/// The entity for this client in the ECS.
pub entity: Entity,
@ -134,6 +114,8 @@ pub enum JoinError {
ReadPacket(#[from] Box<azalea_protocol::read::ReadPacketError>),
#[error("{0}")]
Io(#[from] io::Error),
#[error("Failed to encrypt the challenge from the server for {0:?}")]
EncryptionError(packets::login::ClientboundHello),
#[error("{0}")]
SessionServer(#[from] azalea_auth::sessionserver::ClientSessionServerError),
#[error("The given address could not be parsed into a ServerAddress")]
@ -192,13 +174,11 @@ impl Client {
/// You should only use this if you want to change these fields from the
/// defaults, otherwise use [`Client::join`].
pub fn new(
profile: GameProfile,
entity: Entity,
ecs: Arc<Mutex<World>>,
run_schedule_sender: mpsc::Sender<()>,
) -> Self {
Self {
profile,
// default our id to 0, it'll be set later
entity,
@ -294,8 +274,19 @@ impl Client {
entity
};
// add the Account to the entity now so plugins can access it earlier
ecs.entity_mut(entity).insert(account.to_owned());
let mut entity_mut = ecs.entity_mut(entity);
entity_mut.insert((
// add the Account to the entity now so plugins can access it earlier
account.to_owned(),
// localentity is always present for our clients, even if we're not actually logged
// in
LocalEntity,
));
if let Some(event_sender) = event_sender {
// this is optional so we don't leak memory in case the user doesn't want to
// handle receiving packets
entity_mut.insert(LocalPlayerEvents(event_sender));
}
entity
};
@ -305,59 +296,43 @@ impl Client {
} else {
Connection::new(resolved_address).await?
};
let (conn, game_profile) =
Self::handshake(ecs_lock.clone(), entity, conn, account, address).await?;
// note that we send the proper packets in
// crate::configuration::handle_in_configuration_state
let conn = Self::handshake(ecs_lock.clone(), entity, conn, account, address).await?;
let (read_conn, write_conn) = conn.into_split();
let (read_conn, write_conn) = (read_conn.raw, write_conn.raw);
// we did the handshake, so now we're connected to the server
// insert the client into the ecs so it finishes logging in
{
let mut ecs = ecs_lock.lock();
let mut ecs = ecs_lock.lock();
let instance = Instance::default();
let instance_holder = crate::local_player::InstanceHolder::new(
entity,
// default to an empty world, it'll be set correctly later when we
// get the login packet
Arc::new(RwLock::new(instance)),
);
// we got the ConfigurationConnection, so the client is now connected :)
let client = Client::new(
game_profile.clone(),
entity,
ecs_lock.clone(),
run_schedule_sender.clone(),
);
let instance = Instance::default();
let instance_holder = crate::local_player::InstanceHolder::new(
entity,
// default to an empty world, it'll be set correctly later when we
// get the login packet
Arc::new(RwLock::new(instance)),
);
let mut entity = ecs.entity_mut(entity);
entity.insert((
// these stay when we switch to the game state
LocalPlayerBundle {
raw_connection: RawConnection::new(
run_schedule_sender,
ConnectionProtocol::Configuration,
read_conn,
write_conn,
),
game_profile: GameProfileComponent(game_profile),
client_information: crate::ClientInformation::default(),
instance_holder,
metadata: azalea_entity::metadata::PlayerMetadataBundle::default(),
},
InConfigState,
// this component is never removed
LocalEntity,
));
if let Some(event_sender) = event_sender {
// this is optional so we don't leak memory in case the user
entity.insert(LocalPlayerEvents(event_sender));
let mut entity = ecs.entity_mut(entity);
entity.insert((
// these stay when we switch to the game state
LocalPlayerBundle {
raw_connection: RawConnection::new(
read_conn,
write_conn,
ConnectionProtocol::Login,
),
client_information: crate::ClientInformation::default(),
instance_holder,
metadata: azalea_entity::metadata::PlayerMetadataBundle::default(),
},
InConfigState,
// this component is never removed
LocalEntity,
));
}
let client = Client::new(entity, ecs_lock.clone(), run_schedule_sender.clone());
Ok(client)
}
@ -372,13 +347,7 @@ impl Client {
mut conn: Connection<ClientboundHandshakePacket, ServerboundHandshakePacket>,
account: &Account,
address: &ServerAddress,
) -> Result<
(
Connection<ClientboundConfigPacket, ServerboundConfigPacket>,
GameProfile,
),
JoinError,
> {
) -> Result<Connection<ClientboundLoginPacket, ServerboundLoginPacket>, JoinError> {
// handshake
conn.write(ServerboundIntention {
protocol_version: PROTOCOL_VERSION,
@ -387,147 +356,34 @@ impl Client {
intention: ClientIntention::Login,
})
.await?;
let mut conn = conn.login();
let conn = conn.login();
// this makes it so plugins can send an `SendLoginPacketEvent` event to the ecs
// and we'll send it to the server
let (ecs_packets_tx, mut ecs_packets_rx) = mpsc::unbounded_channel();
ecs_lock.lock().entity_mut(entity).insert((
LoginSendPacketQueue { tx: ecs_packets_tx },
crate::packet::login::IgnoreQueryIds::default(),
InLoginState,
));
// login
conn.write(ServerboundHello {
name: account.username.clone(),
// TODO: pretty sure this should generate an offline-mode uuid instead of just
// Uuid::default()
profile_id: account.uuid.unwrap_or_default(),
})
.await?;
let (conn, profile) = loop {
let packet = tokio::select! {
packet = conn.read() => packet?,
Some(packet) = ecs_packets_rx.recv() => {
// write this packet to the server
conn.write(packet).await?;
continue;
}
};
ecs_lock.lock().send_event(login::LoginPacketEvent {
as_system::<Commands>(&mut ecs_lock.lock(), |mut commands| {
commands.entity(entity).insert((
crate::packet::login::IgnoreQueryIds::default(),
InLoginState,
));
commands.trigger(SendLoginPacketEvent::new(
entity,
packet: Arc::new(packet.clone()),
});
ServerboundHello {
name: account.username.clone(),
// TODO: pretty sure this should generate an offline-mode uuid instead of just
// Uuid::default()
profile_id: account.uuid.unwrap_or_default(),
},
))
});
match packet {
ClientboundLoginPacket::Hello(p) => {
debug!("Got encryption request");
let Ok(e) = azalea_crypto::encrypt(&p.public_key, &p.challenge) else {
error!("Failed to encrypt the challenge from the server for {p:?}");
continue;
};
if let Some(access_token) = &account.access_token {
// keep track of the number of times we tried
// authenticating so we can give up after too many
let mut attempts: usize = 1;
while let Err(e) = {
let access_token = access_token.lock().clone();
conn.authenticate(
&access_token,
&account
.uuid
.expect("Uuid must be present if access token is present."),
e.secret_key,
&p,
)
.await
} {
if attempts >= 2 {
// if this is the second attempt and we failed
// both times, give up
return Err(e.into());
}
if matches!(
e,
ClientSessionServerError::InvalidSession
| ClientSessionServerError::ForbiddenOperation
) {
// uh oh, we got an invalid session and have
// to reauthenticate now
account.refresh().await?;
} else {
return Err(e.into());
}
attempts += 1;
}
}
conn.write(ServerboundKey {
key_bytes: e.encrypted_public_key,
encrypted_challenge: e.encrypted_challenge,
})
.await?;
conn.set_encryption_key(e.secret_key);
}
ClientboundLoginPacket::LoginCompression(p) => {
debug!("Got compression request {:?}", p.compression_threshold);
conn.set_compression_threshold(p.compression_threshold);
}
ClientboundLoginPacket::LoginFinished(p) => {
debug!(
"Got profile {:?}. handshake is finished and we're now switching to the configuration state",
p.game_profile
);
conn.write(ServerboundLoginAcknowledged {}).await?;
break (conn.config(), p.game_profile);
}
ClientboundLoginPacket::LoginDisconnect(p) => {
debug!("Got disconnect {:?}", p);
return Err(JoinError::Disconnect { reason: p.reason });
}
ClientboundLoginPacket::CustomQuery(p) => {
debug!("Got custom query {:?}", p);
// replying to custom query is done in
// packet::login::process_packet_events
}
ClientboundLoginPacket::CookieRequest(p) => {
debug!("Got cookie request {:?}", p);
conn.write(packets::login::ServerboundCookieResponse {
key: p.key,
// cookies aren't implemented
payload: None,
})
.await?;
}
}
};
ecs_lock
.lock()
.entity_mut(entity)
.remove::<login::IgnoreQueryIds>()
.remove::<LoginSendPacketQueue>()
.remove::<InLoginState>();
Ok((conn, profile))
Ok(conn)
}
/// Write a packet directly to the server.
pub fn write_packet(
&self,
packet: impl Packet<ServerboundGamePacket>,
) -> Result<(), crate::raw_connection::WritePacketError> {
pub fn write_packet(&self, packet: impl Packet<ServerboundGamePacket>) {
let packet = packet.into_variant();
self.raw_connection_mut(&mut self.ecs.lock())
.write_packet(packet)
self.ecs
.lock()
.commands()
.trigger(SendPacketEvent::new(self.entity, packet));
}
/// Disconnect this client from the server by ending all tasks.
@ -694,10 +550,7 @@ impl Client {
/// # Ok(())
/// # }
/// ```
pub async fn set_client_information(
&self,
client_information: ClientInformation,
) -> Result<(), crate::raw_connection::WritePacketError> {
pub async fn set_client_information(&self, client_information: ClientInformation) {
{
let mut ecs = self.ecs.lock();
let mut client_information_mut = self.query::<&mut ClientInformation>(&mut ecs);
@ -709,10 +562,10 @@ impl Client {
"Sending client information (already logged in): {:?}",
client_information
);
self.write_packet(azalea_protocol::packets::game::s_client_information::ServerboundClientInformation { information: client_information.clone() })?;
self.write_packet(game::s_client_information::ServerboundClientInformation {
client_information,
});
}
Ok(())
}
}
@ -760,14 +613,14 @@ impl Client {
/// This is a shortcut for
/// `bot.component::<GameProfileComponent>().name.to_owned()`.
pub fn username(&self) -> String {
self.component::<GameProfileComponent>().name.to_owned()
self.profile().name.to_owned()
}
/// Get the Minecraft UUID of this client.
///
/// This is a shortcut for `bot.component::<GameProfileComponent>().uuid`.
pub fn uuid(&self) -> Uuid {
self.component::<GameProfileComponent>().uuid
self.profile().uuid
}
/// Get a map of player UUIDs to their information in the tab list.
@ -777,6 +630,19 @@ impl Client {
(*self.component::<TabList>()).clone()
}
/// Returns the [`GameProfile`] for our client. This contains your username,
/// UUID, and skin data.
///
/// These values are set by the server upon login, which means they might
/// not match up with your actual game profile. Also, note that the username
/// and skin that gets displayed in-game will actually be the ones from
/// the tab list, which you can get from [`Self::tab_list`].
///
/// This as also available from the ECS as [`GameProfileComponent`].
pub fn profile(&self) -> GameProfile {
(*self.component::<GameProfileComponent>()).clone()
}
/// A convenience function to get the Minecraft Uuid of a player by their
/// username, if they're present in the tab list.
///
@ -857,15 +723,14 @@ impl Client {
}
}
/// The bundle of components that's shared when we're either in the
/// `configuration` or `game` state.
/// A bundle of components that's inserted right when we switch to the `login`
/// state and stay present on our clients until we disconnect.
///
/// For the components that are only present in the `game` state, see
/// [`JoinedClientBundle`].
#[derive(Bundle)]
pub struct LocalPlayerBundle {
pub raw_connection: RawConnection,
pub game_profile: GameProfileComponent,
pub client_information: ClientInformation,
pub instance_holder: InstanceHolder,
@ -1056,40 +921,3 @@ impl Plugin for AmbiguityLoggerPlugin {
});
}
}
/// This plugin group will add all the default plugins necessary for Azalea to
/// work.
pub struct DefaultPlugins;
impl PluginGroup for DefaultPlugins {
fn build(self) -> PluginGroupBuilder {
#[allow(unused_mut)]
let mut group = PluginGroupBuilder::start::<Self>()
.add(AmbiguityLoggerPlugin)
.add(TimePlugin)
.add(PacketPlugin)
.add(AzaleaPlugin)
.add(EntityPlugin)
.add(PhysicsPlugin)
.add(EventsPlugin)
.add(TaskPoolPlugin::default())
.add(InventoryPlugin)
.add(ChatPlugin)
.add(DisconnectPlugin)
.add(MovementPlugin)
.add(InteractPlugin)
.add(RespawnPlugin)
.add(MiningPlugin)
.add(AttackPlugin)
.add(ChunksPlugin)
.add(TickEndPlugin)
.add(BrandPlugin)
.add(TickBroadcastPlugin)
.add(PongPlugin);
#[cfg(feature = "log")]
{
group = group.add(bevy_log::LogPlugin::default());
}
group
}
}

View file

@ -15,7 +15,6 @@ mod local_player;
pub mod ping;
mod player;
mod plugins;
pub mod raw_connection;
#[doc(hidden)]
pub mod test_simulation;
@ -23,8 +22,8 @@ pub mod test_simulation;
pub use account::{Account, AccountOpts};
pub use azalea_protocol::common::client_information::ClientInformation;
pub use client::{
Client, DefaultPlugins, InConfigState, InGameState, JoinError, JoinedClientBundle,
LocalPlayerBundle, StartClientOpts, TickBroadcast, start_ecs_runner,
Client, InConfigState, InGameState, JoinError, JoinedClientBundle, LocalPlayerBundle,
StartClientOpts, TickBroadcast, start_ecs_runner,
};
pub use events::Event;
pub use local_player::{GameProfileComponent, Hunger, InstanceHolder, TabList};

View file

@ -0,0 +1,340 @@
use std::{fmt::Debug, io::Cursor, mem, sync::Arc};
use azalea_crypto::Aes128CfbEnc;
use azalea_protocol::{
connect::{RawReadConnection, RawWriteConnection},
packets::{
ConnectionProtocol, Packet, ProtocolPacket, config::ClientboundConfigPacket,
game::ClientboundGamePacket, login::ClientboundLoginPacket,
},
read::{ReadPacketError, deserialize_packet},
write::serialize_packet,
};
use bevy_app::prelude::*;
use bevy_ecs::prelude::*;
use bevy_tasks::{IoTaskPool, futures_lite::future};
use thiserror::Error;
use tokio::{
io::AsyncWriteExt,
net::tcp::OwnedWriteHalf,
sync::mpsc::{self},
};
use tracing::{debug, error};
use super::packet::{
config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent,
};
use crate::packet::{config, game, login};
pub struct ConnectionPlugin;
impl Plugin for ConnectionPlugin {
fn build(&self, app: &mut App) {
app.add_systems(PreUpdate, read_packets);
}
}
pub fn read_packets(ecs: &mut World) {
// receive_game_packet_events: EventWriter<ReceiveGamePacketEvent>,
let mut query = ecs.query::<(Entity, &mut RawConnection)>();
let mut entities_handling_packets = Vec::new();
let mut entities_with_injected_packets = Vec::new();
for (entity, mut raw_conn) in query.iter_mut(ecs) {
let state = raw_conn.state;
if !raw_conn.injected_clientbound_packets.is_empty() {
entities_with_injected_packets.push((
entity,
state,
mem::take(&mut raw_conn.injected_clientbound_packets),
));
}
let Some(net_conn) = raw_conn.network.take() else {
// means it's a networkless connection
continue;
};
entities_handling_packets.push((entity, state, net_conn));
}
let mut queued_packet_events = QueuedPacketEvents::default();
// handle injected packets, see the comment on
// RawConnection::injected_clientbound_packets for more info
for (entity, mut state, raw_packets) in entities_with_injected_packets {
for raw_packet in raw_packets {
handle_raw_packet(
ecs,
&raw_packet,
entity,
&mut state,
None,
&mut queued_packet_events,
)
.unwrap();
// update the state and for the client
let (_, mut raw_conn_component) = query.get_mut(ecs, entity).unwrap();
raw_conn_component.state = state;
}
}
// we pass the mutable state and net_conn into the handlers so they're allowed
// to mutate it
for (entity, mut state, mut net_conn) in entities_handling_packets {
loop {
match net_conn.reader.try_read() {
Ok(Some(raw_packet)) => {
let raw_packet = Arc::<[u8]>::from(raw_packet);
if let Err(e) = handle_raw_packet(
ecs,
&raw_packet,
entity,
&mut state,
Some(&mut net_conn),
&mut queued_packet_events,
) {
error!("Error reading packet: {e}");
}
}
Ok(None) => {
// no packets available
break;
}
Err(err) => {
log_for_error(&err);
break;
}
}
}
// this needs to be done at some point every update, so we do it here right
// after the handlers are called
net_conn.poll_writer();
// update the state and network connections for the client
let (_, mut raw_conn_component) = query.get_mut(ecs, entity).unwrap();
raw_conn_component.state = state;
raw_conn_component.network = Some(net_conn);
}
queued_packet_events.send_events(ecs);
}
#[derive(Default)]
pub struct QueuedPacketEvents {
login: Vec<ReceiveLoginPacketEvent>,
config: Vec<ReceiveConfigPacketEvent>,
game: Vec<ReceiveGamePacketEvent>,
}
impl QueuedPacketEvents {
fn send_events(&mut self, ecs: &mut World) {
ecs.send_event_batch(self.login.drain(..));
ecs.send_event_batch(self.config.drain(..));
ecs.send_event_batch(self.game.drain(..));
}
}
fn log_for_error(error: &ReadPacketError) {
if !matches!(*error, ReadPacketError::ConnectionClosed) {
error!("Error reading packet from Client: {error:?}");
}
}
/// The client's connection to the server.
#[derive(Component)]
pub struct RawConnection {
/// The network connection to the server.
///
/// This isn't guaranteed to be present, for example during the main packet
/// handlers or at all times during tests.
///
/// You shouldn't rely on this. Instead, use the events for sending packets
/// like [`SendPacketEvent`](crate::packet::game::SendPacketEvent) /
/// [`SendConfigPacketEvent`](crate::packet::config::SendConfigPacketEvent)
/// / [`SendLoginPacketEvent`](crate::packet::login::SendLoginPacketEvent).
///
/// To check if we haven't disconnected from the server, use
/// [`Self::is_alive`].
network: Option<NetworkConnection>,
pub state: ConnectionProtocol,
is_alive: bool,
/// This exists for internal testing purposes and probably shouldn't be used
/// for normal bots. It's basically a way to make our client think it
/// received a packet from the server without needing to interact with the
/// network.
pub injected_clientbound_packets: Vec<Box<[u8]>>,
}
impl RawConnection {
pub fn new(
reader: RawReadConnection,
writer: RawWriteConnection,
state: ConnectionProtocol,
) -> Self {
let task_pool = IoTaskPool::get();
let (network_packet_writer_tx, network_packet_writer_rx) =
mpsc::unbounded_channel::<Box<[u8]>>();
let writer_task =
task_pool.spawn(write_task(network_packet_writer_rx, writer.write_stream));
let mut conn = Self::new_networkless(state);
conn.network = Some(NetworkConnection {
reader,
enc_cipher: writer.enc_cipher,
network_packet_writer_tx,
writer_task,
});
conn
}
pub fn new_networkless(state: ConnectionProtocol) -> Self {
Self {
network: None,
state,
is_alive: true,
injected_clientbound_packets: Vec::new(),
}
}
pub fn is_alive(&self) -> bool {
self.is_alive
}
/// Write a packet to the server without emitting any events.
///
/// This is called by the handlers for [`SendPacketEvent`],
/// [`SendConfigPacketEvent`], and [`SendLoginPacketEvent`].
///
/// [`SendPacketEvent`]: crate::packet::game::SendPacketEvent
/// [`SendConfigPacketEvent`]: crate::packet::config::SendConfigPacketEvent
/// [`SendLoginPacketEvent`]: crate::packet::login::SendLoginPacketEvent
pub fn write<P: ProtocolPacket + Debug>(
&mut self,
packet: impl Packet<P>,
) -> Result<(), WritePacketError> {
if let Some(network) = &mut self.network {
let packet = packet.into_variant();
let raw_packet = serialize_packet(&packet)?;
network.write_raw(&raw_packet)?;
}
Ok(())
}
pub fn net_conn(&mut self) -> Option<&mut NetworkConnection> {
self.network.as_mut()
}
}
pub fn handle_raw_packet(
ecs: &mut World,
raw_packet: &[u8],
entity: Entity,
state: &mut ConnectionProtocol,
net_conn: Option<&mut NetworkConnection>,
queued_packet_events: &mut QueuedPacketEvents,
) -> Result<(), Box<ReadPacketError>> {
let stream = &mut Cursor::new(raw_packet);
match state {
ConnectionProtocol::Handshake => {
unreachable!()
}
ConnectionProtocol::Game => {
let packet = Arc::new(deserialize_packet::<ClientboundGamePacket>(stream)?);
game::process_packet(ecs, entity, packet.as_ref());
queued_packet_events
.game
.push(ReceiveGamePacketEvent { entity, packet });
}
ConnectionProtocol::Status => {
unreachable!()
}
ConnectionProtocol::Login => {
let packet = Arc::new(deserialize_packet::<ClientboundLoginPacket>(stream)?);
login::process_packet(ecs, entity, &packet, state, net_conn);
queued_packet_events
.login
.push(ReceiveLoginPacketEvent { entity, packet });
}
ConnectionProtocol::Configuration => {
let packet = Arc::new(deserialize_packet::<ClientboundConfigPacket>(stream)?);
config::process_packet(ecs, entity, &packet);
queued_packet_events
.config
.push(ReceiveConfigPacketEvent { entity, packet });
}
};
Ok(())
}
pub struct NetworkConnection {
reader: RawReadConnection,
// compression threshold is in the RawReadConnection
pub enc_cipher: Option<Aes128CfbEnc>,
pub writer_task: bevy_tasks::Task<()>,
/// A queue of raw TCP packets to send. These will not be modified further,
/// they should already be serialized and encrypted and everything before
/// being added here.
network_packet_writer_tx: mpsc::UnboundedSender<Box<[u8]>>,
}
impl NetworkConnection {
pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> {
let network_packet = azalea_protocol::write::encode_to_network_packet(
raw_packet,
self.reader.compression_threshold,
&mut self.enc_cipher,
);
self.network_packet_writer_tx
.send(network_packet.into_boxed_slice())?;
Ok(())
}
pub fn poll_writer(&mut self) {
future::block_on(future::poll_once(&mut self.writer_task));
}
pub fn set_compression_threshold(&mut self, threshold: Option<u32>) {
self.reader.compression_threshold = threshold;
}
/// Set the encryption key that is used to encrypt and decrypt packets. It's
/// the same for both reading and writing.
pub fn set_encryption_key(&mut self, key: [u8; 16]) {
let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
self.reader.dec_cipher = Some(dec_cipher);
self.enc_cipher = Some(enc_cipher);
}
}
async fn write_task(
mut network_packet_writer_rx: mpsc::UnboundedReceiver<Box<[u8]>>,
mut write_half: OwnedWriteHalf,
) {
while let Some(network_packet) = network_packet_writer_rx.recv().await {
if let Err(e) = write_half.write_all(&network_packet).await {
debug!("Error writing packet to server: {e}");
break;
};
}
}
#[derive(Error, Debug)]
pub enum WritePacketError {
#[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
WrongState {
expected: ConnectionProtocol,
got: ConnectionProtocol,
},
#[error(transparent)]
Encoding(#[from] azalea_protocol::write::PacketEncodeError),
#[error(transparent)]
SendError {
#[from]
#[backtrace]
source: mpsc::error::SendError<Box<[u8]>>,
},
}

View file

@ -16,8 +16,8 @@ use derive_more::Deref;
use tracing::trace;
use crate::{
InstanceHolder, client::JoinedClientBundle, events::LocalPlayerEvents,
raw_connection::RawConnection,
InstanceHolder, client::JoinedClientBundle, connection::RawConnection,
events::LocalPlayerEvents,
};
pub struct DisconnectPlugin;

View file

@ -27,7 +27,7 @@ use crate::{
chat::{ChatPacket, ChatReceivedEvent},
disconnect::DisconnectEvent,
packet::game::{
AddPlayerEvent, DeathEvent, KeepAliveEvent, ReceivePacketEvent, RemovePlayerEvent,
AddPlayerEvent, DeathEvent, KeepAliveEvent, ReceiveGamePacketEvent, RemovePlayerEvent,
UpdatePlayerEvent,
},
};
@ -157,7 +157,7 @@ impl Plugin for EventsPlugin {
)
.add_systems(
PreUpdate,
init_listener.before(crate::packet::game::process_packet_events),
init_listener.before(super::connection::read_packets),
)
.add_systems(GameTick, tick_listener);
}
@ -217,7 +217,7 @@ pub fn tick_listener(query: Query<&LocalPlayerEvents, With<InstanceName>>) {
pub fn packet_listener(
query: Query<&LocalPlayerEvents>,
mut events: EventReader<ReceivePacketEvent>,
mut events: EventReader<ReceiveGamePacketEvent>,
) {
for event in events.read() {
if let Ok(local_player_events) = query.get(event.entity) {

View file

@ -0,0 +1,120 @@
use azalea_auth::sessionserver::ClientSessionServerError;
use azalea_protocol::packets::login::{ClientboundHello, ServerboundKey};
use bevy_app::prelude::*;
use bevy_ecs::prelude::*;
use bevy_tasks::{IoTaskPool, Task, futures_lite::future};
use tracing::error;
use super::{connection::RawConnection, packet::login::ReceiveHelloEvent};
use crate::{Account, JoinError};
pub struct LoginPlugin;
impl Plugin for LoginPlugin {
fn build(&self, app: &mut App) {
app.add_observer(handle_receive_hello_event)
.add_systems(Update, poll_auth_task);
}
}
fn handle_receive_hello_event(trigger: Trigger<ReceiveHelloEvent>, mut commands: Commands) {
let task_pool = IoTaskPool::get();
let account = trigger.account.clone();
let packet = trigger.packet.clone();
let player = trigger.entity();
let task = task_pool.spawn(auth_with_account(account, packet));
commands.entity(player).insert(AuthTask(task));
}
fn poll_auth_task(
mut commands: Commands,
mut query: Query<(Entity, &mut AuthTask, &mut RawConnection)>,
) {
for (entity, mut auth_task, mut raw_conn) in query.iter_mut() {
if let Some(poll_res) = future::block_on(future::poll_once(&mut auth_task.0)) {
commands.entity(entity).remove::<AuthTask>();
match poll_res {
Ok((packet, private_key)) => {
// we use this instead of SendLoginPacketEvent to ensure that it's sent right
// before encryption is enabled. i guess another option would be to make a
// Trigger+observer for set_encryption_key; the current implementation is
// simpler though.
if let Err(e) = raw_conn.write(packet) {
error!("Error sending key packet: {e:?}");
}
if let Some(net_conn) = raw_conn.net_conn() {
net_conn.set_encryption_key(private_key);
}
}
Err(err) => {
error!("Error during authentication: {err:?}");
}
}
}
}
}
type PrivateKey = [u8; 16];
#[derive(Component)]
pub struct AuthTask(Task<Result<(ServerboundKey, PrivateKey), JoinError>>);
pub async fn auth_with_account(
account: Account,
packet: ClientboundHello,
) -> Result<(ServerboundKey, PrivateKey), JoinError> {
let Ok(encrypt_res) = azalea_crypto::encrypt(&packet.public_key, &packet.challenge) else {
return Err(JoinError::EncryptionError(packet));
};
let key_packet = ServerboundKey {
key_bytes: encrypt_res.encrypted_public_key,
encrypted_challenge: encrypt_res.encrypted_challenge,
};
let private_key = encrypt_res.secret_key;
let Some(access_token) = &account.access_token else {
// offline mode account, no need to do auth
return Ok((key_packet, private_key));
};
// keep track of the number of times we tried authenticating so we can give up
// after too many
let mut attempts: usize = 1;
while let Err(err) = {
let access_token = access_token.lock().clone();
let uuid = &account
.uuid
.expect("Uuid must be present if access token is present.");
azalea_auth::sessionserver::join(
&access_token,
&packet.public_key,
&private_key,
uuid,
&packet.server_id,
)
.await
} {
if attempts >= 2 {
// if this is the second attempt and we failed
// both times, give up
return Err(err.into());
}
if matches!(
err,
ClientSessionServerError::InvalidSession | ClientSessionServerError::ForbiddenOperation
) {
// uh oh, we got an invalid session and have
// to reauthenticate now
account.refresh().await?;
} else {
return Err(err.into());
}
attempts += 1;
}
Ok((key_packet, private_key))
}

View file

@ -1,11 +1,15 @@
use bevy_app::{PluginGroup, PluginGroupBuilder};
pub mod attack;
pub mod brand;
pub mod chat;
pub mod chunks;
pub mod connection;
pub mod disconnect;
pub mod events;
pub mod interact;
pub mod inventory;
pub mod login;
pub mod mining;
pub mod movement;
pub mod packet;
@ -13,3 +17,42 @@ pub mod pong;
pub mod respawn;
pub mod task_pool;
pub mod tick_end;
/// This plugin group will add all the default plugins necessary for Azalea to
/// work.
pub struct DefaultPlugins;
impl PluginGroup for DefaultPlugins {
fn build(self) -> PluginGroupBuilder {
#[allow(unused_mut)]
let mut group = PluginGroupBuilder::start::<Self>()
.add(crate::client::AmbiguityLoggerPlugin)
.add(bevy_time::TimePlugin)
.add(packet::PacketPlugin)
.add(crate::client::AzaleaPlugin)
.add(azalea_entity::EntityPlugin)
.add(azalea_physics::PhysicsPlugin)
.add(events::EventsPlugin)
.add(task_pool::TaskPoolPlugin::default())
.add(inventory::InventoryPlugin)
.add(chat::ChatPlugin)
.add(disconnect::DisconnectPlugin)
.add(movement::MovementPlugin)
.add(interact::InteractPlugin)
.add(respawn::RespawnPlugin)
.add(mining::MiningPlugin)
.add(attack::AttackPlugin)
.add(chunks::ChunksPlugin)
.add(tick_end::TickEndPlugin)
.add(brand::BrandPlugin)
.add(crate::client::TickBroadcastPlugin)
.add(pong::PongPlugin)
.add(connection::ConnectionPlugin)
.add(login::LoginPlugin);
#[cfg(feature = "log")]
{
group = group.add(bevy_log::LogPlugin::default());
}
group
}
}

View file

@ -1,23 +1,20 @@
use std::io::Cursor;
use std::sync::Arc;
use azalea_protocol::{
packets::{
Packet,
config::{ClientboundConfigPacket, ServerboundConfigPacket},
},
read::deserialize_packet,
use azalea_protocol::packets::{
Packet,
config::{ClientboundConfigPacket, ServerboundConfigPacket},
};
use bevy_ecs::prelude::*;
use tracing::{debug, error};
use crate::{InConfigState, raw_connection::RawConnection};
use crate::{InConfigState, connection::RawConnection};
#[derive(Event, Debug, Clone)]
pub struct ReceiveConfigPacketEvent {
/// The client entity that received the packet.
pub entity: Entity,
/// The packet that was actually received.
pub packet: ClientboundConfigPacket,
pub packet: Arc<ClientboundConfigPacket>,
}
/// An event for sending a packet to the server while we're in the
@ -39,7 +36,7 @@ pub fn handle_outgoing_packets_observer(
mut query: Query<(&mut RawConnection, Option<&InConfigState>)>,
) {
let event = trigger.event();
if let Ok((raw_conn, in_configuration_state)) = query.get_mut(event.sent_by) {
if let Ok((mut raw_conn, in_configuration_state)) = query.get_mut(event.sent_by) {
if in_configuration_state.is_none() {
error!(
"Tried to send a configuration packet {:?} while not in configuration state",
@ -48,7 +45,7 @@ pub fn handle_outgoing_packets_observer(
return;
}
debug!("Sending packet: {:?}", event.packet);
if let Err(e) = raw_conn.write_packet(event.packet.clone()) {
if let Err(e) = raw_conn.write(event.packet.clone()) {
error!("Failed to send packet: {e}");
}
}
@ -64,61 +61,6 @@ pub fn handle_outgoing_packets(
}
}
pub fn emit_receive_config_packet_events(
query: Query<(Entity, &RawConnection), With<InConfigState>>,
mut packet_events: ResMut<Events<ReceiveConfigPacketEvent>>,
) {
// we manually clear and send the events at the beginning of each update
// since otherwise it'd cause issues with events in process_packet_events
// running twice
packet_events.clear();
for (player_entity, raw_conn) in &query {
let packets_lock = raw_conn.incoming_packet_queue();
let mut packets = packets_lock.lock();
if !packets.is_empty() {
let mut packets_read = 0;
for raw_packet in packets.iter() {
packets_read += 1;
let packet = match deserialize_packet::<ClientboundConfigPacket>(&mut Cursor::new(
raw_packet,
)) {
Ok(packet) => packet,
Err(err) => {
error!("failed to read packet: {err:?}");
debug!("packet bytes: {raw_packet:?}");
continue;
}
};
let should_interrupt = packet_interrupts(&packet);
packet_events.send(ReceiveConfigPacketEvent {
entity: player_entity,
packet,
});
if should_interrupt {
break;
}
}
packets.drain(0..packets_read);
}
}
}
/// Whether the given packet should make us stop deserializing the received
/// packets until next update.
///
/// This is used for packets that can switch the client state.
fn packet_interrupts(packet: &ClientboundConfigPacket) -> bool {
matches!(
packet,
ClientboundConfigPacket::FinishConfiguration(_)
| ClientboundConfigPacket::Disconnect(_)
| ClientboundConfigPacket::Transfer(_)
)
}
/// A Bevy trigger that's sent when our client receives a [`ClientboundPing`]
/// packet in the config state.
///

View file

@ -1,65 +1,61 @@
mod events;
use std::io::Cursor;
use azalea_entity::LocalEntity;
use azalea_protocol::packets::ConnectionProtocol;
use azalea_protocol::packets::config::*;
use azalea_protocol::read::ReadPacketError;
use azalea_protocol::read::deserialize_packet;
use bevy_ecs::prelude::*;
use bevy_ecs::system::SystemState;
pub use events::*;
use tracing::{debug, warn};
use super::as_system;
use crate::client::InConfigState;
use crate::connection::RawConnection;
use crate::disconnect::DisconnectEvent;
use crate::packet::game::KeepAliveEvent;
use crate::packet::game::ResourcePackEvent;
use crate::raw_connection::RawConnection;
use crate::{InstanceHolder, declare_packet_handlers};
pub fn process_packet_events(ecs: &mut World) {
let mut events_owned = Vec::new();
let mut system_state: SystemState<EventReader<ReceiveConfigPacketEvent>> =
SystemState::new(ecs);
let mut events = system_state.get_mut(ecs);
for ReceiveConfigPacketEvent {
entity: player_entity,
packet,
} in events.read()
{
// we do this so `ecs` isn't borrowed for the whole loop
events_owned.push((*player_entity, packet.clone()));
}
for (player_entity, packet) in events_owned {
let mut handler = ConfigPacketHandler {
player: player_entity,
ecs,
};
pub fn process_raw_packet(
ecs: &mut World,
player: Entity,
raw_packet: &[u8],
) -> Result<(), Box<ReadPacketError>> {
let packet = deserialize_packet(&mut Cursor::new(&raw_packet))?;
process_packet(ecs, player, &packet);
Ok(())
}
declare_packet_handlers!(
ClientboundConfigPacket,
packet,
handler,
[
cookie_request,
custom_payload,
disconnect,
finish_configuration,
keep_alive,
ping,
reset_chat,
registry_data,
resource_pack_pop,
resource_pack_push,
store_cookie,
transfer,
update_enabled_features,
update_tags,
select_known_packs,
custom_report_details,
server_links,
]
);
}
pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundConfigPacket) {
let mut handler = ConfigPacketHandler { player, ecs };
declare_packet_handlers!(
ClientboundConfigPacket,
packet,
handler,
[
cookie_request,
custom_payload,
disconnect,
finish_configuration,
keep_alive,
ping,
reset_chat,
registry_data,
resource_pack_pop,
resource_pack_push,
store_cookie,
transfer,
update_enabled_features,
update_tags,
select_known_packs,
custom_report_details,
server_links,
]
);
}
pub struct ConfigPacketHandler<'a> {
@ -67,31 +63,33 @@ pub struct ConfigPacketHandler<'a> {
pub player: Entity,
}
impl ConfigPacketHandler<'_> {
pub fn registry_data(&mut self, p: ClientboundRegistryData) {
pub fn registry_data(&mut self, p: &ClientboundRegistryData) {
as_system::<Query<&mut InstanceHolder>>(self.ecs, |mut query| {
let instance_holder = query.get_mut(self.player).unwrap();
let mut instance = instance_holder.instance.write();
// add the new registry data
instance.registries.append(p.registry_id, p.entries);
instance
.registries
.append(p.registry_id.clone(), p.entries.clone());
});
}
pub fn custom_payload(&mut self, p: ClientboundCustomPayload) {
pub fn custom_payload(&mut self, p: &ClientboundCustomPayload) {
debug!("Got custom payload packet {p:?}");
}
pub fn disconnect(&mut self, p: ClientboundDisconnect) {
pub fn disconnect(&mut self, p: &ClientboundDisconnect) {
warn!("Got disconnect packet {p:?}");
as_system::<EventWriter<_>>(self.ecs, |mut events| {
events.send(DisconnectEvent {
entity: self.player,
reason: Some(p.reason),
reason: Some(p.reason.clone()),
});
});
}
pub fn finish_configuration(&mut self, p: ClientboundFinishConfiguration) {
pub fn finish_configuration(&mut self, p: &ClientboundFinishConfiguration) {
debug!("got FinishConfiguration packet: {p:?}");
as_system::<(Commands, Query<&mut RawConnection>)>(
@ -99,12 +97,11 @@ impl ConfigPacketHandler<'_> {
|(mut commands, mut query)| {
let mut raw_conn = query.get_mut(self.player).unwrap();
raw_conn
.write_packet(ServerboundFinishConfiguration)
.expect(
"we should be in the right state and encoding this packet shouldn't fail",
);
raw_conn.set_state(ConnectionProtocol::Game);
commands.trigger(SendConfigPacketEvent::new(
self.player,
ServerboundFinishConfiguration,
));
raw_conn.state = ConnectionProtocol::Game;
// these components are added now that we're going to be in the Game state
commands
@ -120,34 +117,33 @@ impl ConfigPacketHandler<'_> {
);
}
pub fn keep_alive(&mut self, p: ClientboundKeepAlive) {
pub fn keep_alive(&mut self, p: &ClientboundKeepAlive) {
debug!(
"Got keep alive packet (in configuration) {p:?} for {:?}",
self.player
);
as_system::<(Query<&RawConnection>, EventWriter<_>)>(self.ecs, |(query, mut events)| {
let raw_conn = query.get(self.player).unwrap();
as_system::<(Commands, EventWriter<_>)>(self.ecs, |(mut commands, mut events)| {
events.send(KeepAliveEvent {
entity: self.player,
id: p.id,
});
raw_conn
.write_packet(ServerboundKeepAlive { id: p.id })
.unwrap();
commands.trigger(SendConfigPacketEvent::new(
self.player,
ServerboundKeepAlive { id: p.id },
));
});
}
pub fn ping(&mut self, p: ClientboundPing) {
pub fn ping(&mut self, p: &ClientboundPing) {
debug!("Got ping packet (in configuration) {p:?}");
as_system::<Commands>(self.ecs, |mut commands| {
commands.trigger_targets(ConfigPingEvent(p), self.player);
commands.trigger_targets(ConfigPingEvent(p.clone()), self.player);
});
}
pub fn resource_pack_push(&mut self, p: ClientboundResourcePackPush) {
pub fn resource_pack_push(&mut self, p: &ClientboundResourcePackPush) {
debug!("Got resource pack push packet {p:?}");
as_system::<EventWriter<_>>(self.ecs, |mut events| {
@ -162,66 +158,64 @@ impl ConfigPacketHandler<'_> {
});
}
pub fn resource_pack_pop(&mut self, p: ClientboundResourcePackPop) {
pub fn resource_pack_pop(&mut self, p: &ClientboundResourcePackPop) {
debug!("Got resource pack pop packet {p:?}");
}
pub fn update_enabled_features(&mut self, p: ClientboundUpdateEnabledFeatures) {
pub fn update_enabled_features(&mut self, p: &ClientboundUpdateEnabledFeatures) {
debug!("Got update enabled features packet {p:?}");
}
pub fn update_tags(&mut self, _p: ClientboundUpdateTags) {
pub fn update_tags(&mut self, _p: &ClientboundUpdateTags) {
debug!("Got update tags packet");
}
pub fn cookie_request(&mut self, p: ClientboundCookieRequest) {
pub fn cookie_request(&mut self, p: &ClientboundCookieRequest) {
debug!("Got cookie request packet {p:?}");
as_system::<Query<&RawConnection>>(self.ecs, |query| {
let raw_conn = query.get(self.player).unwrap();
raw_conn
.write_packet(ServerboundCookieResponse {
key: p.key,
as_system::<Commands>(self.ecs, |mut commands| {
commands.trigger(SendConfigPacketEvent::new(
self.player,
ServerboundCookieResponse {
key: p.key.clone(),
// cookies aren't implemented
payload: None,
})
.unwrap();
},
));
});
}
pub fn reset_chat(&mut self, p: ClientboundResetChat) {
pub fn reset_chat(&mut self, p: &ClientboundResetChat) {
debug!("Got reset chat packet {p:?}");
}
pub fn store_cookie(&mut self, p: ClientboundStoreCookie) {
pub fn store_cookie(&mut self, p: &ClientboundStoreCookie) {
debug!("Got store cookie packet {p:?}");
}
pub fn transfer(&mut self, p: ClientboundTransfer) {
pub fn transfer(&mut self, p: &ClientboundTransfer) {
debug!("Got transfer packet {p:?}");
}
pub fn select_known_packs(&mut self, p: ClientboundSelectKnownPacks) {
pub fn select_known_packs(&mut self, p: &ClientboundSelectKnownPacks) {
debug!("Got select known packs packet {p:?}");
as_system::<Query<&RawConnection>>(self.ecs, |query| {
let raw_conn = query.get(self.player).unwrap();
as_system::<Commands>(self.ecs, |mut commands| {
// resource pack management isn't implemented
raw_conn
.write_packet(ServerboundSelectKnownPacks {
commands.trigger(SendConfigPacketEvent::new(
self.player,
ServerboundSelectKnownPacks {
known_packs: vec![],
})
.unwrap();
},
));
});
}
pub fn server_links(&mut self, p: ClientboundServerLinks) {
pub fn server_links(&mut self, p: &ClientboundServerLinks) {
debug!("Got server links packet {p:?}");
}
pub fn custom_report_details(&mut self, p: ClientboundCustomReportDetails) {
pub fn custom_report_details(&mut self, p: &ClientboundCustomReportDetails) {
debug!("Got custom report details packet {p:?}");
}
}

View file

@ -1,24 +1,18 @@
use std::{
io::Cursor,
sync::{Arc, Weak},
};
use std::sync::{Arc, Weak};
use azalea_chat::FormattedText;
use azalea_core::resource_location::ResourceLocation;
use azalea_protocol::{
packets::{
Packet,
game::{ClientboundGamePacket, ClientboundPlayerCombatKill, ServerboundGamePacket},
},
read::deserialize_packet,
use azalea_protocol::packets::{
Packet,
game::{ClientboundGamePacket, ClientboundPlayerCombatKill, ServerboundGamePacket},
};
use azalea_world::Instance;
use bevy_ecs::prelude::*;
use parking_lot::RwLock;
use tracing::{debug, error};
use tracing::error;
use uuid::Uuid;
use crate::{PlayerInfo, client::InGameState, raw_connection::RawConnection};
use crate::{PlayerInfo, client::InGameState, connection::RawConnection};
/// An event that's sent when we receive a packet.
/// ```
@ -41,7 +35,7 @@ use crate::{PlayerInfo, client::InGameState, raw_connection::RawConnection};
/// }
/// ```
#[derive(Event, Debug, Clone)]
pub struct ReceivePacketEvent {
pub struct ReceiveGamePacketEvent {
/// The client entity that received the packet.
pub entity: Entity,
/// The packet that was actually received.
@ -67,7 +61,7 @@ pub fn handle_outgoing_packets_observer(
) {
let event = trigger.event();
if let Ok((raw_connection, in_game_state)) = query.get_mut(event.sent_by) {
if let Ok((mut raw_connection, in_game_state)) = query.get_mut(event.sent_by) {
if in_game_state.is_none() {
error!(
"Tried to send a game packet {:?} while not in game state",
@ -77,7 +71,7 @@ pub fn handle_outgoing_packets_observer(
}
// debug!("Sending packet: {:?}", event.packet);
if let Err(e) = raw_connection.write_packet(event.packet.clone()) {
if let Err(e) = raw_connection.write(event.packet.clone()) {
error!("Failed to send packet: {e}");
}
}
@ -91,61 +85,6 @@ pub fn handle_outgoing_packets(mut commands: Commands, mut events: EventReader<S
}
}
pub fn emit_receive_packet_events(
query: Query<(Entity, &RawConnection), With<InGameState>>,
mut packet_events: ResMut<Events<ReceivePacketEvent>>,
) {
// we manually clear and send the events at the beginning of each update
// since otherwise it'd cause issues with events in process_packet_events
// running twice
packet_events.clear();
for (player_entity, raw_connection) in &query {
let packets_lock = raw_connection.incoming_packet_queue();
let mut packets = packets_lock.lock();
if !packets.is_empty() {
let mut packets_read = 0;
for raw_packet in packets.iter() {
packets_read += 1;
let packet =
match deserialize_packet::<ClientboundGamePacket>(&mut Cursor::new(raw_packet))
{
Ok(packet) => packet,
Err(err) => {
error!("failed to read packet: {err:?}");
debug!("packet bytes: {raw_packet:?}");
continue;
}
};
let should_interrupt = packet_interrupts(&packet);
packet_events.send(ReceivePacketEvent {
entity: player_entity,
packet: Arc::new(packet),
});
if should_interrupt {
break;
}
}
packets.drain(0..packets_read);
}
}
}
/// Whether the given packet should make us stop deserializing the received
/// packets until next update.
///
/// This is used for packets that can switch the client state.
fn packet_interrupts(packet: &ClientboundGamePacket) -> bool {
matches!(
packet,
ClientboundGamePacket::StartConfiguration(_)
| ClientboundGamePacket::Disconnect(_)
| ClientboundGamePacket::Transfer(_)
)
}
/// A player joined the game (or more specifically, was added to the tab
/// list of a local player).
#[derive(Event, Debug, Clone)]

View file

@ -32,171 +32,150 @@ use crate::{
},
movement::{KnockbackEvent, KnockbackType},
packet::as_system,
raw_connection::RawConnection,
};
pub fn process_packet_events(ecs: &mut World) {
let mut events_owned = Vec::<(Entity, Arc<ClientboundGamePacket>)>::new();
pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundGamePacket) {
let mut handler = GamePacketHandler { player, ecs };
{
let mut system_state = SystemState::<EventReader<ReceivePacketEvent>>::new(ecs);
let mut events = system_state.get_mut(ecs);
for ReceivePacketEvent {
entity: player_entity,
packet,
} in events.read()
{
// we do this so `ecs` isn't borrowed for the whole loop
events_owned.push((*player_entity, packet.clone()));
}
}
for (player_entity, packet) in events_owned {
let mut handler = GamePacketHandler {
player: player_entity,
ecs,
};
// the order of these doesn't matter, that's decided by the protocol library
declare_packet_handlers!(
ClientboundGamePacket,
packet.as_ref(),
handler,
[
login,
set_chunk_cache_radius,
chunk_batch_start,
chunk_batch_finished,
custom_payload,
change_difficulty,
commands,
player_abilities,
set_cursor_item,
update_tags,
disconnect,
update_recipes,
entity_event,
player_position,
player_info_update,
player_info_remove,
set_chunk_cache_center,
chunks_biomes,
light_update,
level_chunk_with_light,
add_entity,
set_entity_data,
update_attributes,
set_entity_motion,
set_entity_link,
initialize_border,
set_time,
set_default_spawn_position,
set_health,
set_experience,
teleport_entity,
update_advancements,
rotate_head,
move_entity_pos,
move_entity_pos_rot,
move_entity_rot,
keep_alive,
remove_entities,
player_chat,
system_chat,
disguised_chat,
sound,
level_event,
block_update,
animate,
section_blocks_update,
game_event,
level_particles,
server_data,
set_equipment,
update_mob_effect,
award_stats,
block_changed_ack,
block_destruction,
block_entity_data,
block_event,
boss_event,
command_suggestions,
container_set_content,
container_set_data,
container_set_slot,
container_close,
cooldown,
custom_chat_completions,
delete_chat,
explode,
forget_level_chunk,
horse_screen_open,
map_item_data,
merchant_offers,
move_vehicle,
open_book,
open_screen,
open_sign_editor,
ping,
place_ghost_recipe,
player_combat_end,
player_combat_enter,
player_combat_kill,
player_look_at,
remove_mob_effect,
resource_pack_push,
resource_pack_pop,
respawn,
start_configuration,
entity_position_sync,
select_advancements_tab,
set_action_bar_text,
set_border_center,
set_border_lerp_size,
set_border_size,
set_border_warning_delay,
set_border_warning_distance,
set_camera,
set_display_objective,
set_objective,
set_passengers,
set_player_team,
set_score,
set_simulation_distance,
set_subtitle_text,
set_title_text,
set_titles_animation,
clear_titles,
sound_entity,
stop_sound,
tab_list,
tag_query,
take_item_entity,
bundle_delimiter,
damage_event,
hurt_animation,
ticking_state,
ticking_step,
reset_score,
cookie_request,
debug_sample,
pong_response,
store_cookie,
transfer,
move_minecart_along_track,
set_held_slot,
set_player_inventory,
projectile_power,
custom_report_details,
server_links,
player_rotation,
recipe_book_add,
recipe_book_remove,
recipe_book_settings,
test_instance_block_status,
]
);
}
// the order of these doesn't matter, that's decided by the protocol library
declare_packet_handlers!(
ClientboundGamePacket,
packet,
handler,
[
login,
set_chunk_cache_radius,
chunk_batch_start,
chunk_batch_finished,
custom_payload,
change_difficulty,
commands,
player_abilities,
set_cursor_item,
update_tags,
disconnect,
update_recipes,
entity_event,
player_position,
player_info_update,
player_info_remove,
set_chunk_cache_center,
chunks_biomes,
light_update,
level_chunk_with_light,
add_entity,
set_entity_data,
update_attributes,
set_entity_motion,
set_entity_link,
initialize_border,
set_time,
set_default_spawn_position,
set_health,
set_experience,
teleport_entity,
update_advancements,
rotate_head,
move_entity_pos,
move_entity_pos_rot,
move_entity_rot,
keep_alive,
remove_entities,
player_chat,
system_chat,
disguised_chat,
sound,
level_event,
block_update,
animate,
section_blocks_update,
game_event,
level_particles,
server_data,
set_equipment,
update_mob_effect,
award_stats,
block_changed_ack,
block_destruction,
block_entity_data,
block_event,
boss_event,
command_suggestions,
container_set_content,
container_set_data,
container_set_slot,
container_close,
cooldown,
custom_chat_completions,
delete_chat,
explode,
forget_level_chunk,
horse_screen_open,
map_item_data,
merchant_offers,
move_vehicle,
open_book,
open_screen,
open_sign_editor,
ping,
place_ghost_recipe,
player_combat_end,
player_combat_enter,
player_combat_kill,
player_look_at,
remove_mob_effect,
resource_pack_push,
resource_pack_pop,
respawn,
start_configuration,
entity_position_sync,
select_advancements_tab,
set_action_bar_text,
set_border_center,
set_border_lerp_size,
set_border_size,
set_border_warning_delay,
set_border_warning_distance,
set_camera,
set_display_objective,
set_objective,
set_passengers,
set_player_team,
set_score,
set_simulation_distance,
set_subtitle_text,
set_title_text,
set_titles_animation,
clear_titles,
sound_entity,
stop_sound,
tab_list,
tag_query,
take_item_entity,
bundle_delimiter,
damage_event,
hurt_animation,
ticking_state,
ticking_step,
reset_score,
cookie_request,
debug_sample,
pong_response,
store_cookie,
transfer,
move_minecart_along_track,
set_held_slot,
set_player_inventory,
projectile_power,
custom_report_details,
server_links,
player_rotation,
recipe_book_add,
recipe_book_remove,
recipe_book_settings,
test_instance_block_status,
]
);
}
pub struct GamePacketHandler<'a> {
@ -342,7 +321,7 @@ impl GamePacketHandler<'_> {
client_information
);
commands.trigger(SendPacketEvent::new(self.player,
azalea_protocol::packets::game::s_client_information::ServerboundClientInformation { information: client_information.clone() },
azalea_protocol::packets::game::s_client_information::ServerboundClientInformation { client_information: client_information.clone() },
));
},
);
@ -1506,9 +1485,11 @@ impl GamePacketHandler<'_> {
pub fn start_configuration(&mut self, _p: &ClientboundStartConfiguration) {
debug!("Got start configuration packet");
as_system::<(Query<&RawConnection>, Commands)>(self.ecs, |(query, mut commands)| {
let raw_conn = query.get(self.player).unwrap();
let _ = raw_conn.write_packet(ServerboundConfigurationAcknowledged);
as_system::<Commands>(self.ecs, |mut commands| {
commands.trigger(SendPacketEvent::new(
self.player,
ServerboundConfigurationAcknowledged,
));
commands
.entity(self.player)

View file

@ -1,24 +1,20 @@
use std::sync::Arc;
use azalea_protocol::packets::login::ClientboundLoginPacket;
use azalea_protocol::packets::login::{ClientboundHello, ClientboundLoginPacket};
use bevy_ecs::prelude::*;
/// An event that's sent when we receive a login packet from the server. Note
/// that if you want to handle this in a system, you must add
/// `.before(azalea::packet::login::process_packet_events)` to it
/// because that system clears the events.
#[derive(Event, Debug, Clone)]
pub struct LoginPacketEvent {
/// The client entity that received the packet.
pub entity: Entity,
/// The packet that was actually received.
pub packet: Arc<ClientboundLoginPacket>,
}
use crate::Account;
#[derive(Event, Debug, Clone)]
pub struct ReceiveLoginPacketEvent {
/// The client entity that received the packet.
pub entity: Entity,
/// The packet that was actually received.
pub packet: ClientboundLoginPacket,
pub packet: Arc<ClientboundLoginPacket>,
}
#[derive(Event)]
pub struct ReceiveHelloEvent {
pub account: Account,
pub packet: ClientboundHello,
}

View file

@ -6,21 +6,53 @@ mod events;
use std::collections::HashSet;
use azalea_protocol::packets::{
Packet,
ConnectionProtocol, Packet,
login::{
ClientboundCookieRequest, ClientboundCustomQuery, ClientboundHello,
ClientboundLoginCompression, ClientboundLoginDisconnect, ClientboundLoginFinished,
ClientboundLoginPacket, ServerboundCustomQueryAnswer, ServerboundLoginPacket,
ClientboundLoginPacket, ServerboundCookieResponse, ServerboundCustomQueryAnswer,
ServerboundLoginAcknowledged, ServerboundLoginPacket,
},
};
use bevy_ecs::{prelude::*, system::SystemState};
use bevy_ecs::prelude::*;
use derive_more::{Deref, DerefMut};
pub use events::*;
use tokio::sync::mpsc;
use tracing::error;
use tracing::{debug, error};
use super::as_system;
use crate::declare_packet_handlers;
use crate::{
Account, GameProfileComponent, InConfigState, connection::NetworkConnection,
declare_packet_handlers, disconnect::DisconnectEvent,
};
pub fn process_packet(
ecs: &mut World,
player: Entity,
packet: &ClientboundLoginPacket,
state: &mut ConnectionProtocol,
net_conn: Option<&mut NetworkConnection>,
) {
let mut handler = LoginPacketHandler {
player,
ecs,
state,
net_conn,
};
declare_packet_handlers!(
ClientboundLoginPacket,
packet,
handler,
[
hello,
login_disconnect,
login_finished,
login_compression,
custom_query,
cookie_request
]
);
}
/// Event for sending a login packet to the server.
#[derive(Event)]
@ -35,103 +67,113 @@ impl SendLoginPacketEvent {
}
}
#[derive(Component)]
pub struct LoginSendPacketQueue {
pub tx: mpsc::UnboundedSender<ServerboundLoginPacket>,
}
/// A marker component for local players that are currently in the
/// `login` state.
#[derive(Component, Clone, Debug)]
pub struct InLoginState;
pub fn handle_send_packet_event(
mut send_packet_events: EventReader<SendLoginPacketEvent>,
mut query: Query<&mut LoginSendPacketQueue>,
) {
for event in send_packet_events.read() {
if let Ok(queue) = query.get_mut(event.entity) {
let _ = queue.tx.send(event.packet.clone());
} else {
error!("Sent SendPacketEvent for entity that doesn't have a LoginSendPacketQueue");
}
}
}
/// Plugins can add to this set if they want to handle a custom query packet
/// themselves. This component removed after the login state ends.
#[derive(Component, Default, Debug, Deref, DerefMut)]
pub struct IgnoreQueryIds(HashSet<u32>);
pub fn process_packet_events(ecs: &mut World) {
let mut events_owned = Vec::new();
let mut system_state: SystemState<EventReader<ReceiveLoginPacketEvent>> = SystemState::new(ecs);
let mut events = system_state.get_mut(ecs);
for ReceiveLoginPacketEvent {
entity: player_entity,
packet,
} in events.read()
{
// we do this so `ecs` isn't borrowed for the whole loop
events_owned.push((*player_entity, packet.clone()));
}
for (player_entity, packet) in events_owned {
let mut handler = LoginPacketHandler {
player: player_entity,
ecs,
};
declare_packet_handlers!(
ClientboundLoginPacket,
packet,
handler,
[
hello,
login_disconnect,
login_finished,
login_compression,
custom_query,
cookie_request
]
);
}
}
pub struct LoginPacketHandler<'a> {
pub ecs: &'a mut World,
pub player: Entity,
pub state: &'a mut ConnectionProtocol,
pub net_conn: Option<&'a mut NetworkConnection>,
}
impl LoginPacketHandler<'_> {
pub fn hello(&mut self, _p: ClientboundHello) {}
pub fn login_disconnect(&mut self, _p: ClientboundLoginDisconnect) {}
pub fn login_finished(&mut self, _p: ClientboundLoginFinished) {}
pub fn login_compression(&mut self, _p: ClientboundLoginCompression) {
// as_system::<Query<&mut RawConnection>>(self.ecs, |mut query| {
// if let Ok(mut raw_conn) = query.get_mut(self.player) {
// raw_conn.set_compression_threshold(p.compression_threshold);
// }
// });
}
pub fn custom_query(&mut self, p: ClientboundCustomQuery) {
as_system::<(EventWriter<SendLoginPacketEvent>, Query<&IgnoreQueryIds>)>(
self.ecs,
|(mut events, query)| {
let ignore_query_ids = query.get(self.player).ok().map(|x| x.0.clone());
if let Some(ignore_query_ids) = ignore_query_ids {
if ignore_query_ids.contains(&p.transaction_id) {
return;
}
}
pub fn hello(&mut self, p: &ClientboundHello) {
debug!("Got encryption request {p:?}");
events.send(SendLoginPacketEvent::new(
self.player,
ServerboundCustomQueryAnswer {
transaction_id: p.transaction_id,
data: None,
},
));
},
);
as_system::<(Commands, Query<&Account>)>(self.ecs, |(mut commands, query)| {
let Ok(account) = query.get(self.player) else {
error!(
"Expected Account component to be present on player when receiving hello packet."
);
return;
};
commands.trigger_targets(
ReceiveHelloEvent {
account: account.clone(),
packet: p.clone(),
},
self.player,
);
});
}
pub fn login_disconnect(&mut self, p: &ClientboundLoginDisconnect) {
debug!("Got disconnect {:?}", p);
as_system::<EventWriter<_>>(self.ecs, |mut events| {
events.send(DisconnectEvent {
entity: self.player,
reason: Some(p.reason.clone()),
});
});
}
pub fn login_finished(&mut self, p: &ClientboundLoginFinished) {
debug!(
"Got profile {:?}. handshake is finished and we're now switching to the configuration state",
p.game_profile
);
as_system::<Commands>(self.ecs, |mut commands| {
commands.trigger(SendLoginPacketEvent::new(
self.player,
ServerboundLoginAcknowledged,
));
commands
.entity(self.player)
.remove::<IgnoreQueryIds>()
.remove::<InLoginState>()
.insert(InConfigState)
.insert(GameProfileComponent(p.game_profile.clone()));
});
// break (conn.config(), p.game_profile);
}
pub fn login_compression(&mut self, p: &ClientboundLoginCompression) {
debug!("Got compression request {p:?}");
if let Some(net_conn) = &mut self.net_conn {
net_conn.set_compression_threshold(Some(p.compression_threshold as u32));
}
}
pub fn custom_query(&mut self, p: &ClientboundCustomQuery) {
debug!("Got custom query {p:?}");
as_system::<(Commands, Query<&IgnoreQueryIds>)>(self.ecs, |(mut commands, query)| {
let ignore_query_ids = query.get(self.player).ok().map(|x| x.0.clone());
if let Some(ignore_query_ids) = ignore_query_ids {
if ignore_query_ids.contains(&p.transaction_id) {
return;
}
}
commands.trigger(SendLoginPacketEvent::new(
self.player,
ServerboundCustomQueryAnswer {
transaction_id: p.transaction_id,
data: None,
},
));
});
}
pub fn cookie_request(&mut self, p: &ClientboundCookieRequest) {
debug!("Got cookie request {p:?}");
as_system::<Commands>(self.ecs, |mut commands| {
commands.trigger(SendLoginPacketEvent::new(
self.player,
ServerboundCookieResponse {
key: p.key.clone(),
// cookies aren't implemented
payload: None,
},
));
});
}
pub fn cookie_request(&mut self, _p: ClientboundCookieRequest) {}
}

View file

@ -1,16 +1,13 @@
use azalea_entity::metadata::Health;
use bevy_app::{App, First, Plugin, PreUpdate, Update};
use bevy_app::{App, Plugin, Update};
use bevy_ecs::{
prelude::*,
system::{SystemParam, SystemState},
};
use self::{
game::{
AddPlayerEvent, DeathEvent, InstanceLoadedEvent, KeepAliveEvent, RemovePlayerEvent,
ResourcePackEvent, UpdatePlayerEvent,
},
login::{LoginPacketEvent, SendLoginPacketEvent},
use self::game::{
AddPlayerEvent, DeathEvent, InstanceLoadedEvent, KeepAliveEvent, RemovePlayerEvent,
ResourcePackEvent, UpdatePlayerEvent,
};
use crate::{chat::ChatReceivedEvent, events::death_listener};
@ -36,50 +33,35 @@ pub fn death_event_on_0_health(
impl Plugin for PacketPlugin {
fn build(&self, app: &mut App) {
app.add_systems(
First,
(
game::emit_receive_packet_events,
config::emit_receive_config_packet_events,
),
)
.add_systems(
PreUpdate,
(
game::process_packet_events,
config::process_packet_events,
login::handle_send_packet_event,
login::process_packet_events,
),
)
.add_observer(game::handle_outgoing_packets_observer)
.add_observer(config::handle_outgoing_packets_observer)
.add_systems(
Update,
(
app.add_observer(game::handle_outgoing_packets_observer)
.add_observer(config::handle_outgoing_packets_observer)
.add_systems(
Update,
(
config::handle_outgoing_packets,
game::handle_outgoing_packets,
)
.chain(),
death_event_on_0_health.before(death_listener),
),
)
// we do this instead of add_event so we can handle the events ourselves
.init_resource::<Events<game::ReceivePacketEvent>>()
.init_resource::<Events<config::ReceiveConfigPacketEvent>>()
.add_event::<game::SendPacketEvent>()
.add_event::<config::SendConfigPacketEvent>()
.add_event::<AddPlayerEvent>()
.add_event::<RemovePlayerEvent>()
.add_event::<UpdatePlayerEvent>()
.add_event::<ChatReceivedEvent>()
.add_event::<DeathEvent>()
.add_event::<KeepAliveEvent>()
.add_event::<ResourcePackEvent>()
.add_event::<InstanceLoadedEvent>()
.add_event::<LoginPacketEvent>()
.add_event::<SendLoginPacketEvent>();
(
config::handle_outgoing_packets,
game::handle_outgoing_packets,
)
.chain(),
death_event_on_0_health.before(death_listener),
),
)
.add_event::<game::ReceiveGamePacketEvent>()
.add_event::<config::ReceiveConfigPacketEvent>()
.add_event::<login::ReceiveLoginPacketEvent>()
//
.add_event::<game::SendPacketEvent>()
.add_event::<config::SendConfigPacketEvent>()
.add_event::<login::SendLoginPacketEvent>()
//
.add_event::<AddPlayerEvent>()
.add_event::<RemovePlayerEvent>()
.add_event::<UpdatePlayerEvent>()
.add_event::<ChatReceivedEvent>()
.add_event::<DeathEvent>()
.add_event::<KeepAliveEvent>()
.add_event::<ResourcePackEvent>()
.add_event::<InstanceLoadedEvent>();
}
}

View file

@ -1,208 +0,0 @@
use std::fmt::Debug;
use std::sync::Arc;
use azalea_protocol::{
connect::{RawReadConnection, RawWriteConnection},
packets::{ConnectionProtocol, Packet, ProtocolPacket},
read::ReadPacketError,
write::serialize_packet,
};
use bevy_ecs::prelude::*;
use parking_lot::Mutex;
use thiserror::Error;
use tokio::sync::mpsc::{
self,
error::{SendError, TrySendError},
};
use tracing::error;
/// A component for clients that can read and write packets to the server. This
/// works with raw bytes, so you'll have to serialize/deserialize packets
/// yourself. It will do the compression and encryption for you though.
#[derive(Component)]
pub struct RawConnection {
pub reader: RawConnectionReader,
pub writer: RawConnectionWriter,
/// Packets sent to this will be sent to the server.
/// A task that reads packets from the server. The client is disconnected
/// when this task ends.
pub read_packets_task: tokio::task::JoinHandle<()>,
/// A task that writes packets from the server.
pub write_packets_task: tokio::task::JoinHandle<()>,
pub connection_protocol: ConnectionProtocol,
}
#[derive(Clone)]
pub struct RawConnectionReader {
pub incoming_packet_queue: Arc<Mutex<Vec<Box<[u8]>>>>,
pub run_schedule_sender: mpsc::Sender<()>,
}
#[derive(Clone)]
pub struct RawConnectionWriter {
pub outgoing_packets_sender: mpsc::UnboundedSender<Box<[u8]>>,
}
#[derive(Error, Debug)]
pub enum WritePacketError {
#[error("Wrong protocol state: expected {expected:?}, got {got:?}")]
WrongState {
expected: ConnectionProtocol,
got: ConnectionProtocol,
},
#[error(transparent)]
Encoding(#[from] azalea_protocol::write::PacketEncodeError),
#[error(transparent)]
SendError {
#[from]
#[backtrace]
source: SendError<Box<[u8]>>,
},
}
impl RawConnection {
pub fn new(
run_schedule_sender: mpsc::Sender<()>,
connection_protocol: ConnectionProtocol,
raw_read_connection: RawReadConnection,
raw_write_connection: RawWriteConnection,
) -> Self {
let (outgoing_packets_sender, outgoing_packets_receiver) = mpsc::unbounded_channel();
let incoming_packet_queue = Arc::new(Mutex::new(Vec::new()));
let reader = RawConnectionReader {
incoming_packet_queue: incoming_packet_queue.clone(),
run_schedule_sender,
};
let writer = RawConnectionWriter {
outgoing_packets_sender,
};
let read_packets_task = tokio::spawn(reader.clone().read_task(raw_read_connection));
let write_packets_task = tokio::spawn(
writer
.clone()
.write_task(raw_write_connection, outgoing_packets_receiver),
);
Self {
reader,
writer,
read_packets_task,
write_packets_task,
connection_protocol,
}
}
pub fn write_raw_packet(&self, raw_packet: Box<[u8]>) -> Result<(), WritePacketError> {
self.writer.outgoing_packets_sender.send(raw_packet)?;
Ok(())
}
/// Write the packet with the given state to the server.
///
/// # Errors
///
/// Returns an error if the packet is not valid for the current state, or if
/// encoding it failed somehow (like it's too big or something).
pub fn write_packet<P: ProtocolPacket + Debug>(
&self,
packet: impl Packet<P>,
) -> Result<(), WritePacketError> {
let packet = packet.into_variant();
let raw_packet = serialize_packet(&packet)?;
self.write_raw_packet(raw_packet)?;
Ok(())
}
/// Returns whether the connection is still alive.
pub fn is_alive(&self) -> bool {
!self.read_packets_task.is_finished()
}
pub fn incoming_packet_queue(&self) -> Arc<Mutex<Vec<Box<[u8]>>>> {
self.reader.incoming_packet_queue.clone()
}
pub fn set_state(&mut self, connection_protocol: ConnectionProtocol) {
self.connection_protocol = connection_protocol;
}
}
impl RawConnectionReader {
/// Loop that reads from the connection and adds the packets to the queue +
/// runs the schedule.
pub async fn read_task(self, mut read_conn: RawReadConnection) {
fn log_for_error(error: &ReadPacketError) {
if !matches!(*error, ReadPacketError::ConnectionClosed) {
error!("Error reading packet from Client: {error:?}");
}
}
loop {
match read_conn.read().await {
Ok(raw_packet) => {
let mut incoming_packet_queue = self.incoming_packet_queue.lock();
incoming_packet_queue.push(raw_packet);
// this makes it so packets received at the same time are guaranteed to be
// handled in the same tick. this is also an attempt at making it so we can't
// receive any packets in the ticks/updates after being disconnected.
loop {
let raw_packet = match read_conn.try_read() {
Ok(p) => p,
Err(err) => {
log_for_error(&err);
return;
}
};
let Some(raw_packet) = raw_packet else { break };
incoming_packet_queue.push(raw_packet);
}
// tell the client to run all the systems
if self.run_schedule_sender.try_send(()) == Err(TrySendError::Closed(())) {
// the client was dropped
break;
}
}
Err(err) => {
log_for_error(&err);
return;
}
}
}
}
}
impl RawConnectionWriter {
/// Consume the [`ServerboundGamePacket`] queue and actually write the
/// packets to the server. It's like this so writing packets doesn't need to
/// be awaited.
///
/// [`ServerboundGamePacket`]: azalea_protocol::packets::game::ServerboundGamePacket
pub async fn write_task(
self,
mut write_conn: RawWriteConnection,
mut outgoing_packets_receiver: mpsc::UnboundedReceiver<Box<[u8]>>,
) {
while let Some(raw_packet) = outgoing_packets_receiver.recv().await {
if let Err(err) = write_conn.write(&raw_packet).await {
error!("Disconnecting because we couldn't write a packet: {err}.");
break;
};
}
// receiver is automatically closed when it's dropped
}
}
impl Drop for RawConnection {
/// Stop every active task when this `RawConnection` is dropped.
fn drop(&mut self) {
self.read_packets_task.abort();
self.write_packets_task.abort();
}
}

View file

@ -1,6 +1,5 @@
use std::{fmt::Debug, sync::Arc, time::Duration};
use std::{fmt::Debug, sync::Arc};
use azalea_auth::game_profile::GameProfile;
use azalea_buf::AzaleaWrite;
use azalea_core::delta::PositionDelta8;
use azalea_core::game_type::{GameMode, OptionalGameType};
@ -21,17 +20,13 @@ use azalea_world::palette::{PalettedContainer, PalettedContainerKind};
use azalea_world::{Chunk, Instance, MinecraftEntityId, Section};
use bevy_app::App;
use bevy_ecs::{prelude::*, schedule::ExecutorKind};
use parking_lot::{Mutex, RwLock};
use parking_lot::RwLock;
use simdnbt::owned::{NbtCompound, NbtTag};
use tokio::task::JoinHandle;
use tokio::{sync::mpsc, time::sleep};
use uuid::Uuid;
use crate::connection::RawConnection;
use crate::disconnect::DisconnectEvent;
use crate::{
ClientInformation, GameProfileComponent, InConfigState, InstanceHolder, LocalPlayerBundle,
raw_connection::{RawConnection, RawConnectionReader, RawConnectionWriter},
};
use crate::{ClientInformation, InConfigState, InstanceHolder, LocalPlayerBundle};
/// A way to simulate a client in a server, used for some internal tests.
pub struct Simulation {
@ -40,16 +35,13 @@ pub struct Simulation {
// the runtime needs to be kept around for the tasks to be considered alive
pub rt: tokio::runtime::Runtime,
pub incoming_packet_queue: Arc<Mutex<Vec<Box<[u8]>>>>,
pub clear_outgoing_packets_receiver_task: JoinHandle<!>,
}
impl Simulation {
pub fn new(initial_connection_protocol: ConnectionProtocol) -> Self {
let mut app = create_simulation_app();
let mut entity = app.world_mut().spawn_empty();
let (player, clear_outgoing_packets_receiver_task, incoming_packet_queue, rt) =
let (player, rt) =
create_local_player_bundle(entity.id(), ConnectionProtocol::Configuration);
entity.insert(player);
@ -61,13 +53,7 @@ impl Simulation {
app.world_mut().entity_mut(entity).insert(InConfigState);
tick_app(&mut app);
let mut simulation = Self {
app,
entity,
rt,
incoming_packet_queue,
clear_outgoing_packets_receiver_task,
};
let mut simulation = Self { app, entity, rt };
#[allow(clippy::single_match)]
match initial_connection_protocol {
@ -95,9 +81,11 @@ impl Simulation {
simulation
}
pub fn receive_packet<P: ProtocolPacket + Debug>(&self, packet: impl Packet<P>) {
pub fn receive_packet<P: ProtocolPacket + Debug>(&mut self, packet: impl Packet<P>) {
let buf = azalea_protocol::write::serialize_packet(&packet.into_variant()).unwrap();
self.incoming_packet_queue.lock().push(buf);
self.with_component_mut::<RawConnection>(|raw_conn| {
raw_conn.injected_clientbound_packets.push(buf.clone());
});
}
pub fn tick(&mut self) {
@ -112,6 +100,14 @@ impl Simulation {
pub fn has_component<T: Component>(&self) -> bool {
self.app.world().get::<T>(self.entity).is_some()
}
pub fn with_component_mut<T: Component>(&mut self, f: impl FnOnce(&mut T)) {
f(&mut self
.app
.world_mut()
.entity_mut(self.entity)
.get_mut::<T>()
.unwrap());
}
pub fn resource<T: Resource + Clone>(&self) -> T {
self.app.world().get_resource::<T>().unwrap().clone()
}
@ -143,70 +139,24 @@ impl Simulation {
fn create_local_player_bundle(
entity: Entity,
connection_protocol: ConnectionProtocol,
) -> (
LocalPlayerBundle,
JoinHandle<!>,
Arc<Mutex<Vec<Box<[u8]>>>>,
tokio::runtime::Runtime,
) {
) -> (LocalPlayerBundle, tokio::runtime::Runtime) {
// unused since we'll trigger ticks ourselves
let (run_schedule_sender, _run_schedule_receiver) = mpsc::channel(1);
let (outgoing_packets_sender, mut outgoing_packets_receiver) = mpsc::unbounded_channel();
let incoming_packet_queue = Arc::new(Mutex::new(Vec::new()));
let reader = RawConnectionReader {
incoming_packet_queue: incoming_packet_queue.clone(),
run_schedule_sender,
};
let writer = RawConnectionWriter {
outgoing_packets_sender,
};
let rt = tokio::runtime::Runtime::new().unwrap();
// the tasks can't die since that would make us send a DisconnectEvent
let read_packets_task = rt.spawn(async {
loop {
sleep(Duration::from_secs(60)).await;
}
});
let write_packets_task = rt.spawn(async {
loop {
sleep(Duration::from_secs(60)).await;
}
});
let clear_outgoing_packets_receiver_task = rt.spawn(async move {
loop {
let _ = outgoing_packets_receiver.recv().await;
}
});
let raw_connection = RawConnection {
reader,
writer,
read_packets_task,
write_packets_task,
connection_protocol,
};
let raw_connection = RawConnection::new_networkless(connection_protocol);
let instance = Instance::default();
let instance_holder = InstanceHolder::new(entity, Arc::new(RwLock::new(instance)));
let local_player_bundle = LocalPlayerBundle {
raw_connection,
game_profile: GameProfileComponent(GameProfile::new(Uuid::nil(), "azalea".to_owned())),
client_information: ClientInformation::default(),
instance_holder,
metadata: PlayerMetadataBundle::default(),
};
(
local_player_bundle,
clear_outgoing_packets_receiver_task,
incoming_packet_queue,
rt,
)
(local_player_bundle, rt)
}
fn create_simulation_app() -> App {

View file

@ -5,5 +5,5 @@ use crate::common::client_information::ClientInformation;
#[derive(Clone, Debug, AzBuf, ServerboundGamePacket)]
pub struct ServerboundClientInformation {
pub information: ClientInformation,
pub client_information: ClientInformation,
}

View file

@ -285,6 +285,8 @@ where
buffer.get_mut().extend_from_slice(&bytes);
}
}
/// Read a packet from the stream, then if necessary decrypt it, decompress
/// it, and split it.
pub fn try_read_raw_packet<R>(
stream: &mut R,
buffer: &mut Cursor<Vec<u8>>,

View file

@ -54,6 +54,15 @@ pub async fn write_raw_packet<W>(
where
W: AsyncWrite + Unpin + Send,
{
let network_packet = encode_to_network_packet(raw_packet, compression_threshold, cipher);
stream.write_all(&network_packet).await
}
pub fn encode_to_network_packet(
raw_packet: &[u8],
compression_threshold: Option<u32>,
cipher: &mut Option<Aes128CfbEnc>,
) -> Vec<u8> {
trace!("Writing raw packet: {raw_packet:?}");
let mut raw_packet = raw_packet.to_vec();
if let Some(threshold) = compression_threshold {
@ -64,7 +73,7 @@ where
if let Some(cipher) = cipher {
azalea_crypto::encrypt_packet(cipher, &mut raw_packet);
}
stream.write_all(&raw_packet).await
raw_packet
}
pub fn compression_encoder(

View file

@ -248,7 +248,7 @@ pub fn register(commands: &mut CommandDispatcher<Mutex<CommandSource>>) {
}
}
"bevy_ecs::event::collections::Events<azalea_client::packet::game::ReceivePacketEvent>" => {
let events = ecs.resource::<Events<game::ReceivePacketEvent>>();
let events = ecs.resource::<Events<game::ReceiveGamePacketEvent>>();
writeln!(report, "- Event count: {}", events.len()).unwrap();
}
"bevy_ecs::event::collections::Events<azalea_client::chunks::ReceiveChunkEvent>" => {

View file

@ -134,7 +134,7 @@ async fn handle(bot: Client, event: azalea::Event, state: State) -> anyhow::Resu
view_distance: 32,
..Default::default()
})
.await?;
.await;
if swarm.args.pathfinder_debug_particles {
bot.ecs
.lock()

View file

@ -1,7 +1,7 @@
use std::fmt::Debug;
use std::fmt::Formatter;
use azalea_client::packet::game::ReceivePacketEvent;
use azalea_client::packet::game::ReceiveGamePacketEvent;
use azalea_client::{
Client,
inventory::{CloseContainerEvent, ContainerClickEvent, Inventory},
@ -234,7 +234,10 @@ impl ContainerHandle {
#[derive(Component, Debug)]
pub struct WaitingForInventoryOpen;
fn handle_menu_opened_event(mut commands: Commands, mut events: EventReader<ReceivePacketEvent>) {
fn handle_menu_opened_event(
mut commands: Commands,
mut events: EventReader<ReceiveGamePacketEvent>,
) {
for event in events.read() {
if let ClientboundGamePacket::ContainerSetContent { .. } = event.packet.as_ref() {
commands

View file

@ -495,7 +495,8 @@ where
let Some(first_bot_state) = first_bot.query::<Option<&S>>(&mut ecs).cloned() else {
error!(
"the first bot ({} / {}) is missing the required state component! none of the client handler functions will be called.",
first_bot.profile.name, first_bot.entity
first_bot.username(),
first_bot.entity
);
continue;
};
@ -513,7 +514,8 @@ where
let Some(state) = bot.query::<Option<&S>>(&mut ecs).cloned() else {
error!(
"one of our bots ({} / {}) is missing the required state component! its client handler function will not be called.",
bot.profile.name, bot.entity
bot.username(),
bot.entity
);
continue;
};