1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 14:26:04 +00:00

try to receive packets all at once before sending run_schedule_sender

This commit is contained in:
mat 2025-02-23 09:05:20 +00:00
parent dd557c8f29
commit 21acf4c846
2 changed files with 33 additions and 15 deletions

View file

@ -133,21 +133,39 @@ impl RawConnectionReader {
/// Loop that reads from the connection and adds the packets to the queue + /// Loop that reads from the connection and adds the packets to the queue +
/// runs the schedule. /// runs the schedule.
pub async fn read_task(self, mut read_conn: RawReadConnection) { pub async fn read_task(self, mut read_conn: RawReadConnection) {
fn log_for_error(error: &ReadPacketError) {
if !matches!(*error, ReadPacketError::ConnectionClosed) {
error!("Error reading packet from Client: {error:?}");
}
}
loop { loop {
match read_conn.read().await { match read_conn.read().await {
Ok(raw_packet) => { Ok(raw_packet) => {
self.incoming_packet_queue.lock().push(raw_packet); let mut incoming_packet_queue = self.incoming_packet_queue.lock();
incoming_packet_queue.push(raw_packet);
loop {
let raw_packet = match read_conn.try_read() {
Ok(p) => p,
Err(err) => {
log_for_error(&err);
return;
}
};
let Some(raw_packet) = raw_packet else { break };
incoming_packet_queue.push(raw_packet);
}
// tell the client to run all the systems // tell the client to run all the systems
if self.run_schedule_sender.send(()).is_err() { if self.run_schedule_sender.send(()).is_err() {
// the client was dropped // the client was dropped
break; break;
} }
} }
Err(error) => { Err(err) => {
if !matches!(*error, ReadPacketError::ConnectionClosed) { log_for_error(&err);
error!("Error reading packet from Client: {error:?}"); return;
}
break;
} }
} }
} }

View file

@ -1,7 +1,7 @@
//! Connect to remote servers/clients. //! Connect to remote servers/clients.
use std::fmt::Debug; use std::fmt::Debug;
use std::io::Cursor; use std::io::{self, Cursor};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
@ -155,7 +155,7 @@ impl RawReadConnection {
} }
impl RawWriteConnection { impl RawWriteConnection {
pub async fn write(&mut self, packet: &[u8]) -> std::io::Result<()> { pub async fn write(&mut self, packet: &[u8]) -> io::Result<()> {
if let Err(e) = write_raw_packet( if let Err(e) = write_raw_packet(
packet, packet,
&mut self.write_stream, &mut self.write_stream,
@ -165,7 +165,7 @@ impl RawWriteConnection {
.await .await
{ {
// detect broken pipe // detect broken pipe
if e.kind() == std::io::ErrorKind::BrokenPipe { if e.kind() == io::ErrorKind::BrokenPipe {
info!("Broken pipe, shutting down connection."); info!("Broken pipe, shutting down connection.");
if let Err(e) = self.shutdown().await { if let Err(e) = self.shutdown().await {
error!("Couldn't shut down: {}", e); error!("Couldn't shut down: {}", e);
@ -177,7 +177,7 @@ impl RawWriteConnection {
} }
/// End the connection. /// End the connection.
pub async fn shutdown(&mut self) -> std::io::Result<()> { pub async fn shutdown(&mut self) -> io::Result<()> {
self.write_stream.shutdown().await self.write_stream.shutdown().await
} }
} }
@ -206,12 +206,12 @@ where
W: ProtocolPacket + Debug, W: ProtocolPacket + Debug,
{ {
/// Write a packet to the server. /// Write a packet to the server.
pub async fn write(&mut self, packet: W) -> std::io::Result<()> { pub async fn write(&mut self, packet: W) -> io::Result<()> {
self.raw.write(&serialize_packet(&packet).unwrap()).await self.raw.write(&serialize_packet(&packet).unwrap()).await
} }
/// End the connection. /// End the connection.
pub async fn shutdown(&mut self) -> std::io::Result<()> { pub async fn shutdown(&mut self) -> io::Result<()> {
self.raw.shutdown().await self.raw.shutdown().await
} }
} }
@ -233,7 +233,7 @@ where
} }
/// Write a packet to the other side of the connection. /// Write a packet to the other side of the connection.
pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> std::io::Result<()> { pub async fn write(&mut self, packet: impl crate::packets::Packet<W>) -> io::Result<()> {
let packet = packet.into_variant(); let packet = packet.into_variant();
self.writer.write(packet).await self.writer.write(packet).await
} }
@ -248,7 +248,7 @@ where
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum ConnectionError { pub enum ConnectionError {
#[error("{0}")] #[error("{0}")]
Io(#[from] std::io::Error), Io(#[from] io::Error),
} }
use socks5_impl::protocol::UserKey; use socks5_impl::protocol::UserKey;
@ -287,7 +287,7 @@ impl Connection<ClientboundHandshakePacket, ServerboundHandshakePacket> {
let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth) let _ = socks5_impl::client::connect(&mut stream, address, proxy.auth)
.await .await
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
Self::new_from_stream(stream.into_inner()).await Self::new_from_stream(stream.into_inner()).await
} }