From 0a83dc73b4c06b9300b8e16f8a30d512374262cd Mon Sep 17 00:00:00 2001 From: mat Date: Sun, 16 Jul 2023 05:50:02 -0500 Subject: [PATCH] add try_read to connection --- Cargo.lock | 1 + azalea-protocol/Cargo.toml | 1 + azalea-protocol/src/connect.rs | 19 +++++++++- azalea-protocol/src/read.rs | 65 ++++++++++++++++++++++++++++++---- 4 files changed, 79 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e3a9ab65..2a33f8ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -463,6 +463,7 @@ dependencies = [ "bytes", "flate2", "futures", + "futures-lite", "futures-util", "log", "once_cell", diff --git a/azalea-protocol/Cargo.toml b/azalea-protocol/Cargo.toml index 50e6bbae..f1682e8b 100644 --- a/azalea-protocol/Cargo.toml +++ b/azalea-protocol/Cargo.toml @@ -38,6 +38,7 @@ byteorder = "^1.4.3" bytes = "^1.4.0" flate2 = "1.0.26" futures = "0.3.28" +futures-lite = "1.13.0" futures-util = "0.3.28" log = "0.4.19" serde = { version = "^1.0", features = ["serde_derive"] } diff --git a/azalea-protocol/src/connect.rs b/azalea-protocol/src/connect.rs index cb837ba5..5df1d874 100755 --- a/azalea-protocol/src/connect.rs +++ b/azalea-protocol/src/connect.rs @@ -6,7 +6,7 @@ use crate::packets::login::clientbound_hello_packet::ClientboundHelloPacket; use crate::packets::login::{ClientboundLoginPacket, ServerboundLoginPacket}; use crate::packets::status::{ClientboundStatusPacket, ServerboundStatusPacket}; use crate::packets::ProtocolPacket; -use crate::read::{read_packet, ReadPacketError}; +use crate::read::{read_packet, try_read_packet, ReadPacketError}; use crate::write::write_packet; use azalea_auth::game_profile::GameProfile; use azalea_auth::sessionserver::{ClientSessionServerError, ServerSessionServerError}; @@ -140,6 +140,17 @@ where ) .await } + + /// Try to read a packet from the stream, or return Ok(None) if there's no + /// packet. + pub fn try_read(&mut self) -> Result, Box> { + try_read_packet::( + &mut self.read_stream, + &mut self.buffer, + self.compression_threshold, + &mut self.dec_cipher, + ) + } } impl WriteConnection where @@ -183,6 +194,12 @@ where self.reader.read().await } + /// Try to read a packet from the other side of the connection, or return + /// Ok(None) if there's no packet to read. + pub fn try_read(&mut self) -> Result, Box> { + self.reader.try_read() + } + /// Write a packet to the other side of the connection. pub async fn write(&mut self, packet: W) -> std::io::Result<()> { self.writer.write(packet).await diff --git a/azalea-protocol/src/read.rs b/azalea-protocol/src/read.rs index d92897b9..bffb22bd 100755 --- a/azalea-protocol/src/read.rs +++ b/azalea-protocol/src/read.rs @@ -8,6 +8,7 @@ use bytes::Buf; use bytes::BytesMut; use flate2::read::ZlibDecoder; use futures::StreamExt; +use futures_lite::future; use log::{log_enabled, trace}; use std::backtrace::Backtrace; use std::{ @@ -204,6 +205,8 @@ pub fn compression_decoder( /// same frame, so we need to store the packet data that's left to read. /// /// The current protocol state must be passed as a generic. +/// +/// For the non-waiting version, see [`try_read_packet`]. pub async fn read_packet<'a, P: ProtocolPacket + Debug, R>( stream: &'a mut R, buffer: &mut BytesMut, @@ -214,12 +217,10 @@ where R: AsyncRead + std::marker::Unpin + std::marker::Send + std::marker::Sync, { let mut framed = FramedRead::new(stream, BytesCodec::new()); - let mut buf = loop { - if let Some(buf) = frame_splitter(buffer).map_err(ReadPacketError::from)? { + loop { + if let Some(buf) = try_process_buffer::(buffer, compression_threshold)? { // we got a full packet!! - break buf; - } else { - // no full packet yet :( keep reading + return Ok(buf); }; // if we were given a cipher, decrypt the packet @@ -234,6 +235,58 @@ where } else { return Err(Box::new(ReadPacketError::ConnectionClosed)); }; + } +} + +/// Try to read a single packet from a stream. Returns None if we haven't +/// received a full packet yet. +pub fn try_read_packet( + stream: &mut R, + buffer: &mut BytesMut, + compression_threshold: Option, + cipher: &mut Option, +) -> Result, Box> +where + R: AsyncRead + std::marker::Unpin + std::marker::Send + std::marker::Sync, +{ + let mut framed = FramedRead::new(stream, BytesCodec::new()); + loop { + if let Some(buf) = try_process_buffer::(buffer, compression_threshold)? { + // we got a full packet!! + return Ok(Some(buf)); + }; + + // if we were given a cipher, decrypt the packet + if let Some(message) = future::block_on(future::poll_once(framed.next())) { + if let Some(message) = message { + let mut bytes = message.map_err(ReadPacketError::from)?; + + if let Some(cipher) = cipher { + azalea_crypto::decrypt_packet(cipher, &mut bytes); + } + + buffer.extend_from_slice(&bytes); + } else { + return Err(Box::new(ReadPacketError::ConnectionClosed)); + } + } else { + return Ok(None); + }; + } +} + +/// Try to get a Minecraft packet from a buffer. Returns None if the packet +/// isn't complete yet. +pub fn try_process_buffer( + buffer: &mut BytesMut, + compression_threshold: Option, +) -> Result, Box> +where + R: AsyncRead + std::marker::Unpin + std::marker::Send + std::marker::Sync, +{ + let Some(mut buf) = frame_splitter(buffer).map_err(ReadPacketError::from)? else { + // no full packet yet :( + return Ok(None); }; if let Some(compression_threshold) = compression_threshold { @@ -255,5 +308,5 @@ where let packet = packet_decoder(&mut Cursor::new(&buf[..]))?; - Ok(packet) + Ok(Some(packet)) }