1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 06:16:04 +00:00

add try_read to connection

This commit is contained in:
mat 2023-07-16 05:50:02 -05:00
parent 509c154b4d
commit 0a83dc73b4
4 changed files with 79 additions and 7 deletions

1
Cargo.lock generated
View file

@ -463,6 +463,7 @@ dependencies = [
"bytes",
"flate2",
"futures",
"futures-lite",
"futures-util",
"log",
"once_cell",

View file

@ -38,6 +38,7 @@ byteorder = "^1.4.3"
bytes = "^1.4.0"
flate2 = "1.0.26"
futures = "0.3.28"
futures-lite = "1.13.0"
futures-util = "0.3.28"
log = "0.4.19"
serde = { version = "^1.0", features = ["serde_derive"] }

View file

@ -6,7 +6,7 @@ use crate::packets::login::clientbound_hello_packet::ClientboundHelloPacket;
use crate::packets::login::{ClientboundLoginPacket, ServerboundLoginPacket};
use crate::packets::status::{ClientboundStatusPacket, ServerboundStatusPacket};
use crate::packets::ProtocolPacket;
use crate::read::{read_packet, ReadPacketError};
use crate::read::{read_packet, try_read_packet, ReadPacketError};
use crate::write::write_packet;
use azalea_auth::game_profile::GameProfile;
use azalea_auth::sessionserver::{ClientSessionServerError, ServerSessionServerError};
@ -140,6 +140,17 @@ where
)
.await
}
/// Try to read a packet from the stream, or return Ok(None) if there's no
/// packet.
pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
try_read_packet::<R, _>(
&mut self.read_stream,
&mut self.buffer,
self.compression_threshold,
&mut self.dec_cipher,
)
}
}
impl<W> WriteConnection<W>
where
@ -183,6 +194,12 @@ where
self.reader.read().await
}
/// Try to read a packet from the other side of the connection, or return
/// Ok(None) if there's no packet to read.
pub fn try_read(&mut self) -> Result<Option<R>, Box<ReadPacketError>> {
self.reader.try_read()
}
/// Write a packet to the other side of the connection.
pub async fn write(&mut self, packet: W) -> std::io::Result<()> {
self.writer.write(packet).await

View file

@ -8,6 +8,7 @@ use bytes::Buf;
use bytes::BytesMut;
use flate2::read::ZlibDecoder;
use futures::StreamExt;
use futures_lite::future;
use log::{log_enabled, trace};
use std::backtrace::Backtrace;
use std::{
@ -204,6 +205,8 @@ pub fn compression_decoder(
/// same frame, so we need to store the packet data that's left to read.
///
/// The current protocol state must be passed as a generic.
///
/// For the non-waiting version, see [`try_read_packet`].
pub async fn read_packet<'a, P: ProtocolPacket + Debug, R>(
stream: &'a mut R,
buffer: &mut BytesMut,
@ -214,12 +217,10 @@ where
R: AsyncRead + std::marker::Unpin + std::marker::Send + std::marker::Sync,
{
let mut framed = FramedRead::new(stream, BytesCodec::new());
let mut buf = loop {
if let Some(buf) = frame_splitter(buffer).map_err(ReadPacketError::from)? {
loop {
if let Some(buf) = try_process_buffer::<P, R>(buffer, compression_threshold)? {
// we got a full packet!!
break buf;
} else {
// no full packet yet :( keep reading
return Ok(buf);
};
// if we were given a cipher, decrypt the packet
@ -234,6 +235,58 @@ where
} else {
return Err(Box::new(ReadPacketError::ConnectionClosed));
};
}
}
/// Try to read a single packet from a stream. Returns None if we haven't
/// received a full packet yet.
pub fn try_read_packet<P: ProtocolPacket + Debug, R>(
stream: &mut R,
buffer: &mut BytesMut,
compression_threshold: Option<u32>,
cipher: &mut Option<Aes128CfbDec>,
) -> Result<Option<P>, Box<ReadPacketError>>
where
R: AsyncRead + std::marker::Unpin + std::marker::Send + std::marker::Sync,
{
let mut framed = FramedRead::new(stream, BytesCodec::new());
loop {
if let Some(buf) = try_process_buffer::<P, R>(buffer, compression_threshold)? {
// we got a full packet!!
return Ok(Some(buf));
};
// if we were given a cipher, decrypt the packet
if let Some(message) = future::block_on(future::poll_once(framed.next())) {
if let Some(message) = message {
let mut bytes = message.map_err(ReadPacketError::from)?;
if let Some(cipher) = cipher {
azalea_crypto::decrypt_packet(cipher, &mut bytes);
}
buffer.extend_from_slice(&bytes);
} else {
return Err(Box::new(ReadPacketError::ConnectionClosed));
}
} else {
return Ok(None);
};
}
}
/// Try to get a Minecraft packet from a buffer. Returns None if the packet
/// isn't complete yet.
pub fn try_process_buffer<P: ProtocolPacket + Debug, R>(
buffer: &mut BytesMut,
compression_threshold: Option<u32>,
) -> Result<Option<P>, Box<ReadPacketError>>
where
R: AsyncRead + std::marker::Unpin + std::marker::Send + std::marker::Sync,
{
let Some(mut buf) = frame_splitter(buffer).map_err(ReadPacketError::from)? else {
// no full packet yet :(
return Ok(None);
};
if let Some(compression_threshold) = compression_threshold {
@ -255,5 +308,5 @@ where
let packet = packet_decoder(&mut Cursor::new(&buf[..]))?;
Ok(packet)
Ok(Some(packet))
}