From 6f6289376a0d9ffe7e58506824e37f6b380961c3 Mon Sep 17 00:00:00 2001 From: mat Date: Fri, 7 Oct 2022 23:56:23 -0500 Subject: [PATCH] fix errors with rewritten packet reading i forgot i never tested it before LMAO --- Cargo.lock | 123 +++++++++++++++++++++------- azalea-buf/src/read.rs | 4 +- azalea-client/src/client.rs | 3 +- azalea-nbt/src/decode.rs | 6 +- azalea-protocol/Cargo.toml | 4 +- azalea-protocol/src/connect.rs | 2 +- azalea-protocol/src/lib.rs | 39 ++++++++- azalea-protocol/src/read.rs | 144 ++++++++++++++------------------- azalea-protocol/src/write.rs | 4 +- azalea/src/bot.rs | 10 ++- bot/src/main.rs | 2 +- 11 files changed, 210 insertions(+), 131 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ff10db62..128bea90 100755 --- a/Cargo.lock +++ b/Cargo.lock @@ -271,6 +271,8 @@ dependencies = [ "byteorder", "bytes", "flate2", + "futures", + "futures-util", "log", "serde", "serde_json", @@ -634,46 +636,89 @@ dependencies = [ ] [[package]] -name = "futures-channel" -version = "0.3.21" +name = "futures" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" +checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050" dependencies = [ "futures-core", + "futures-sink", ] [[package]] name = "futures-core" -version = "0.3.21" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf" [[package]] -name = "futures-io" -version = "0.3.21" +name = "futures-executor" +version = "0.3.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" - -[[package]] -name = "futures-sink" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" - -[[package]] -name = "futures-task" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" - -[[package]] -name = "futures-util" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" +checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab" dependencies = [ "futures-core", "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68" + +[[package]] +name = "futures-macro" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56" + +[[package]] +name = "futures-task" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1" + +[[package]] +name = "futures-util" +version = "0.3.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -1494,16 +1539,36 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.6.10" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" +checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" dependencies = [ "bytes", "futures-core", "futures-sink", - "log", "pin-project-lite", "tokio", + "tracing", +] + +[[package]] +name = "tracing" +version = "0.1.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a400e31aa60b9d44a52a8ee0343b5b18566b03a8321e0d321f695cf56e940160" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b7358be39f2f274f322d2aaed611acc57f382e8eb1e5b48cb9ae30933495ce7" +dependencies = [ + "once_cell", ] [[package]] diff --git a/azalea-buf/src/read.rs b/azalea-buf/src/read.rs index 29f351c6..575066c4 100644 --- a/azalea-buf/src/read.rs +++ b/azalea-buf/src/read.rs @@ -42,10 +42,10 @@ pub enum BufReadError { } fn read_bytes<'a>(buf: &'a mut Cursor<&[u8]>, length: usize) -> Result<&'a [u8], BufReadError> { - if length > buf.get_ref().len() { + if length > (buf.get_ref().len() - buf.position() as usize) { return Err(BufReadError::UnexpectedEof { attempted_read: length, - actual_read: buf.get_ref().len(), + actual_read: buf.get_ref().len() - buf.position() as usize, }); } let initial_position = buf.position() as usize; diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs index ed0a75e7..bbf78ee6 100644 --- a/azalea-client/src/client.rs +++ b/azalea-client/src/client.rs @@ -231,7 +231,8 @@ impl Client { /// Write a packet directly to the server. pub async fn write_packet(&self, packet: ServerboundGamePacket) -> Result<(), std::io::Error> { - self.write_conn.lock().await.write(packet).await + self.write_conn.lock().await.write(packet).await?; + Ok(()) } /// Disconnect from the server, ending all tasks. diff --git a/azalea-nbt/src/decode.rs b/azalea-nbt/src/decode.rs index 8a1dfab5..a811bb1f 100755 --- a/azalea-nbt/src/decode.rs +++ b/azalea-nbt/src/decode.rs @@ -9,7 +9,7 @@ use std::io::{BufRead, Read}; #[inline] fn read_bytes<'a>(buf: &'a mut Cursor<&[u8]>, length: usize) -> Result<&'a [u8], Error> { - if length > buf.get_ref().len() { + if length > (buf.get_ref().len() - buf.position() as usize) { return Err(Error::UnexpectedEof); } let initial_position = buf.position() as usize; @@ -95,7 +95,7 @@ impl Tag { // integers. 11 => { let length = stream.read_u32::()? as usize; - if length * 4 > stream.get_ref().len() { + if length * 4 > (stream.get_ref().len() - stream.position() as usize) { return Err(Error::UnexpectedEof); } let mut ints = Vec::with_capacity(length as usize); @@ -108,7 +108,7 @@ impl Tag { // integer (thus 4 bytes) and indicates the number of 8 byte longs. 12 => { let length = stream.read_u32::()? as usize; - if length * 8 > stream.get_ref().len() { + if length * 8 > (stream.get_ref().len() - stream.position() as usize) { return Err(Error::UnexpectedEof); } let mut longs = Vec::with_capacity(length as usize); diff --git a/azalea-protocol/Cargo.toml b/azalea-protocol/Cargo.toml index 62de47d9..415c0aa9 100755 --- a/azalea-protocol/Cargo.toml +++ b/azalea-protocol/Cargo.toml @@ -24,12 +24,14 @@ azalea-world = {path = "../azalea-world", version = "^0.1.0"} byteorder = "^1.4.3" bytes = "^1.1.0" flate2 = "1.0.23" +futures = "0.3.24" +futures-util = "0.3.24" log = "0.4.17" serde = {version = "1.0.130", features = ["serde_derive"]} serde_json = "^1.0.72" thiserror = "^1.0.34" tokio = {version = "^1.19.2", features = ["io-util", "net", "macros"]} -tokio-util = "^0.6.9" +tokio-util = {version = "0.7.4", features = ["codec"]} trust-dns-resolver = "^0.20.3" uuid = "1.1.2" diff --git a/azalea-protocol/src/connect.rs b/azalea-protocol/src/connect.rs index bd55e406..d7b9bd1d 100644 --- a/azalea-protocol/src/connect.rs +++ b/azalea-protocol/src/connect.rs @@ -57,7 +57,7 @@ where /// Write a packet to the server pub async fn write(&mut self, packet: W) -> std::io::Result<()> { write_packet( - packet, + &packet, &mut self.write_stream, self.compression_threshold, &mut self.enc_cipher, diff --git a/azalea-protocol/src/lib.rs b/azalea-protocol/src/lib.rs index 4da2ba90..58ffac0a 100755 --- a/azalea-protocol/src/lib.rs +++ b/azalea-protocol/src/lib.rs @@ -1,5 +1,9 @@ //! This lib is responsible for parsing Minecraft packets. +// these two are necessary for thiserror backtraces +#![feature(error_generic_member_access)] +#![feature(provide_any)] + use std::net::IpAddr; use std::str::FromStr; @@ -78,12 +82,10 @@ mod tests { } .get(); let mut stream = Vec::new(); - write_packet(packet, &mut stream, None, &mut None) + write_packet(&packet, &mut stream, None, &mut None) .await .unwrap(); - println!("stream: {stream:?}"); - let mut stream = Cursor::new(stream); let _ = read_packet::( @@ -95,4 +97,35 @@ mod tests { .await .unwrap(); } + + #[tokio::test] + async fn test_double_hello_packet() { + let packet = ServerboundHelloPacket { + username: "test".to_string(), + public_key: Some(ProfilePublicKeyData { + expires_at: 0, + key: b"idontthinkthisreallymattersijustwantittobelongforthetest".to_vec(), + key_signature: b"idontthinkthisreallymattersijustwantittobelongforthetest".to_vec(), + }), + profile_id: Some(Uuid::from_u128(0)), + } + .get(); + let mut stream = Vec::new(); + write_packet(&packet, &mut stream, None, &mut None) + .await + .unwrap(); + write_packet(&packet, &mut stream, None, &mut None) + .await + .unwrap(); + let mut stream = Cursor::new(stream); + + let mut buffer = BytesMut::new(); + + let _ = read_packet::(&mut stream, &mut buffer, None, &mut None) + .await + .unwrap(); + let _ = read_packet::(&mut stream, &mut buffer, None, &mut None) + .await + .unwrap(); + } } diff --git a/azalea-protocol/src/read.rs b/azalea-protocol/src/read.rs index eceede9d..4c398e96 100644 --- a/azalea-protocol/src/read.rs +++ b/azalea-protocol/src/read.rs @@ -5,16 +5,15 @@ use azalea_crypto::Aes128CfbDec; use bytes::Buf; use bytes::BytesMut; use flate2::read::ZlibDecoder; +use futures::StreamExt; use log::{log_enabled, trace}; -use std::io::Cursor; use std::{ - cell::Cell, - io::Read, - pin::Pin, - task::{Context, Poll}, + fmt::Debug, + io::{Cursor, Read}, }; use thiserror::Error; use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio_util::codec::{BytesCodec, FramedRead}; #[derive(Error, Debug)] pub enum ReadPacketError { @@ -28,18 +27,28 @@ pub enum ReadPacketError { UnknownPacketId { state_name: String, id: u32 }, #[error("Couldn't read packet id")] ReadPacketId { source: BufReadError }, - #[error("Couldn't decompress packet")] + #[error(transparent)] Decompress { #[from] + #[backtrace] source: DecompressionError, }, - #[error("Frame splitter error")] + #[error(transparent)] FrameSplitter { #[from] + #[backtrace] source: FrameSplitterError, }, #[error("Leftover data after reading packet {packet_name}: {data:?}")] LeftoverData { data: Vec, packet_name: String }, + #[error(transparent)] + IoError { + #[from] + #[backtrace] + source: std::io::Error, + }, + #[error("Connection closed")] + ConnectionClosed, } #[derive(Error, Debug)] @@ -52,6 +61,7 @@ pub enum FrameSplitterError { #[error("Io error")] Io { #[from] + #[backtrace] source: std::io::Error, }, #[error("Packet is longer than {max} bytes (is {size})")] @@ -77,9 +87,9 @@ fn parse_frame(buffer: &mut BytesMut) -> Result { }, }; - if length > buffer_copy.get_ref().len() { + if length > buffer_copy.remaining() { return Err(FrameSplitterError::BadLength { - max: buffer_copy.get_ref().len(), + max: buffer_copy.remaining(), size: length, }); } @@ -88,49 +98,33 @@ fn parse_frame(buffer: &mut BytesMut) -> Result { // from the real buffer now // the length of the varint that says the length of the whole packet - let varint_length = buffer.len() - buffer_copy.remaining(); - let _ = buffer.split_to(varint_length); + let varint_length = buffer.remaining() - buffer_copy.remaining(); + + buffer.advance(varint_length); let data = buffer.split_to(length); Ok(data) } -async fn frame_splitter<'a, R: ?Sized + Sized>( - stream: &mut R, - buffer: &'a mut BytesMut, -) -> Result, FrameSplitterError> -where - R: AsyncRead + std::marker::Unpin + std::marker::Send, -{ +fn frame_splitter<'a>(buffer: &'a mut BytesMut) -> Result>, FrameSplitterError> { // https://tokio.rs/tokio/tutorial/framing - loop { - let read_frame = parse_frame(buffer); - match read_frame { - Ok(frame) => return Ok(frame.to_vec()), - Err(err) => match err { - FrameSplitterError::BadLength { .. } | FrameSplitterError::Io { .. } => { - // we probably just haven't read enough yet - } - _ => return Err(err), - }, - } - - let read_buf: usize = AsyncReadExt::read_buf(stream, buffer).await?; - if 0 == read_buf { - // The remote closed the connection. For this to be - // a clean shutdown, there should be no data in the - // read buffer. If there is, this means that the - // peer closed the socket while sending a frame. - if buffer.as_ref().is_empty() { - return Err(FrameSplitterError::ConnectionClosed); - } else { - return Err(FrameSplitterError::ConnectionReset); + let read_frame = parse_frame(buffer); + match read_frame { + Ok(frame) => return Ok(Some(frame.to_vec())), + Err(err) => match err { + FrameSplitterError::BadLength { .. } | FrameSplitterError::Io { .. } => { + // we probably just haven't read enough yet } - } + _ => return Err(err), + }, } + + Ok(None) } -fn packet_decoder(stream: &mut Cursor<&[u8]>) -> Result { +fn packet_decoder( + stream: &mut Cursor<&[u8]>, +) -> Result { // Packet ID let packet_id = u32::var_read_from(stream).map_err(|e| ReadPacketError::ReadPacketId { source: e })?; @@ -152,6 +146,7 @@ pub enum DecompressionError { #[error("Io error")] Io { #[from] + #[backtrace] source: std::io::Error, }, #[error("Badly compressed packet - size of {size} is below server threshold of {threshold}")] @@ -197,42 +192,7 @@ fn compression_decoder( Ok(decoded_buf) } -struct EncryptedStream<'a, R> -where - R: AsyncRead + std::marker::Unpin + std::marker::Send, -{ - cipher: Cell<&'a mut Option>, - stream: &'a mut Pin<&'a mut R>, -} - -impl AsyncRead for EncryptedStream<'_, R> -where - R: AsyncRead + Unpin + Send, -{ - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll> { - // i hate this - let polled = self.as_mut().stream.as_mut().poll_read(cx, buf); - match polled { - Poll::Ready(r) => { - // if we don't check for the remaining then we decrypt big packets incorrectly - // (but only on linux and release mode for some reason LMAO) - if buf.remaining() == 0 { - if let Some(cipher) = self.as_mut().cipher.get_mut() { - azalea_crypto::decrypt_packet(cipher, buf.filled_mut()); - } - } - Poll::Ready(r) - } - Poll::Pending => Poll::Pending, - } - } -} - -pub async fn read_packet<'a, P: ProtocolPacket, R>( +pub async fn read_packet<'a, P: ProtocolPacket + Debug, R>( stream: &'a mut R, buffer: &mut BytesMut, compression_threshold: Option, @@ -241,13 +201,29 @@ pub async fn read_packet<'a, P: ProtocolPacket, R>( where R: AsyncRead + std::marker::Unpin + std::marker::Send + std::marker::Sync, { - // if we were given a cipher, decrypt the packet - let mut encrypted_stream = EncryptedStream { - cipher: Cell::new(cipher), - stream: &mut Pin::new(stream), - }; + let mut framed = FramedRead::new(stream, BytesCodec::new()); + let mut buf = loop { + if let Some(buf) = frame_splitter(buffer)? { + // we got a full packet!! + break buf; + } else { + // no full packet yet :( keep reading + }; - let mut buf = frame_splitter(&mut encrypted_stream, buffer).await?; + // if we were given a cipher, decrypt the packet + if let Some(message) = framed.next().await { + let mut bytes = message.unwrap(); + println!("bytes: {:?}", bytes.len()); + + if let Some(cipher) = cipher { + azalea_crypto::decrypt_packet(cipher, &mut bytes); + } + + buffer.extend_from_slice(&bytes); + } else { + return Err(ReadPacketError::ConnectionClosed); + }; + }; if let Some(compression_threshold) = compression_threshold { buf = compression_decoder(&mut Cursor::new(&buf[..]), compression_threshold)?; diff --git a/azalea-protocol/src/write.rs b/azalea-protocol/src/write.rs index b2ae2810..a04979a5 100755 --- a/azalea-protocol/src/write.rs +++ b/azalea-protocol/src/write.rs @@ -69,7 +69,7 @@ async fn compression_encoder( } pub async fn write_packet( - packet: P, + packet: &P, stream: &mut W, compression_threshold: Option, cipher: &mut Option, @@ -78,7 +78,7 @@ where P: ProtocolPacket + Debug, W: AsyncWrite + Unpin + Send, { - let mut buf = packet_encoder(&packet).unwrap(); + let mut buf = packet_encoder(packet).unwrap(); if let Some(threshold) = compression_threshold { buf = compression_encoder(&buf, threshold).await.unwrap(); } diff --git a/azalea/src/bot.rs b/azalea/src/bot.rs index a77e2a1c..1570fa5e 100644 --- a/azalea/src/bot.rs +++ b/azalea/src/bot.rs @@ -36,10 +36,12 @@ impl crate::Plugin for Plugin { async fn handle(self: Arc, mut bot: Client, event: Arc) { if let Event::Tick = *event { let mut state = self.state.lock(); - if bot.jumping() { - state.jumping_once = false; - } else if state.jumping_once { - bot.set_jumping(true); + if state.jumping_once { + if bot.jumping() { + state.jumping_once = false; + } else { + bot.set_jumping(true); + } } } } diff --git a/bot/src/main.rs b/bot/src/main.rs index 0a291fd8..2f79ad26 100644 --- a/bot/src/main.rs +++ b/bot/src/main.rs @@ -25,7 +25,7 @@ async fn main() { async fn handle(bot: Client, event: Arc, _state: Arc>) -> anyhow::Result<()> { if let Event::Tick = *event { - bot.jump(); + // bot.jump(); } Ok(())