From 21acf4c84687eb40cb52746bdf40c2bbe9ab325a Mon Sep 17 00:00:00 2001 From: mat Date: Sun, 23 Feb 2025 09:05:20 +0000 Subject: [PATCH] try to receive packets all at once before sending run_schedule_sender --- azalea-client/src/raw_connection.rs | 30 +++++++++++++++++++++++------ azalea-protocol/src/connect.rs | 18 ++++++++--------- 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/azalea-client/src/raw_connection.rs b/azalea-client/src/raw_connection.rs index e3f35282..4a6587be 100644 --- a/azalea-client/src/raw_connection.rs +++ b/azalea-client/src/raw_connection.rs @@ -133,21 +133,39 @@ impl RawConnectionReader { /// Loop that reads from the connection and adds the packets to the queue + /// runs the schedule. pub async fn read_task(self, mut read_conn: RawReadConnection) { + fn log_for_error(error: &ReadPacketError) { + if !matches!(*error, ReadPacketError::ConnectionClosed) { + error!("Error reading packet from Client: {error:?}"); + } + } + loop { match read_conn.read().await { Ok(raw_packet) => { - self.incoming_packet_queue.lock().push(raw_packet); + let mut incoming_packet_queue = self.incoming_packet_queue.lock(); + + incoming_packet_queue.push(raw_packet); + loop { + let raw_packet = match read_conn.try_read() { + Ok(p) => p, + Err(err) => { + log_for_error(&err); + return; + } + }; + let Some(raw_packet) = raw_packet else { break }; + incoming_packet_queue.push(raw_packet); + } + // tell the client to run all the systems if self.run_schedule_sender.send(()).is_err() { // the client was dropped break; } } - Err(error) => { - if !matches!(*error, ReadPacketError::ConnectionClosed) { - error!("Error reading packet from Client: {error:?}"); - } - break; + Err(err) => { + log_for_error(&err); + return; } } } diff --git a/azalea-protocol/src/connect.rs b/azalea-protocol/src/connect.rs index 6ac14e0b..99a8a6b4 100755 --- a/azalea-protocol/src/connect.rs +++ b/azalea-protocol/src/connect.rs @@ -1,7 +1,7 @@ //! Connect to remote servers/clients. use std::fmt::Debug; -use std::io::Cursor; +use std::io::{self, Cursor}; use std::marker::PhantomData; use std::net::SocketAddr; @@ -155,7 +155,7 @@ impl RawReadConnection { } impl RawWriteConnection { - pub async fn write(&mut self, packet: &[u8]) -> std::io::Result<()> { + pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> { if let Err(e) = write_raw_packet( packet, &mut self.write_stream, @@ -165,7 +165,7 @@ impl RawWriteConnection { .await { // detect broken pipe - if e.kind() == std::io::ErrorKind::BrokenPipe { + if e.kind() == io::ErrorKind::BrokenPipe { info!("Broken pipe, shutting down connection."); if let Err(e) = self.shutdown().await { error!("Couldn't shut down: {}", e); @@ -177,7 +177,7 @@ impl RawWriteConnection { } /// End the connection. - pub async fn shutdown(&mut self) -> std::io::Result<()> { + pub async fn shutdown(&mut self) -> io::Result<()> { self.write_stream.shutdown().await } } @@ -206,12 +206,12 @@ where W: ProtocolPacket + Debug, { /// Write a packet to the server. - pub async fn write(&mut self, packet: W) -> std::io::Result<()> { + pub async fn write(&mut self, packet: W) -> io::Result<()> { self.raw.write(&serialize_packet(&packet).unwrap()).await } /// End the connection. - pub async fn shutdown(&mut self) -> std::io::Result<()> { + pub async fn shutdown(&mut self) -> io::Result<()> { self.raw.shutdown().await } } @@ -233,7 +233,7 @@ where } /// Write a packet to the other side of the connection. - pub async fn write(&mut self, packet: impl crate::packets::Packet) -> std::io::Result<()> { + pub async fn write(&mut self, packet: impl crate::packets::Packet) -> io::Result<()> { let packet = packet.into_variant(); self.writer.write(packet).await } @@ -248,7 +248,7 @@ where #[derive(Error, Debug)] pub enum ConnectionError { #[error("{0}")] - Io(#[from] std::io::Error), + Io(#[from] io::Error), } use socks5_impl::protocol::UserKey; @@ -287,7 +287,7 @@ impl Connection { let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth) .await - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; Self::new_from_stream(stream.into_inner()).await }