1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 14:26:04 +00:00
This commit is contained in:
mat 2025-04-11 17:31:23 -10:00
parent 16811560d8
commit 5557747a97
9 changed files with 162 additions and 140 deletions

View file

@ -26,11 +26,8 @@ use azalea_protocol::{
packets::{
self, ClientIntention, ConnectionProtocol, PROTOCOL_VERSION, Packet,
game::{self, ServerboundGamePacket},
handshake::{
ClientboundHandshakePacket, ServerboundHandshakePacket,
s_intention::ServerboundIntention,
},
login::{ClientboundLoginPacket, ServerboundLoginPacket, s_hello::ServerboundHello},
handshake::s_intention::ServerboundIntention,
login::s_hello::ServerboundHello,
},
resolver,
};
@ -276,6 +273,7 @@ impl Client {
let mut entity_mut = ecs.entity_mut(entity);
entity_mut.insert((
InLoginState,
// add the Account to the entity now so plugins can access it earlier
account.to_owned(),
// localentity is always present for our clients, even if we're not actually logged
@ -291,12 +289,21 @@ impl Client {
entity
};
let conn = if let Some(proxy) = proxy {
let mut conn = if let Some(proxy) = proxy {
Connection::new_with_proxy(resolved_address, proxy).await?
} else {
Connection::new(resolved_address).await?
};
let conn = Self::handshake(ecs_lock.clone(), entity, conn, account, address).await?;
debug!("Created connection to {resolved_address:?}");
conn.write(ServerboundIntention {
protocol_version: PROTOCOL_VERSION,
hostname: address.host.clone(),
port: address.port,
intention: ClientIntention::Login,
})
.await?;
let conn = conn.login();
let (read_conn, write_conn) = conn.into_split();
let (read_conn, write_conn) = (read_conn.raw, write_conn.raw);
@ -326,38 +333,9 @@ impl Client {
instance_holder,
metadata: azalea_entity::metadata::PlayerMetadataBundle::default(),
},
InConfigState,
// this component is never removed
LocalEntity,
));
}
let client = Client::new(entity, ecs_lock.clone(), run_schedule_sender.clone());
Ok(client)
}
/// Do a handshake with the server and get to the game state from the
/// initial handshake state.
///
/// This will also automatically refresh the account's access token if
/// it's expired.
pub async fn handshake(
ecs_lock: Arc<Mutex<World>>,
entity: Entity,
mut conn: Connection<ClientboundHandshakePacket, ServerboundHandshakePacket>,
account: &Account,
address: &ServerAddress,
) -> Result<Connection<ClientboundLoginPacket, ServerboundLoginPacket>, JoinError> {
// handshake
conn.write(ServerboundIntention {
protocol_version: PROTOCOL_VERSION,
hostname: address.host.clone(),
port: address.port,
intention: ClientIntention::Login,
})
.await?;
let conn = conn.login();
as_system::<Commands>(&mut ecs_lock.lock(), |mut commands| {
commands.entity(entity).insert((
crate::packet::login::IgnoreQueryIds::default(),
@ -374,7 +352,8 @@ impl Client {
))
});
Ok(conn)
let client = Client::new(entity, ecs_lock.clone(), run_schedule_sender.clone());
Ok(client)
}
/// Write a packet directly to the server.

View file

@ -19,7 +19,7 @@ use tokio::{
net::tcp::OwnedWriteHalf,
sync::mpsc::{self},
};
use tracing::{debug, error};
use tracing::{debug, error, trace};
use super::packet::{
config::ReceiveConfigPacketEvent, game::ReceiveGamePacketEvent, login::ReceiveLoginPacketEvent,
@ -35,63 +35,59 @@ impl Plugin for ConnectionPlugin {
pub fn read_packets(ecs: &mut World) {
// receive_game_packet_events: EventWriter<ReceiveGamePacketEvent>,
let mut query = ecs.query::<(Entity, &mut RawConnection)>();
let mut entity_and_conn_query = ecs.query::<(Entity, &mut RawConnection)>();
let mut conn_query = ecs.query::<&mut RawConnection>();
let mut entities_handling_packets = Vec::new();
let mut entities_with_injected_packets = Vec::new();
for (entity, mut raw_conn) in query.iter_mut(ecs) {
let state = raw_conn.state;
for (entity, mut raw_conn) in entity_and_conn_query.iter_mut(ecs) {
if !raw_conn.injected_clientbound_packets.is_empty() {
entities_with_injected_packets.push((
entity,
state,
mem::take(&mut raw_conn.injected_clientbound_packets),
));
}
let Some(net_conn) = raw_conn.network.take() else {
// means it's a networkless connection
if raw_conn.network.is_none() {
// no network connection, don't bother with the normal packet handling
continue;
};
entities_handling_packets.push((entity, state, net_conn));
}
entities_handling_packets.push(entity);
}
let mut queued_packet_events = QueuedPacketEvents::default();
// handle injected packets, see the comment on
// RawConnection::injected_clientbound_packets for more info
for (entity, mut state, raw_packets) in entities_with_injected_packets {
for (entity, raw_packets) in entities_with_injected_packets {
for raw_packet in raw_packets {
handle_raw_packet(
ecs,
&raw_packet,
entity,
&mut state,
None,
&mut queued_packet_events,
)
.unwrap();
let conn = conn_query.get(ecs, entity).unwrap();
let state = conn.state;
trace!("Received injected packet with bytes: {raw_packet:?}");
handle_raw_packet(ecs, &raw_packet, entity, state, &mut queued_packet_events).unwrap();
// update the state and for the client
let (_, mut raw_conn_component) = query.get_mut(ecs, entity).unwrap();
let (_, mut raw_conn_component) = entity_and_conn_query.get_mut(ecs, entity).unwrap();
raw_conn_component.state = state;
}
}
// we pass the mutable state and net_conn into the handlers so they're allowed
// to mutate it
for (entity, mut state, mut net_conn) in entities_handling_packets {
for entity in entities_handling_packets {
loop {
match net_conn.reader.try_read() {
let mut conn = conn_query.get_mut(ecs, entity).unwrap();
let net_conn = conn.net_conn().unwrap();
let read_res = net_conn.reader.try_read();
let state = conn.state;
match read_res {
Ok(Some(raw_packet)) => {
let raw_packet = Arc::<[u8]>::from(raw_packet);
if let Err(e) = handle_raw_packet(
ecs,
&raw_packet,
entity,
&mut state,
Some(&mut net_conn),
state,
&mut queued_packet_events,
) {
error!("Error reading packet: {e}");
@ -108,14 +104,12 @@ pub fn read_packets(ecs: &mut World) {
}
}
let mut net_conn = conn_query.get_mut(ecs, entity).unwrap();
if let Some(net_conn) = &mut net_conn.network {
// this needs to be done at some point every update, so we do it here right
// after the handlers are called
net_conn.poll_writer();
// update the state and network connections for the client
let (_, mut raw_conn_component) = query.get_mut(ecs, entity).unwrap();
raw_conn_component.state = state;
raw_conn_component.network = Some(net_conn);
}
}
queued_packet_events.send_events(ecs);
@ -217,9 +211,11 @@ impl RawConnection {
packet: impl Packet<P>,
) -> Result<(), WritePacketError> {
if let Some(network) = &mut self.network {
let packet = packet.into_variant();
let raw_packet = serialize_packet(&packet)?;
network.write_raw(&raw_packet)?;
network.write(packet)?;
} else {
debug!(
"tried to write packet to the network but there is no NetworkConnection. if you're trying to send a packet from the handler function, use self.write instead"
);
}
Ok(())
}
@ -233,8 +229,7 @@ pub fn handle_raw_packet(
ecs: &mut World,
raw_packet: &[u8],
entity: Entity,
state: &mut ConnectionProtocol,
net_conn: Option<&mut NetworkConnection>,
state: ConnectionProtocol,
queued_packet_events: &mut QueuedPacketEvents,
) -> Result<(), Box<ReadPacketError>> {
let stream = &mut Cursor::new(raw_packet);
@ -244,6 +239,7 @@ pub fn handle_raw_packet(
}
ConnectionProtocol::Game => {
let packet = Arc::new(deserialize_packet::<ClientboundGamePacket>(stream)?);
trace!("Packet: {packet:?}");
game::process_packet(ecs, entity, packet.as_ref());
queued_packet_events
.game
@ -254,13 +250,15 @@ pub fn handle_raw_packet(
}
ConnectionProtocol::Login => {
let packet = Arc::new(deserialize_packet::<ClientboundLoginPacket>(stream)?);
login::process_packet(ecs, entity, &packet, state, net_conn);
trace!("Packet: {packet:?}");
login::process_packet(ecs, entity, &packet);
queued_packet_events
.login
.push(ReceiveLoginPacketEvent { entity, packet });
}
ConnectionProtocol::Configuration => {
let packet = Arc::new(deserialize_packet::<ClientboundConfigPacket>(stream)?);
trace!("Packet: {packet:?}");
config::process_packet(ecs, entity, &packet);
queued_packet_events
.config
@ -283,6 +281,17 @@ pub struct NetworkConnection {
network_packet_writer_tx: mpsc::UnboundedSender<Box<[u8]>>,
}
impl NetworkConnection {
pub fn write<P: ProtocolPacket + Debug>(
&mut self,
packet: impl Packet<P>,
) -> Result<(), WritePacketError> {
let packet = packet.into_variant();
let raw_packet = serialize_packet(&packet)?;
self.write_raw(&raw_packet)?;
Ok(())
}
pub fn write_raw(&mut self, raw_packet: &[u8]) -> Result<(), WritePacketError> {
let network_packet = azalea_protocol::write::encode_to_network_packet(
raw_packet,
@ -299,11 +308,13 @@ impl NetworkConnection {
}
pub fn set_compression_threshold(&mut self, threshold: Option<u32>) {
trace!("Set compression threshold to {threshold:?}");
self.reader.compression_threshold = threshold;
}
/// Set the encryption key that is used to encrypt and decrypt packets. It's
/// the same for both reading and writing.
pub fn set_encryption_key(&mut self, key: [u8; 16]) {
trace!("Enabled protocol encryption");
let (enc_cipher, dec_cipher) = azalea_crypto::create_cipher(&key);
self.reader.dec_cipher = Some(dec_cipher);
self.enc_cipher = Some(enc_cipher);
@ -315,11 +326,13 @@ async fn write_task(
mut write_half: OwnedWriteHalf,
) {
while let Some(network_packet) = network_packet_writer_rx.recv().await {
trace!("writing encoded raw packet");
if let Err(e) = write_half.write_all(&network_packet).await {
debug!("Error writing packet to server: {e}");
break;
};
}
trace!("write task is done");
}
#[derive(Error, Debug)]

View file

@ -3,7 +3,7 @@ use azalea_protocol::packets::login::{ClientboundHello, ServerboundKey};
use bevy_app::prelude::*;
use bevy_ecs::prelude::*;
use bevy_tasks::{IoTaskPool, Task, futures_lite::future};
use tracing::error;
use tracing::{debug, error};
use super::{connection::RawConnection, packet::login::ReceiveHelloEvent};
use crate::{Account, JoinError};
@ -33,6 +33,7 @@ fn poll_auth_task(
) {
for (entity, mut auth_task, mut raw_conn) in query.iter_mut() {
if let Some(poll_res) = future::block_on(future::poll_once(&mut auth_task.0)) {
debug!("Finished auth");
commands.entity(entity).remove::<AuthTask>();
match poll_res {
Ok((packet, private_key)) => {

View file

@ -44,7 +44,7 @@ pub fn handle_outgoing_packets_observer(
);
return;
}
debug!("Sending packet: {:?}", event.packet);
debug!("Sending config packet: {:?}", event.packet);
if let Err(e) = raw_conn.write(event.packet.clone()) {
error!("Failed to send packet: {e}");
}

View file

@ -70,7 +70,7 @@ pub fn handle_outgoing_packets_observer(
return;
}
// debug!("Sending packet: {:?}", event.packet);
// debug!("Sending game packet: {:?}", event.packet);
if let Err(e) = raw_connection.write(event.packet.clone()) {
error!("Failed to send packet: {e}");
}

View file

@ -1,9 +1,14 @@
use std::sync::Arc;
use azalea_protocol::packets::login::{ClientboundHello, ClientboundLoginPacket};
use azalea_protocol::packets::{
Packet,
login::{ClientboundHello, ClientboundLoginPacket, ServerboundLoginPacket},
};
use bevy_ecs::prelude::*;
use tracing::{debug, error};
use crate::Account;
use super::InLoginState;
use crate::{Account, connection::RawConnection};
#[derive(Event, Debug, Clone)]
pub struct ReceiveLoginPacketEvent {
@ -18,3 +23,49 @@ pub struct ReceiveHelloEvent {
pub account: Account,
pub packet: ClientboundHello,
}
/// Event for sending a login packet to the server.
#[derive(Event, Clone)]
pub struct SendLoginPacketEvent {
pub sent_by: Entity,
pub packet: ServerboundLoginPacket,
}
impl SendLoginPacketEvent {
pub fn new(entity: Entity, packet: impl Packet<ServerboundLoginPacket>) -> Self {
let packet = packet.into_variant();
Self {
sent_by: entity,
packet,
}
}
}
pub fn handle_outgoing_packets_observer(
trigger: Trigger<SendLoginPacketEvent>,
mut query: Query<(&mut RawConnection, Option<&InLoginState>)>,
) {
let event = trigger.event();
if let Ok((mut raw_conn, in_login_state)) = query.get_mut(event.sent_by) {
if in_login_state.is_none() {
error!(
"Tried to send a login packet {:?} while not in login state",
event.packet
);
return;
}
debug!("Sending login packet: {:?}", event.packet);
if let Err(e) = raw_conn.write(event.packet.clone()) {
error!("Failed to send packet: {e}");
}
}
}
/// A system that converts [`SendLoginPacketEvent`] events into triggers so
/// they get received by [`handle_outgoing_packets_observer`].
pub fn handle_outgoing_packets(
mut commands: Commands,
mut events: EventReader<SendLoginPacketEvent>,
) {
for event in events.read() {
commands.trigger(event.clone());
}
}

View file

@ -6,12 +6,12 @@ mod events;
use std::collections::HashSet;
use azalea_protocol::packets::{
ConnectionProtocol, Packet,
ConnectionProtocol,
login::{
ClientboundCookieRequest, ClientboundCustomQuery, ClientboundHello,
ClientboundLoginCompression, ClientboundLoginDisconnect, ClientboundLoginFinished,
ClientboundLoginPacket, ServerboundCookieResponse, ServerboundCustomQueryAnswer,
ServerboundLoginAcknowledged, ServerboundLoginPacket,
ServerboundLoginAcknowledged,
},
};
use bevy_ecs::prelude::*;
@ -21,23 +21,12 @@ use tracing::{debug, error};
use super::as_system;
use crate::{
Account, GameProfileComponent, InConfigState, connection::NetworkConnection,
Account, GameProfileComponent, InConfigState, connection::RawConnection,
declare_packet_handlers, disconnect::DisconnectEvent,
};
pub fn process_packet(
ecs: &mut World,
player: Entity,
packet: &ClientboundLoginPacket,
state: &mut ConnectionProtocol,
net_conn: Option<&mut NetworkConnection>,
) {
let mut handler = LoginPacketHandler {
player,
ecs,
state,
net_conn,
};
pub fn process_packet(ecs: &mut World, player: Entity, packet: &ClientboundLoginPacket) {
let mut handler = LoginPacketHandler { player, ecs };
declare_packet_handlers!(
ClientboundLoginPacket,
@ -54,19 +43,6 @@ pub fn process_packet(
);
}
/// Event for sending a login packet to the server.
#[derive(Event)]
pub struct SendLoginPacketEvent {
pub entity: Entity,
pub packet: ServerboundLoginPacket,
}
impl SendLoginPacketEvent {
pub fn new(entity: Entity, packet: impl Packet<ServerboundLoginPacket>) -> Self {
let packet = packet.into_variant();
Self { entity, packet }
}
}
/// A marker component for local players that are currently in the
/// `login` state.
#[derive(Component, Clone, Debug)]
@ -80,8 +56,6 @@ pub struct IgnoreQueryIds(HashSet<u32>);
pub struct LoginPacketHandler<'a> {
pub ecs: &'a mut World,
pub player: Entity,
pub state: &'a mut ConnectionProtocol,
pub net_conn: Option<&'a mut NetworkConnection>,
}
impl LoginPacketHandler<'_> {
pub fn hello(&mut self, p: &ClientboundHello) {
@ -115,11 +89,13 @@ impl LoginPacketHandler<'_> {
}
pub fn login_finished(&mut self, p: &ClientboundLoginFinished) {
debug!(
"Got profile {:?}. handshake is finished and we're now switching to the configuration state",
"Got profile {:?}. login is finished and we're now switching to the config state",
p.game_profile
);
as_system::<Commands>(self.ecs, |mut commands| {
as_system::<(Commands, Query<&mut RawConnection>)>(
self.ecs,
|(mut commands, mut query)| {
commands.trigger(SendLoginPacketEvent::new(
self.player,
ServerboundLoginAcknowledged,
@ -131,16 +107,25 @@ impl LoginPacketHandler<'_> {
.remove::<InLoginState>()
.insert(InConfigState)
.insert(GameProfileComponent(p.game_profile.clone()));
});
// break (conn.config(), p.game_profile);
let mut conn = query
.get_mut(self.player)
.expect("RawConnection component should be present when receiving packets");
conn.state = ConnectionProtocol::Configuration;
},
);
}
pub fn login_compression(&mut self, p: &ClientboundLoginCompression) {
debug!("Got compression request {p:?}");
if let Some(net_conn) = &mut self.net_conn {
as_system::<Query<&mut RawConnection>>(self.ecs, |mut query| {
let mut conn = query
.get_mut(self.player)
.expect("RawConnection component should be present when receiving packets");
if let Some(net_conn) = &mut conn.net_conn() {
net_conn.set_compression_threshold(Some(p.compression_threshold as u32));
}
})
}
pub fn custom_query(&mut self, p: &ClientboundCustomQuery) {
debug!("Got custom query {p:?}");

View file

@ -35,12 +35,14 @@ impl Plugin for PacketPlugin {
fn build(&self, app: &mut App) {
app.add_observer(game::handle_outgoing_packets_observer)
.add_observer(config::handle_outgoing_packets_observer)
.add_observer(login::handle_outgoing_packets_observer)
.add_systems(
Update,
(
(
config::handle_outgoing_packets,
game::handle_outgoing_packets,
login::handle_outgoing_packets,
)
.chain(),
death_event_on_0_health.before(death_listener),

View file

@ -1,9 +0,0 @@
use azalea_buf::{AzBuf, UnsizedByteArray};
use azalea_protocol_macros::ServerboundLoginPacket;
#[derive(Clone, Debug, AzBuf, ServerboundLoginPacket)]
pub struct ServerboundCustomQuery {
#[var]
pub transaction_id: u32,
pub data: Option<UnsizedByteArray>,
}