1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 14:26:04 +00:00

fix errors with rewritten packet reading

i forgot i never tested it before LMAO
This commit is contained in:
mat 2022-10-07 23:56:23 -05:00
parent e9d8d0357e
commit 6f6289376a
11 changed files with 210 additions and 131 deletions

123
Cargo.lock generated
View file

@ -271,6 +271,8 @@ dependencies = [
"byteorder",
"bytes",
"flate2",
"futures",
"futures-util",
"log",
"serde",
"serde_json",
@ -634,46 +636,89 @@ dependencies = [
]
[[package]]
name = "futures-channel"
version = "0.3.21"
name = "futures"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.21"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf"
[[package]]
name = "futures-io"
version = "0.3.21"
name = "futures-executor"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
[[package]]
name = "futures-sink"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
[[package]]
name = "futures-task"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
[[package]]
name = "futures-util"
version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68"
[[package]]
name = "futures-macro"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56"
[[package]]
name = "futures-task"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1"
[[package]]
name = "futures-util"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
@ -1494,16 +1539,36 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.6.10"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507"
checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
"tracing",
]
[[package]]
name = "tracing"
version = "0.1.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a400e31aa60b9d44a52a8ee0343b5b18566b03a8321e0d321f695cf56e940160"
dependencies = [
"cfg-if",
"pin-project-lite",
"tracing-core",
]
[[package]]
name = "tracing-core"
version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b7358be39f2f274f322d2aaed611acc57f382e8eb1e5b48cb9ae30933495ce7"
dependencies = [
"once_cell",
]
[[package]]

View file

@ -42,10 +42,10 @@ pub enum BufReadError {
}
fn read_bytes<'a>(buf: &'a mut Cursor<&[u8]>, length: usize) -> Result<&'a [u8], BufReadError> {
if length > buf.get_ref().len() {
if length > (buf.get_ref().len() - buf.position() as usize) {
return Err(BufReadError::UnexpectedEof {
attempted_read: length,
actual_read: buf.get_ref().len(),
actual_read: buf.get_ref().len() - buf.position() as usize,
});
}
let initial_position = buf.position() as usize;

View file

@ -231,7 +231,8 @@ impl Client {
/// Write a packet directly to the server.
pub async fn write_packet(&self, packet: ServerboundGamePacket) -> Result<(), std::io::Error> {
self.write_conn.lock().await.write(packet).await
self.write_conn.lock().await.write(packet).await?;
Ok(())
}
/// Disconnect from the server, ending all tasks.

View file

@ -9,7 +9,7 @@ use std::io::{BufRead, Read};
#[inline]
fn read_bytes<'a>(buf: &'a mut Cursor<&[u8]>, length: usize) -> Result<&'a [u8], Error> {
if length > buf.get_ref().len() {
if length > (buf.get_ref().len() - buf.position() as usize) {
return Err(Error::UnexpectedEof);
}
let initial_position = buf.position() as usize;
@ -95,7 +95,7 @@ impl Tag {
// integers.
11 => {
let length = stream.read_u32::<BE>()? as usize;
if length * 4 > stream.get_ref().len() {
if length * 4 > (stream.get_ref().len() - stream.position() as usize) {
return Err(Error::UnexpectedEof);
}
let mut ints = Vec::with_capacity(length as usize);
@ -108,7 +108,7 @@ impl Tag {
// integer (thus 4 bytes) and indicates the number of 8 byte longs.
12 => {
let length = stream.read_u32::<BE>()? as usize;
if length * 8 > stream.get_ref().len() {
if length * 8 > (stream.get_ref().len() - stream.position() as usize) {
return Err(Error::UnexpectedEof);
}
let mut longs = Vec::with_capacity(length as usize);

View file

@ -24,12 +24,14 @@ azalea-world = {path = "../azalea-world", version = "^0.1.0"}
byteorder = "^1.4.3"
bytes = "^1.1.0"
flate2 = "1.0.23"
futures = "0.3.24"
futures-util = "0.3.24"
log = "0.4.17"
serde = {version = "1.0.130", features = ["serde_derive"]}
serde_json = "^1.0.72"
thiserror = "^1.0.34"
tokio = {version = "^1.19.2", features = ["io-util", "net", "macros"]}
tokio-util = "^0.6.9"
tokio-util = {version = "0.7.4", features = ["codec"]}
trust-dns-resolver = "^0.20.3"
uuid = "1.1.2"

View file

@ -57,7 +57,7 @@ where
/// Write a packet to the server
pub async fn write(&mut self, packet: W) -> std::io::Result<()> {
write_packet(
packet,
&packet,
&mut self.write_stream,
self.compression_threshold,
&mut self.enc_cipher,

View file

@ -1,5 +1,9 @@
//! This lib is responsible for parsing Minecraft packets.
// these two are necessary for thiserror backtraces
#![feature(error_generic_member_access)]
#![feature(provide_any)]
use std::net::IpAddr;
use std::str::FromStr;
@ -78,12 +82,10 @@ mod tests {
}
.get();
let mut stream = Vec::new();
write_packet(packet, &mut stream, None, &mut None)
write_packet(&packet, &mut stream, None, &mut None)
.await
.unwrap();
println!("stream: {stream:?}");
let mut stream = Cursor::new(stream);
let _ = read_packet::<ServerboundLoginPacket, _>(
@ -95,4 +97,35 @@ mod tests {
.await
.unwrap();
}
#[tokio::test]
async fn test_double_hello_packet() {
let packet = ServerboundHelloPacket {
username: "test".to_string(),
public_key: Some(ProfilePublicKeyData {
expires_at: 0,
key: b"idontthinkthisreallymattersijustwantittobelongforthetest".to_vec(),
key_signature: b"idontthinkthisreallymattersijustwantittobelongforthetest".to_vec(),
}),
profile_id: Some(Uuid::from_u128(0)),
}
.get();
let mut stream = Vec::new();
write_packet(&packet, &mut stream, None, &mut None)
.await
.unwrap();
write_packet(&packet, &mut stream, None, &mut None)
.await
.unwrap();
let mut stream = Cursor::new(stream);
let mut buffer = BytesMut::new();
let _ = read_packet::<ServerboundLoginPacket, _>(&mut stream, &mut buffer, None, &mut None)
.await
.unwrap();
let _ = read_packet::<ServerboundLoginPacket, _>(&mut stream, &mut buffer, None, &mut None)
.await
.unwrap();
}
}

View file

@ -5,16 +5,15 @@ use azalea_crypto::Aes128CfbDec;
use bytes::Buf;
use bytes::BytesMut;
use flate2::read::ZlibDecoder;
use futures::StreamExt;
use log::{log_enabled, trace};
use std::io::Cursor;
use std::{
cell::Cell,
io::Read,
pin::Pin,
task::{Context, Poll},
fmt::Debug,
io::{Cursor, Read},
};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_util::codec::{BytesCodec, FramedRead};
#[derive(Error, Debug)]
pub enum ReadPacketError {
@ -28,18 +27,28 @@ pub enum ReadPacketError {
UnknownPacketId { state_name: String, id: u32 },
#[error("Couldn't read packet id")]
ReadPacketId { source: BufReadError },
#[error("Couldn't decompress packet")]
#[error(transparent)]
Decompress {
#[from]
#[backtrace]
source: DecompressionError,
},
#[error("Frame splitter error")]
#[error(transparent)]
FrameSplitter {
#[from]
#[backtrace]
source: FrameSplitterError,
},
#[error("Leftover data after reading packet {packet_name}: {data:?}")]
LeftoverData { data: Vec<u8>, packet_name: String },
#[error(transparent)]
IoError {
#[from]
#[backtrace]
source: std::io::Error,
},
#[error("Connection closed")]
ConnectionClosed,
}
#[derive(Error, Debug)]
@ -52,6 +61,7 @@ pub enum FrameSplitterError {
#[error("Io error")]
Io {
#[from]
#[backtrace]
source: std::io::Error,
},
#[error("Packet is longer than {max} bytes (is {size})")]
@ -77,9 +87,9 @@ fn parse_frame(buffer: &mut BytesMut) -> Result<BytesMut, FrameSplitterError> {
},
};
if length > buffer_copy.get_ref().len() {
if length > buffer_copy.remaining() {
return Err(FrameSplitterError::BadLength {
max: buffer_copy.get_ref().len(),
max: buffer_copy.remaining(),
size: length,
});
}
@ -88,49 +98,33 @@ fn parse_frame(buffer: &mut BytesMut) -> Result<BytesMut, FrameSplitterError> {
// from the real buffer now
// the length of the varint that says the length of the whole packet
let varint_length = buffer.len() - buffer_copy.remaining();
let _ = buffer.split_to(varint_length);
let varint_length = buffer.remaining() - buffer_copy.remaining();
buffer.advance(varint_length);
let data = buffer.split_to(length);
Ok(data)
}
async fn frame_splitter<'a, R: ?Sized + Sized>(
stream: &mut R,
buffer: &'a mut BytesMut,
) -> Result<Vec<u8>, FrameSplitterError>
where
R: AsyncRead + std::marker::Unpin + std::marker::Send,
{
fn frame_splitter<'a>(buffer: &'a mut BytesMut) -> Result<Option<Vec<u8>>, FrameSplitterError> {
// https://tokio.rs/tokio/tutorial/framing
loop {
let read_frame = parse_frame(buffer);
match read_frame {
Ok(frame) => return Ok(frame.to_vec()),
Err(err) => match err {
FrameSplitterError::BadLength { .. } | FrameSplitterError::Io { .. } => {
// we probably just haven't read enough yet
}
_ => return Err(err),
},
}
let read_buf: usize = AsyncReadExt::read_buf(stream, buffer).await?;
if 0 == read_buf {
// The remote closed the connection. For this to be
// a clean shutdown, there should be no data in the
// read buffer. If there is, this means that the
// peer closed the socket while sending a frame.
if buffer.as_ref().is_empty() {
return Err(FrameSplitterError::ConnectionClosed);
} else {
return Err(FrameSplitterError::ConnectionReset);
let read_frame = parse_frame(buffer);
match read_frame {
Ok(frame) => return Ok(Some(frame.to_vec())),
Err(err) => match err {
FrameSplitterError::BadLength { .. } | FrameSplitterError::Io { .. } => {
// we probably just haven't read enough yet
}
}
_ => return Err(err),
},
}
Ok(None)
}
fn packet_decoder<P: ProtocolPacket>(stream: &mut Cursor<&[u8]>) -> Result<P, ReadPacketError> {
fn packet_decoder<P: ProtocolPacket + Debug>(
stream: &mut Cursor<&[u8]>,
) -> Result<P, ReadPacketError> {
// Packet ID
let packet_id =
u32::var_read_from(stream).map_err(|e| ReadPacketError::ReadPacketId { source: e })?;
@ -152,6 +146,7 @@ pub enum DecompressionError {
#[error("Io error")]
Io {
#[from]
#[backtrace]
source: std::io::Error,
},
#[error("Badly compressed packet - size of {size} is below server threshold of {threshold}")]
@ -197,42 +192,7 @@ fn compression_decoder(
Ok(decoded_buf)
}
struct EncryptedStream<'a, R>
where
R: AsyncRead + std::marker::Unpin + std::marker::Send,
{
cipher: Cell<&'a mut Option<Aes128CfbDec>>,
stream: &'a mut Pin<&'a mut R>,
}
impl<R> AsyncRead for EncryptedStream<'_, R>
where
R: AsyncRead + Unpin + Send,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
// i hate this
let polled = self.as_mut().stream.as_mut().poll_read(cx, buf);
match polled {
Poll::Ready(r) => {
// if we don't check for the remaining then we decrypt big packets incorrectly
// (but only on linux and release mode for some reason LMAO)
if buf.remaining() == 0 {
if let Some(cipher) = self.as_mut().cipher.get_mut() {
azalea_crypto::decrypt_packet(cipher, buf.filled_mut());
}
}
Poll::Ready(r)
}
Poll::Pending => Poll::Pending,
}
}
}
pub async fn read_packet<'a, P: ProtocolPacket, R>(
pub async fn read_packet<'a, P: ProtocolPacket + Debug, R>(
stream: &'a mut R,
buffer: &mut BytesMut,
compression_threshold: Option<u32>,
@ -241,13 +201,29 @@ pub async fn read_packet<'a, P: ProtocolPacket, R>(
where
R: AsyncRead + std::marker::Unpin + std::marker::Send + std::marker::Sync,
{
// if we were given a cipher, decrypt the packet
let mut encrypted_stream = EncryptedStream {
cipher: Cell::new(cipher),
stream: &mut Pin::new(stream),
};
let mut framed = FramedRead::new(stream, BytesCodec::new());
let mut buf = loop {
if let Some(buf) = frame_splitter(buffer)? {
// we got a full packet!!
break buf;
} else {
// no full packet yet :( keep reading
};
let mut buf = frame_splitter(&mut encrypted_stream, buffer).await?;
// if we were given a cipher, decrypt the packet
if let Some(message) = framed.next().await {
let mut bytes = message.unwrap();
println!("bytes: {:?}", bytes.len());
if let Some(cipher) = cipher {
azalea_crypto::decrypt_packet(cipher, &mut bytes);
}
buffer.extend_from_slice(&bytes);
} else {
return Err(ReadPacketError::ConnectionClosed);
};
};
if let Some(compression_threshold) = compression_threshold {
buf = compression_decoder(&mut Cursor::new(&buf[..]), compression_threshold)?;

View file

@ -69,7 +69,7 @@ async fn compression_encoder(
}
pub async fn write_packet<P, W>(
packet: P,
packet: &P,
stream: &mut W,
compression_threshold: Option<u32>,
cipher: &mut Option<Aes128CfbEnc>,
@ -78,7 +78,7 @@ where
P: ProtocolPacket + Debug,
W: AsyncWrite + Unpin + Send,
{
let mut buf = packet_encoder(&packet).unwrap();
let mut buf = packet_encoder(packet).unwrap();
if let Some(threshold) = compression_threshold {
buf = compression_encoder(&buf, threshold).await.unwrap();
}

View file

@ -36,10 +36,12 @@ impl crate::Plugin for Plugin {
async fn handle(self: Arc<Self>, mut bot: Client, event: Arc<Event>) {
if let Event::Tick = *event {
let mut state = self.state.lock();
if bot.jumping() {
state.jumping_once = false;
} else if state.jumping_once {
bot.set_jumping(true);
if state.jumping_once {
if bot.jumping() {
state.jumping_once = false;
} else {
bot.set_jumping(true);
}
}
}
}

View file

@ -25,7 +25,7 @@ async fn main() {
async fn handle(bot: Client, event: Arc<Event>, _state: Arc<Mutex<State>>) -> anyhow::Result<()> {
if let Event::Tick = *event {
bot.jump();
// bot.jump();
}
Ok(())