1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 06:16:04 +00:00

fix memory leak in simulation tests (lol)

also, change some vecs into boxed slices, and add RelativeEntityUpdate::new
This commit is contained in:
mat 2025-02-23 08:47:17 +00:00
parent e21e1b97bf
commit dd557c8f29
13 changed files with 113 additions and 56 deletions

View file

@ -199,6 +199,11 @@ impl<T: AzaleaRead> AzaleaRead for Vec<T> {
Ok(contents)
}
}
impl<T: AzaleaRead> AzaleaRead for Box<[T]> {
default fn azalea_read(buf: &mut Cursor<&[u8]>) -> Result<Self, BufReadError> {
Vec::<T>::azalea_read(buf).map(Vec::into_boxed_slice)
}
}
impl<T: AzaleaRead> AzaleaReadLimited for Vec<T> {
fn azalea_read_limited(buf: &mut Cursor<&[u8]>, limit: usize) -> Result<Self, BufReadError> {
let length = u32::azalea_read_var(buf)? as usize;
@ -216,6 +221,11 @@ impl<T: AzaleaRead> AzaleaReadLimited for Vec<T> {
Ok(contents)
}
}
impl<T: AzaleaRead> AzaleaReadLimited for Box<[T]> {
fn azalea_read_limited(buf: &mut Cursor<&[u8]>, limit: usize) -> Result<Self, BufReadError> {
Vec::<T>::azalea_read_limited(buf, limit).map(Vec::into_boxed_slice)
}
}
impl<K: AzaleaRead + Send + Eq + Hash, V: AzaleaRead + Send> AzaleaRead for HashMap<K, V> {
fn azalea_read(buf: &mut Cursor<&[u8]>) -> Result<Self, BufReadError> {
@ -297,6 +307,11 @@ impl<T: AzaleaReadVar> AzaleaReadVar for Vec<T> {
Ok(contents)
}
}
impl<T: AzaleaReadVar> AzaleaReadVar for Box<[T]> {
fn azalea_read_var(buf: &mut Cursor<&[u8]>) -> Result<Self, BufReadError> {
Vec::<T>::azalea_read_var(buf).map(Vec::into_boxed_slice)
}
}
impl AzaleaRead for i64 {
fn azalea_read(buf: &mut Cursor<&[u8]>) -> Result<Self, BufReadError> {

View file

@ -64,6 +64,11 @@ impl<T: AzaleaWrite> AzaleaWrite for Vec<T> {
self[..].azalea_write(buf)
}
}
impl<T: AzaleaWrite> AzaleaWrite for Box<[T]> {
default fn azalea_write(&self, buf: &mut impl Write) -> Result<(), io::Error> {
self[..].azalea_write(buf)
}
}
impl<T: AzaleaWrite> AzaleaWrite for [T] {
fn azalea_write(&self, buf: &mut impl Write) -> Result<(), io::Error> {
@ -167,7 +172,7 @@ impl AzaleaWriteVar for u16 {
}
}
impl<T: AzaleaWriteVar> AzaleaWriteVar for Vec<T> {
impl<T: AzaleaWriteVar> AzaleaWriteVar for [T] {
fn azalea_write_var(&self, buf: &mut impl Write) -> Result<(), io::Error> {
u32::azalea_write_var(&(self.len() as u32), buf)?;
for i in self {
@ -176,6 +181,16 @@ impl<T: AzaleaWriteVar> AzaleaWriteVar for Vec<T> {
Ok(())
}
}
impl<T: AzaleaWriteVar> AzaleaWriteVar for Vec<T> {
fn azalea_write_var(&self, buf: &mut impl Write) -> Result<(), io::Error> {
self[..].azalea_write_var(buf)
}
}
impl<T: AzaleaWriteVar> AzaleaWriteVar for Box<[T]> {
fn azalea_write_var(&self, buf: &mut impl Write) -> Result<(), io::Error> {
self[..].azalea_write_var(buf)
}
}
impl AzaleaWrite for u8 {
fn azalea_write(&self, buf: &mut impl Write) -> Result<(), io::Error> {

View file

@ -6,6 +6,7 @@
//! [`azalea`]: https://docs.rs/azalea
#![feature(error_generic_member_access)]
#![feature(never_type)]
mod account;
mod client;

View file

@ -1,8 +1,8 @@
use std::time::{SystemTime, UNIX_EPOCH};
use azalea_protocol::packets::{
game::{s_chat::LastSeenMessagesUpdate, ServerboundChat, ServerboundChatCommand},
Packet,
game::{ServerboundChat, ServerboundChatCommand, s_chat::LastSeenMessagesUpdate},
};
use bevy_ecs::prelude::*;

View file

@ -2,15 +2,15 @@ use std::io::Cursor;
use azalea_protocol::{
packets::{
config::{ClientboundConfigPacket, ServerboundConfigPacket},
Packet,
config::{ClientboundConfigPacket, ServerboundConfigPacket},
},
read::deserialize_packet,
};
use bevy_ecs::prelude::*;
use tracing::{debug, error};
use crate::{raw_connection::RawConnection, InConfigState};
use crate::{InConfigState, raw_connection::RawConnection};
#[derive(Event, Debug, Clone)]
pub struct ReceiveConfigPacketEvent {

View file

@ -1,7 +1,7 @@
mod events;
use azalea_protocol::packets::config::*;
use azalea_protocol::packets::ConnectionProtocol;
use azalea_protocol::packets::config::*;
use bevy_ecs::prelude::*;
use bevy_ecs::system::SystemState;
pub use events::*;
@ -12,7 +12,7 @@ use crate::client::InConfigState;
use crate::disconnect::DisconnectEvent;
use crate::packet::game::KeepAliveEvent;
use crate::raw_connection::RawConnection;
use crate::{declare_packet_handlers, InstanceHolder};
use crate::{InstanceHolder, declare_packet_handlers};
pub fn process_packet_events(ecs: &mut World) {
let mut events_owned = Vec::new();

View file

@ -752,9 +752,9 @@ impl GamePacketHandler<'_> {
// we use RelativeEntityUpdate because it makes sure changes aren't made
// multiple times
commands.entity(entity).queue(RelativeEntityUpdate {
partial_world: instance_holder.partial_instance.clone(),
update: Box::new(move |entity| {
commands.entity(entity).queue(RelativeEntityUpdate::new(
instance_holder.partial_instance.clone(),
move |entity| {
let entity_id = entity.id();
entity.world_scope(|world| {
let mut commands_system_state = SystemState::<Commands>::new(world);
@ -767,8 +767,8 @@ impl GamePacketHandler<'_> {
}
commands_system_state.apply(world);
});
}),
});
},
));
});
}
@ -805,14 +805,14 @@ impl GamePacketHandler<'_> {
z: p.delta.za as f64 / 8000.,
});
commands.entity(entity).queue(RelativeEntityUpdate {
partial_world: instance_holder.partial_instance.clone(),
update: Box::new(move |entity_mut| {
commands.entity(entity).queue(RelativeEntityUpdate::new(
instance_holder.partial_instance.clone(),
move |entity_mut| {
entity_mut.world_scope(|world| {
world.send_event(KnockbackEvent { entity, knockback })
});
}),
});
},
));
},
);
}
@ -868,9 +868,9 @@ impl GamePacketHandler<'_> {
x_rot: (p.change.look_direction.x_rot as i32 * 360) as f32 / 256.,
y_rot: (p.change.look_direction.y_rot as i32 * 360) as f32 / 256.,
};
commands.entity(entity).queue(RelativeEntityUpdate {
partial_world: instance_holder.partial_instance.clone(),
update: Box::new(move |entity| {
commands.entity(entity).queue(RelativeEntityUpdate::new(
instance_holder.partial_instance.clone(),
move |entity| {
let mut position = entity.get_mut::<Position>().unwrap();
if new_pos != **position {
**position = new_pos;
@ -883,8 +883,8 @@ impl GamePacketHandler<'_> {
// old_pos is set to the current position when we're teleported
let mut physics = entity.get_mut::<Physics>().unwrap();
physics.set_old_pos(&position);
}),
});
},
));
},
);
}
@ -913,9 +913,9 @@ impl GamePacketHandler<'_> {
let new_delta = p.delta.clone();
let new_on_ground = p.on_ground;
commands.entity(entity).queue(RelativeEntityUpdate {
partial_world: instance_holder.partial_instance.clone(),
update: Box::new(move |entity_mut| {
commands.entity(entity).queue(RelativeEntityUpdate::new(
instance_holder.partial_instance.clone(),
move |entity_mut| {
let mut physics = entity_mut.get_mut::<Physics>().unwrap();
let new_pos = physics.vec_delta_codec.decode(
new_delta.xa as i64,
@ -929,8 +929,8 @@ impl GamePacketHandler<'_> {
if new_pos != **position {
**position = new_pos;
}
}),
});
},
));
},
);
}
@ -962,9 +962,9 @@ impl GamePacketHandler<'_> {
let new_on_ground = p.on_ground;
commands.entity(entity).queue(RelativeEntityUpdate {
partial_world: instance_holder.partial_instance.clone(),
update: Box::new(move |entity_mut| {
commands.entity(entity).queue(RelativeEntityUpdate::new(
instance_holder.partial_instance.clone(),
move |entity_mut| {
let mut physics = entity_mut.get_mut::<Physics>().unwrap();
let new_pos = physics.vec_delta_codec.decode(
new_delta.xa as i64,
@ -983,8 +983,8 @@ impl GamePacketHandler<'_> {
if new_look_direction != *look_direction {
*look_direction = new_look_direction;
}
}),
});
},
));
},
);
}
@ -1003,9 +1003,9 @@ impl GamePacketHandler<'_> {
};
let new_on_ground = p.on_ground;
commands.entity(entity).queue(RelativeEntityUpdate {
partial_world: instance_holder.partial_instance.clone(),
update: Box::new(move |entity_mut| {
commands.entity(entity).queue(RelativeEntityUpdate::new(
instance_holder.partial_instance.clone(),
move |entity_mut| {
let mut physics = entity_mut.get_mut::<Physics>().unwrap();
physics.set_on_ground(new_on_ground);
@ -1013,8 +1013,8 @@ impl GamePacketHandler<'_> {
if new_look_direction != *look_direction {
*look_direction = new_look_direction;
}
}),
});
},
));
} else {
warn!(
"Got move entity rot packet for unknown entity id {}",
@ -1507,9 +1507,9 @@ impl GamePacketHandler<'_> {
let new_on_ground = p.on_ground;
let new_look_direction = p.values.look_direction;
commands.entity(entity).queue(RelativeEntityUpdate {
partial_world: instance_holder.partial_instance.clone(),
update: Box::new(move |entity_mut| {
commands.entity(entity).queue(RelativeEntityUpdate::new(
instance_holder.partial_instance.clone(),
move |entity_mut| {
let is_local_entity = entity_mut.get::<LocalEntity>().is_some();
let mut physics = entity_mut.get_mut::<Physics>().unwrap();
@ -1530,8 +1530,8 @@ impl GamePacketHandler<'_> {
let mut look_direction = entity_mut.get_mut::<LookDirection>().unwrap();
*look_direction = new_look_direction;
}),
});
},
));
},
);
}

View file

@ -21,6 +21,7 @@ use bevy_app::App;
use bevy_ecs::{prelude::*, schedule::ExecutorKind};
use parking_lot::{Mutex, RwLock};
use simdnbt::owned::Nbt;
use tokio::task::JoinHandle;
use tokio::{sync::mpsc, time::sleep};
use uuid::Uuid;
@ -39,14 +40,14 @@ pub struct Simulation {
pub rt: tokio::runtime::Runtime,
pub incoming_packet_queue: Arc<Mutex<Vec<Box<[u8]>>>>,
pub outgoing_packets_receiver: mpsc::UnboundedReceiver<Box<[u8]>>,
pub clear_outgoing_packets_receiver_task: JoinHandle<!>,
}
impl Simulation {
pub fn new(initial_connection_protocol: ConnectionProtocol) -> Self {
let mut app = create_simulation_app();
let mut entity = app.world_mut().spawn_empty();
let (player, outgoing_packets_receiver, incoming_packet_queue, rt) =
let (player, clear_outgoing_packets_receiver_task, incoming_packet_queue, rt) =
create_local_player_bundle(entity.id(), initial_connection_protocol);
entity.insert(player);
@ -68,7 +69,7 @@ impl Simulation {
entity,
rt,
incoming_packet_queue,
outgoing_packets_receiver,
clear_outgoing_packets_receiver_task,
}
}
@ -105,14 +106,14 @@ fn create_local_player_bundle(
connection_protocol: ConnectionProtocol,
) -> (
LocalPlayerBundle,
mpsc::UnboundedReceiver<Box<[u8]>>,
JoinHandle<!>,
Arc<Mutex<Vec<Box<[u8]>>>>,
tokio::runtime::Runtime,
) {
// unused since we'll trigger ticks ourselves
let (run_schedule_sender, _run_schedule_receiver) = tokio::sync::mpsc::unbounded_channel();
let (run_schedule_sender, _run_schedule_receiver) = mpsc::unbounded_channel();
let (outgoing_packets_sender, outgoing_packets_receiver) = mpsc::unbounded_channel();
let (outgoing_packets_sender, mut outgoing_packets_receiver) = mpsc::unbounded_channel();
let incoming_packet_queue = Arc::new(Mutex::new(Vec::new()));
let reader = RawConnectionReader {
incoming_packet_queue: incoming_packet_queue.clone(),
@ -136,6 +137,12 @@ fn create_local_player_bundle(
}
});
let clear_outgoing_packets_receiver_task = rt.spawn(async move {
loop {
let _ = outgoing_packets_receiver.recv().await;
}
});
let raw_connection = RawConnection {
reader,
writer,
@ -160,7 +167,7 @@ fn create_local_player_bundle(
(
local_player_bundle,
outgoing_packets_receiver,
clear_outgoing_packets_receiver_task,
incoming_packet_queue,
rt,
)

View file

@ -46,6 +46,17 @@ pub struct RelativeEntityUpdate {
// a function that takes the entity and updates it
pub update: Box<dyn FnOnce(&mut EntityWorldMut) + Send + Sync>,
}
impl RelativeEntityUpdate {
pub fn new(
partial_world: Arc<RwLock<PartialInstance>>,
update: impl FnOnce(&mut EntityWorldMut) + Send + Sync + 'static,
) -> Self {
Self {
partial_world,
update: Box::new(update),
}
}
}
/// A component that counts the number of times this entity has been modified.
/// This is used for making sure two clients don't do the same relative update

View file

@ -72,7 +72,7 @@ const MAGIC: [(i32, i32, i32); 64] = [
/// A compact list of integers with the given number of bits per entry.
#[derive(Clone, Debug, Default)]
pub struct BitStorage {
pub data: Vec<u64>,
pub data: Box<[u64]>,
bits: usize,
mask: u64,
size: usize,
@ -106,7 +106,7 @@ impl BitStorage {
// 0 bit storage
if data.is_empty() {
return Ok(BitStorage {
data: Vec::new(),
data: Box::new([]),
bits,
size,
..Default::default()
@ -136,7 +136,7 @@ impl BitStorage {
};
Ok(BitStorage {
data: using_data,
data: using_data.into(),
bits,
mask,
size,

View file

@ -31,7 +31,7 @@ pub struct PartialChunkStorage {
chunk_radius: u32,
view_range: u32,
// chunks is a list of size chunk_radius * chunk_radius
chunks: Vec<Option<Arc<RwLock<Chunk>>>>,
chunks: Box<[Option<Arc<RwLock<Chunk>>>]>,
}
/// A storage for chunks where they're only stored weakly, so if they're not
@ -50,7 +50,7 @@ pub struct ChunkStorage {
/// coordinate.
#[derive(Debug)]
pub struct Chunk {
pub sections: Vec<Section>,
pub sections: Box<[Section]>,
/// Heightmaps are used for identifying the surface blocks in a chunk.
/// Usually for clients only `WorldSurface` and `MotionBlocking` are
/// present.
@ -84,7 +84,7 @@ impl Default for Section {
impl Default for Chunk {
fn default() -> Self {
Chunk {
sections: vec![Section::default(); (384 / 16) as usize],
sections: vec![Section::default(); (384 / 16) as usize].into(),
heightmaps: HashMap::new(),
}
}
@ -97,7 +97,7 @@ impl PartialChunkStorage {
view_center: ChunkPos::new(0, 0),
chunk_radius,
view_range,
chunks: vec![None; (view_range * view_range) as usize],
chunks: vec![None; (view_range * view_range) as usize].into(),
}
}
@ -341,6 +341,7 @@ impl Chunk {
let section = Section::azalea_read(buf)?;
sections.push(section);
}
let sections = sections.into_boxed_slice();
let mut heightmaps = HashMap::new();
for (name, heightmap) in heightmaps_nbt.iter() {

View file

@ -49,7 +49,7 @@ impl PalettedContainer {
let palette_type = PaletteKind::from_bits_and_type(server_bits_per_entry, container_type);
let palette = palette_type.read(buf)?;
let size = container_type.size();
let data = Vec::<u64>::azalea_read(buf)?;
let data = Box::<[u64]>::azalea_read(buf)?;
// we can only trust the bits per entry that we're sent if there's enough data
// that it'd be global. if it's not global, then we have to calculate it

View file

@ -172,6 +172,9 @@ pub fn register(commands: &mut CommandDispatcher<Mutex<CommandSource>>) {
let source = ctx.source.lock();
source.reply("Ok!");
source.bot.disconnect();
let ecs = source.bot.ecs.clone();
@ -181,6 +184,8 @@ pub fn register(commands: &mut CommandDispatcher<Mutex<CommandSource>>) {
let ecs = ecs.lock();
let report_path = env::temp_dir().join("azalea-ecs-leak-report.txt");
let mut report = File::create(&report_path).unwrap();
@ -203,6 +208,7 @@ pub fn register(commands: &mut CommandDispatcher<Mutex<CommandSource>>) {
writeln!(report).unwrap();
for (info, _) in ecs.iter_resources() {
let name = info.name();
writeln!(report, "Resource: {name}").unwrap();
@ -212,6 +218,7 @@ pub fn register(commands: &mut CommandDispatcher<Mutex<CommandSource>>) {
match name {
"azalea_world::container::InstanceContainer" => {
let instance_container = ecs.resource::<InstanceContainer>();
for (instance_name, instance) in &instance_container.instances {
writeln!(report, "- Name: {}", instance_name).unwrap();
writeln!(report, "- Reference count: {}", instance.strong_count())