diff --git a/Cargo.lock b/Cargo.lock index 00b49c19..5fc44844 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -284,6 +284,7 @@ dependencies = [ "bevy_app", "bevy_ecs", "derive_more", + "env_logger", "futures", "iyes_loopless", "log", diff --git a/azalea-client/Cargo.toml b/azalea-client/Cargo.toml index 41074c63..63cd4875 100644 --- a/azalea-client/Cargo.toml +++ b/azalea-client/Cargo.toml @@ -11,26 +11,29 @@ version = "0.5.0" [dependencies] anyhow = "1.0.59" async-trait = "0.1.58" -azalea-auth = { path = "../azalea-auth", version = "0.5.0" } -azalea-block = { path = "../azalea-block", version = "0.5.0" } -azalea-chat = { path = "../azalea-chat", version = "0.5.0" } -azalea-core = { path = "../azalea-core", version = "0.5.0" } -azalea-crypto = { path = "../azalea-crypto", version = "0.5.0" } -azalea-physics = { path = "../azalea-physics", version = "0.5.0" } -azalea-protocol = { path = "../azalea-protocol", version = "0.5.0" } -azalea-registry = { path = "../azalea-registry", version = "0.5.0" } -azalea-world = { path = "../azalea-world", version = "0.5.0" } -bevy_app = { version = "0.9.1", default-features = false } -bevy_ecs = { version = "0.9.1", default-features = false } +azalea-auth = {path = "../azalea-auth", version = "0.5.0"} +azalea-block = {path = "../azalea-block", version = "0.5.0"} +azalea-chat = {path = "../azalea-chat", version = "0.5.0"} +azalea-core = {path = "../azalea-core", version = "0.5.0"} +azalea-crypto = {path = "../azalea-crypto", version = "0.5.0"} +azalea-physics = {path = "../azalea-physics", version = "0.5.0"} +azalea-protocol = {path = "../azalea-protocol", version = "0.5.0"} +azalea-registry = {path = "../azalea-registry", version = "0.5.0"} +azalea-world = {path = "../azalea-world", version = "0.5.0"} +bevy_app = {version = "0.9.1", default-features = false} +bevy_ecs = {version = "0.9.1", default-features = false} +derive_more = {version = "0.99.17", features = ["deref", "deref_mut"]} futures = "0.3.25" iyes_loopless = "0.9.1" log = "0.4.17" nohash-hasher = "0.2.0" once_cell = "1.16.0" -parking_lot = { version = "^0.12.1", features = ["deadlock_detection"] } +parking_lot = {version = "^0.12.1", features = ["deadlock_detection"]} regex = "1.7.0" thiserror = "^1.0.34" -tokio = { version = "^1.21.2", features = ["sync"] } +tokio = {version = "^1.21.2", features = ["sync"]} typemap_rev = "0.2.0" uuid = "^1.1.2" -derive_more = { version = "0.99.17", features = ["deref", "deref_mut"] } + +[dev-dependencies] +env_logger = "0.9.1" diff --git a/azalea-client/examples/echo.rs b/azalea-client/examples/echo.rs index 5e7b751d..8b5f97ef 100644 --- a/azalea-client/examples/echo.rs +++ b/azalea-client/examples/echo.rs @@ -4,6 +4,7 @@ use azalea_client::{Account, Client, Event}; #[tokio::main] async fn main() { + env_logger::init(); // deadlock detection, you can safely delete this block if you're not trying to // debug deadlocks in azalea { diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs index 742f979b..ae36829b 100644 --- a/azalea-client/src/client.rs +++ b/azalea-client/src/client.rs @@ -32,7 +32,7 @@ use azalea_protocol::{ resolver, ServerAddress, }; use azalea_world::{ - entity::Entity, EntityInfos, EntityPlugin, PartialWorld, World, WorldContainer, + entity::Entity, EntityInfos, EntityPlugin, Local, PartialWorld, World, WorldContainer, }; use bevy_app::App; use bevy_ecs::{ @@ -176,7 +176,7 @@ impl Client { let resolved_address = resolver::resolve_address(&address).await?; // An event that causes the schedule to run. This is only used internally. - let (run_schedule_sender, run_schedule_receiver) = mpsc::unbounded_channel(); + let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1); let ecs_lock = start_ecs(run_schedule_receiver, run_schedule_sender.clone()); { @@ -201,7 +201,7 @@ impl Client { account: &Account, address: &ServerAddress, resolved_address: &SocketAddr, - run_schedule_sender: mpsc::UnboundedSender<()>, + run_schedule_sender: mpsc::Sender<()>, ) -> Result<(Self, mpsc::UnboundedReceiver), JoinError> { let conn = Connection::new(resolved_address).await?; let (conn, game_profile) = Self::handshake(conn, account, address).await?; @@ -231,7 +231,6 @@ impl Client { // default to an empty world, it'll be set correctly later when we // get the login packet Arc::new(RwLock::new(World::default())), - ecs.resource_mut::().deref_mut(), tx, ); @@ -250,8 +249,12 @@ impl Client { local_player.tasks.push(read_packets_task); local_player.tasks.push(write_packets_task); - ecs.entity_mut(entity) - .insert((local_player, packet_receiver, PhysicsState::default())); + ecs.entity_mut(entity).insert(( + local_player, + packet_receiver, + PhysicsState::default(), + Local, + )); // just start up the game loop and we're ready! @@ -485,8 +488,8 @@ impl Client { #[doc(hidden)] pub fn start_ecs( - run_schedule_receiver: mpsc::UnboundedReceiver<()>, - run_schedule_sender: mpsc::UnboundedSender<()>, + run_schedule_receiver: mpsc::Receiver<()>, + run_schedule_sender: mpsc::Sender<()>, ) -> Arc> { // if you get an error right here that means you're doing something with locks // wrong read the error to see where the issue is @@ -534,7 +537,7 @@ pub fn start_ecs( async fn run_schedule_loop( ecs: Arc>, mut schedule: Schedule, - mut run_schedule_receiver: mpsc::UnboundedReceiver<()>, + mut run_schedule_receiver: mpsc::Receiver<()>, ) { loop { // whenever we get an event from run_schedule_receiver, run the schedule @@ -545,7 +548,7 @@ async fn run_schedule_loop( /// Send an event to run the schedule every 50 milliseconds. It will stop when /// the receiver is dropped. -pub async fn tick_run_schedule_loop(run_schedule_sender: mpsc::UnboundedSender<()>) { +pub async fn tick_run_schedule_loop(run_schedule_sender: mpsc::Sender<()>) { let mut game_tick_interval = time::interval(time::Duration::from_millis(50)); // TODO: Minecraft bursts up to 10 ticks and then skips, we should too game_tick_interval.set_missed_tick_behavior(time::MissedTickBehavior::Burst); @@ -554,7 +557,7 @@ pub async fn tick_run_schedule_loop(run_schedule_sender: mpsc::UnboundedSender<( loop { game_tick_interval.tick().await; - if let Err(e) = run_schedule_sender.send(()) { + if let Err(e) = run_schedule_sender.send(()).await { println!("tick_run_schedule_loop error: {}", e); // the sender is closed so end the task return; diff --git a/azalea-client/src/local_player.rs b/azalea-client/src/local_player.rs index cc56ae55..b9f06dc0 100644 --- a/azalea-client/src/local_player.rs +++ b/azalea-client/src/local_player.rs @@ -8,6 +8,7 @@ use azalea_world::{ EntityInfos, PartialWorld, World, }; use bevy_ecs::{component::Component, query::Added, system::Query}; +use log::warn; use parking_lot::RwLock; use thiserror::Error; use tokio::{sync::mpsc, task::JoinHandle}; @@ -73,7 +74,6 @@ impl LocalPlayer { profile: GameProfile, packet_writer: mpsc::UnboundedSender, world: Arc>, - entity_infos: &mut EntityInfos, tx: mpsc::UnboundedSender, ) -> Self { let client_information = ClientInformation::default(); @@ -90,7 +90,6 @@ impl LocalPlayer { partial_world: Arc::new(RwLock::new(PartialWorld::new( client_information.view_distance.into(), Some(entity), - entity_infos, ))), world_name: None, @@ -129,6 +128,7 @@ pub fn update_in_loaded_chunk( mut commands: bevy_ecs::system::Commands, query: Query<(Entity, &LocalPlayer, &entity::Position)>, ) { + println!("update_in_loaded_chunk"); for (entity, local_player, position) in &query { let player_chunk_pos = ChunkPos::from(position); let in_loaded_chunk = local_player diff --git a/azalea-client/src/packet_handling.rs b/azalea-client/src/packet_handling.rs index 09bc0777..5681cfa6 100644 --- a/azalea-client/src/packet_handling.rs +++ b/azalea-client/src/packet_handling.rs @@ -1,4 +1,4 @@ -use std::{io::Cursor, sync::Arc}; +use std::{collections::HashSet, io::Cursor, sync::Arc}; use azalea_core::{ChunkPos, ResourceLocation, Vec3}; use azalea_protocol::{ @@ -17,7 +17,7 @@ use azalea_world::{ set_rotation, Dead, EntityBundle, EntityKind, LastSentPosition, MinecraftEntityId, Physics, PlayerBundle, Position, }, - EntityInfos, PartialWorld, WorldContainer, + EntityInfos, LoadedBy, PartialWorld, RelativeEntityUpdate, WorldContainer, }; use bevy_app::{App, Plugin}; use bevy_ecs::{ @@ -62,7 +62,7 @@ pub struct UpdatePlayerEvent(PlayerInfo); #[derive(Component, Clone)] pub struct PacketReceiver { pub packets: Arc>>, - pub run_schedule_sender: mpsc::UnboundedSender<()>, + pub run_schedule_sender: mpsc::Sender<()>, } fn handle_packets(ecs: &mut bevy_ecs::world::World) { @@ -167,7 +167,6 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) { // player entity // in a shared world Some(player_entity), - &mut entity_infos, ); local_player.world = weak_world; @@ -403,24 +402,26 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) { partial_world.chunks.view_center = ChunkPos::new(p.x, p.z); } ClientboundGamePacket::LevelChunkWithLight(p) => { - // debug!("Got chunk with light packet {} {}", p.x, p.z); + debug!("Got chunk with light packet {} {}", p.x, p.z); let pos = ChunkPos::new(p.x, p.z); let mut system_state: SystemState> = SystemState::new(ecs); let mut query = system_state.get_mut(ecs); - let mut local_player = query.get_mut(player_entity).unwrap(); - - let world = local_player.world.read(); - let partial_world = local_player.partial_world.read(); + let local_player = query.get_mut(player_entity).unwrap(); // OPTIMIZATION: if we already know about the chunk from the // shared world (and not ourselves), then we don't need to // parse it again. This is only used when we have a shared // world, since we check that the chunk isn't currently owned // by this client. - let shared_has_chunk = world.chunks.get(&pos).is_some(); - let this_client_has_chunk = partial_world.chunks.limited_get(&pos).is_some(); + let shared_has_chunk = local_player.world.read().chunks.get(&pos).is_some(); + let this_client_has_chunk = local_player + .partial_world + .read() + .chunks + .limited_get(&pos) + .is_some(); if shared_has_chunk && !this_client_has_chunk { trace!( "Skipping parsing chunk {:?} because we already know about it", @@ -431,8 +432,8 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) { // ok we're sure we're going to mutate the world, so get exclusive write // access - let mut partial_world = local_player.partial_world.write(); let mut world = local_player.world.write(); + let mut partial_world = local_player.partial_world.write(); if let Err(e) = partial_world.chunks.replace_with_packet_data( &pos, @@ -455,31 +456,37 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) { if let Some(world_name) = &local_player.world_name { let bundle = p.as_entity_bundle(world_name.clone()); - let mut entity_commands = commands.spawn((MinecraftEntityId(p.id), bundle)); + let mut entity_commands = commands.spawn(( + MinecraftEntityId(p.id), + LoadedBy(HashSet::from([player_entity])), + bundle, + )); // the bundle doesn't include the default entity metadata so we add that // separately p.apply_metadata(&mut entity_commands); } else { warn!("got add player packet but we haven't gotten a login packet yet"); } + + system_state.apply(ecs); } ClientboundGamePacket::SetEntityData(p) => { debug!("Got set entity data packet {:?}", p); let mut system_state: SystemState<( Commands, - Query<(&mut LocalPlayer, &EntityKind)>, + Query<&mut LocalPlayer>, + Query<&EntityKind>, )> = SystemState::new(ecs); - let (mut commands, mut query) = system_state.get_mut(ecs); - let (mut local_player, entity_kind) = query.get_mut(player_entity).unwrap(); + let (mut commands, mut query, entity_kind_query) = system_state.get_mut(ecs); + let local_player = query.get_mut(player_entity).unwrap(); - let partial_world = local_player.partial_world.write(); - let entity = partial_world - .entity_infos - .get_by_id(MinecraftEntityId(p.id)); - drop(partial_world); + let world = local_player.world.read(); + let entity = world.entity_by_id(&MinecraftEntityId(p.id)); + drop(world); if let Some(entity) = entity { + let entity_kind = entity_kind_query.get(entity).unwrap(); let mut entity_commands = commands.entity(entity); if let Err(e) = apply_metadata( &mut entity_commands, @@ -491,6 +498,8 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) { } else { warn!("Server sent an entity data packet for an entity id ({}) that we don't know about", p.id); } + + system_state.apply(ecs); } ClientboundGamePacket::UpdateAttributes(_p) => { // debug!("Got update attributes packet {:?}", p); @@ -513,10 +522,17 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) { if let Some(world_name) = &local_player.world_name { let bundle = p.as_player_bundle(world_name.clone()); - commands.spawn((MinecraftEntityId(p.id), bundle)); + let spawned = commands.spawn(( + MinecraftEntityId(p.id), + LoadedBy(HashSet::from([player_entity])), + bundle, + )); + println!("spawned player entity: {:?}", spawned.id()); } else { warn!("got add player packet but we haven't gotten a login packet yet"); } + + system_state.apply(ecs); } ClientboundGamePacket::InitializeBorder(p) => { debug!("Got initialize border packet {:?}", p); @@ -547,18 +563,25 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) { debug!("Got set experience packet {:?}", p); } ClientboundGamePacket::TeleportEntity(p) => { - let mut system_state: SystemState> = + let mut system_state: SystemState<(Commands, Query<&mut LocalPlayer>)> = SystemState::new(ecs); - let mut query = system_state.get_mut(ecs); - let (mut local_player, mut position) = query.get_mut(player_entity).unwrap(); + let (mut commands, mut query) = system_state.get_mut(ecs); + let local_player = query.get_mut(player_entity).unwrap(); - let partial_world = local_player.partial_world.read(); - let partial_entity_infos = &partial_world.entity_infos; - let entity = partial_entity_infos.get_by_id(MinecraftEntityId(p.id)); - drop(partial_world); + let world = local_player.world.read(); + let entity = world.entity_by_id(&MinecraftEntityId(p.id)); + drop(world); if let Some(entity) = entity { - **position = p.position; + let mut new_position = p.position.clone(); + commands.add(RelativeEntityUpdate { + entity, + partial_world: local_player.partial_world.clone(), + update: Box::new(move |entity| { + let mut position = entity.get_mut::().unwrap(); + **position = new_position + }), + }); } else { warn!("Got teleport entity packet for unknown entity id {}", p.id); } @@ -570,45 +593,64 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) { // debug!("Got rotate head packet {:?}", p); } ClientboundGamePacket::MoveEntityPos(p) => { - let mut system_state: SystemState> = + let mut system_state: SystemState<(Commands, Query<&mut LocalPlayer>)> = SystemState::new(ecs); - let mut query = system_state.get_mut(ecs); - let (mut local_player, mut position) = query.get_mut(player_entity).unwrap(); + let (mut commands, mut query) = system_state.get_mut(ecs); + let local_player = query.get_mut(player_entity).unwrap(); - let partial_world = local_player.partial_world.read(); - let partial_entity_infos = &partial_world.entity_infos; - let entity = partial_entity_infos.get_by_id(MinecraftEntityId(p.entity_id)); - drop(partial_world); + let world = local_player.world.read(); + let entity = world.entity_by_id(&MinecraftEntityId(p.entity_id)); + drop(world); if let Some(entity) = entity { - **position = position.with_delta(&p.delta); + let delta = p.delta.clone(); + commands.add(RelativeEntityUpdate { + entity, + partial_world: local_player.partial_world.clone(), + update: Box::new(move |entity| { + let mut position = entity.get_mut::().unwrap(); + **position = position.with_delta(&delta) + }), + }); } else { warn!( "Got move entity pos packet for unknown entity id {}", p.entity_id ); } + + system_state.apply(ecs); } ClientboundGamePacket::MoveEntityPosRot(p) => { - let mut system_state: SystemState> = + let mut system_state: SystemState<(Commands, Query<&mut LocalPlayer>)> = SystemState::new(ecs); - let mut query = system_state.get_mut(ecs); - let (mut local_player, mut position) = query.get_mut(player_entity).unwrap(); + let (mut commands, mut query) = system_state.get_mut(ecs); + let local_player = query.get_mut(player_entity).unwrap(); - let partial_world = local_player.partial_world.read(); - let partial_entity_infos = &partial_world.entity_infos; - let entity = partial_entity_infos.get_by_id(MinecraftEntityId(p.entity_id)); - drop(partial_world); + let world = local_player.world.read(); + let entity = world.entity_by_id(&MinecraftEntityId(p.entity_id)); + drop(world); if let Some(entity) = entity { - **position = position.with_delta(&p.delta); + let delta = p.delta.clone(); + commands.add(RelativeEntityUpdate { + entity, + partial_world: local_player.partial_world.clone(), + update: Box::new(move |entity| { + let mut position = entity.get_mut::().unwrap(); + **position = position.with_delta(&delta) + }), + }); } else { warn!( "Got move entity pos rot packet for unknown entity id {}", p.entity_id ); } + + system_state.apply(ecs); } + ClientboundGamePacket::MoveEntityRot(_p) => { // debug!("Got move entity rot packet {:?}", p); } @@ -618,7 +660,6 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) { let mut system_state: SystemState> = SystemState::new(ecs); let mut query = system_state.get_mut(ecs); - let mut local_player = query.get_mut(player_entity).unwrap(); let mut local_player = query.get_mut(player_entity).unwrap(); local_player.write_packet(ServerboundKeepAlivePacket { id: p.id }.get()); @@ -747,6 +788,8 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) { .send(Event::Death(Some(Arc::new(p.clone())))) .unwrap(); } + + system_state.apply(ecs); } ClientboundGamePacket::PlayerLookAt(_) => {} ClientboundGamePacket::RemoveMobEffect(_) => {} @@ -759,6 +802,8 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) { // Remove the Dead marker component from the player. commands.entity(player_entity).remove::(); + + system_state.apply(ecs); } ClientboundGamePacket::SelectAdvancementsTab(_) => {} ClientboundGamePacket::SetActionBarText(_) => {} @@ -804,7 +849,7 @@ impl PacketReceiver { while let Ok(packet) = read_conn.read().await { self.packets.lock().push(packet); // tell the client to run all the systems - self.run_schedule_sender.send(()).unwrap(); + self.run_schedule_sender.send(()).await.unwrap(); } } diff --git a/azalea-world/Cargo.toml b/azalea-world/Cargo.toml index 1e44ad04..d2d42940 100644 --- a/azalea-world/Cargo.toml +++ b/azalea-world/Cargo.toml @@ -15,7 +15,7 @@ azalea-chat = {path = "../azalea-chat", version = "^0.5.0"} azalea-core = {path = "../azalea-core", version = "^0.5.0", features = ["bevy_ecs"]} azalea-nbt = {path = "../azalea-nbt", version = "^0.5.0"} azalea-registry = {path = "../azalea-registry", version = "^0.5.0"} -bevy_app = { version = "0.9.1", default-features = false } +bevy_app = {version = "0.9.1", default-features = false} bevy_ecs = {version = "0.9.1", default-features = false} derive_more = {version = "0.99.17", features = ["deref", "deref_mut"]} enum-as-inner = "0.5.1" diff --git a/azalea-world/src/container.rs b/azalea-world/src/container.rs index 966e2c3b..5c9fa95d 100644 --- a/azalea-world/src/container.rs +++ b/azalea-world/src/container.rs @@ -1,6 +1,7 @@ use azalea_core::ResourceLocation; use bevy_ecs::system::Resource; use log::error; +use nohash_hasher::IntMap; use parking_lot::RwLock; use std::{ collections::HashMap, @@ -68,6 +69,7 @@ impl WorldContainer { let world = Arc::new(RwLock::new(World { chunks: ChunkStorage::new(height, min_y), entities_by_chunk: HashMap::new(), + entity_by_id: IntMap::default(), })); self.worlds.insert(name, Arc::downgrade(&world)); world diff --git a/azalea-world/src/entity_info.rs b/azalea-world/src/entity_info.rs index 91a4d553..487dab29 100644 --- a/azalea-world/src/entity_info.rs +++ b/azalea-world/src/entity_info.rs @@ -1,17 +1,31 @@ use crate::{ - entity::{self, add_dead, update_bounding_box, Entity, MinecraftEntityId}, - MaybeRemovedEntity, World, WorldContainer, + deduplicate_entities, + entity::{ + self, add_dead, update_bounding_box, Entity, EntityUuid, MinecraftEntityId, Position, + WorldName, + }, + update_entity_by_id_index, update_uuid_index, PartialWorld, World, WorldContainer, }; use azalea_core::ChunkPos; use bevy_app::{App, Plugin}; use bevy_ecs::{ - query::Changed, - schedule::SystemSet, - system::{Query, Res, ResMut, Resource}, + prelude::Component, + query::{Added, Changed, With, Without}, + schedule::{IntoSystemDescriptor, SystemSet}, + system::{Command, Commands, Query, Res, ResMut, Resource}, + world::EntityMut, }; -use log::warn; +use derive_more::{Deref, DerefMut}; +use iyes_loopless::prelude::*; +use log::{debug, trace, warn}; use nohash_hasher::{IntMap, IntSet}; -use std::{collections::HashMap, fmt::Debug, ops::DerefMut}; +use parking_lot::RwLock; +use std::{ + collections::{HashMap, HashSet}, + fmt::Debug, + ops::DerefMut, + sync::Arc, +}; use uuid::Uuid; /// Plugin handling some basic entity functionality. @@ -25,11 +39,27 @@ impl Plugin for EntityPlugin { .with_system(update_entity_chunk_positions) .with_system(remove_despawned_entities_from_indexes) .with_system(update_bounding_box) - .with_system(add_dead), + .with_system(add_dead) + .with_system(add_updates_received.label("add_updates_received")) + .with_system( + deduplicate_entities + .after("add_reference_count") + .label("deduplicate_entities"), + ) + .with_system(update_uuid_index.after("deduplicate_entities")) + .with_system(debug_detect_updates_received_on_local_entities) + .with_system(update_entity_by_id_index) + .with_system(debug_new_entity), ); } } +fn debug_new_entity(query: Query>) { + for entity in query.iter() { + debug!("new entity: {:?}", entity); + } +} + // How entity updates are processed (to avoid issues with shared worlds) // - each bot contains a map of { entity id: updates received } // - the shared world also contains a canonical "true" updates received for each @@ -69,70 +99,69 @@ pub struct PartialEntityInfos { /// This is used for shared worlds (i.e. swarms), to make sure we don't /// update entities twice on accident. pub updates_received: IntMap, - /// A set of all the entity ids in render distance. - pub(crate) loaded_entity_ids: IntSet, - - /// A map of Minecraft entity ids to Azalea ECS entities. - pub(crate) entity_by_id: IntMap, } impl PartialEntityInfos { - pub fn new(owner_entity: Option, entity_infos: &mut EntityInfos) -> Self { - if let Some(owner_entity) = owner_entity { - entity_infos.updates_received.insert(owner_entity, 0); - } + pub fn new(owner_entity: Option) -> Self { Self { owner_entity, updates_received: IntMap::default(), - loaded_entity_ids: IntSet::default(), - entity_by_id: IntMap::default(), } } +} - /// Whether the entity with the given protocol ID is being loaded by this - /// storage. - #[inline] - pub fn contains_id(&self, id: MinecraftEntityId) -> bool { - self.loaded_entity_ids.contains(&id) - } +/// A [`Command`] that applies a "relative update" to an entity, which means +/// this update won't be run multiple times by different clients in the same +/// world. +/// +/// This is used to avoid a bug where when there's multiple clients in the same +/// world and an entity sends a relative move packet to all clients, its +/// position gets desynced since the relative move is applied multiple times. +/// +/// Don't use this unless you actually got an entity update packet that all +/// other clients within render distance will get too. You usually don't need +/// this when the change isn't relative either. +pub struct RelativeEntityUpdate { + pub entity: Entity, + pub partial_world: Arc>, + // a function that takes the entity and updates it + pub update: Box, +} +impl Command for RelativeEntityUpdate { + fn write(self, world: &mut bevy_ecs::world::World) { + let partial_entity_infos = &mut self.partial_world.write().entity_infos; - /// Get an [`Entity`] from the given [`MinecraftEntityId`] (which is just a - /// u32 internally) if the entity is being loaded by this storage. - #[inline] - pub fn get_by_id(&self, id: MinecraftEntityId) -> Option { - self.entity_by_id.get(&id).copied() - } + let mut entity = world.entity_mut(self.entity); - /// Returns whether we're allowed to update this entity (to prevent two - /// clients in a shared world updating it twice), and acknowleges that - /// we WILL update it if it's true. Don't call this unless you actually - /// got an entity update that all other clients within render distance - /// will get too. - pub fn maybe_update( - &mut self, - entity: Entity, - id: &MinecraftEntityId, - entity_infos: &mut EntityInfos, - ) -> bool { - if Some(entity) == self.owner_entity { - // the owner of the entity is always allowed to update it - return true; + if Some(self.entity) == partial_entity_infos.owner_entity { + // if the entity owns this partial world, it's always allowed to update itself + (self.update)(&mut entity); + return; }; - let this_client_updates_received = self.updates_received.get(id).copied(); + let entity_id = *entity.get::().unwrap(); - let shared_updates_received = entity_infos.updates_received.get(&entity).copied(); + let Some(updates_received) = entity.get_mut::() else { + // a client tried to update another client, which isn't allowed + return; + }; - let can_update = this_client_updates_received == shared_updates_received; + let this_client_updates_received = partial_entity_infos + .updates_received + .get(&entity_id) + .copied(); + + let can_update = this_client_updates_received == Some(**updates_received); if can_update { let new_updates_received = this_client_updates_received.unwrap_or(0) + 1; - self.updates_received.insert(*id, new_updates_received); - entity_infos + partial_entity_infos .updates_received - .insert(entity, new_updates_received); - true - } else { - false + .insert(entity_id, new_updates_received); + + **entity.get_mut::().unwrap() = new_updates_received; + + let mut entity = world.entity_mut(self.entity); + (self.update)(&mut entity); } } } @@ -142,92 +171,16 @@ impl PartialEntityInfos { /// Things that are shared between all the partial worlds. #[derive(Resource, Default)] pub struct EntityInfos { - // in WeakEntityInfos, we have to use [`Entity`] since there *is* a chance of collision if - // we'd have used Minecraft entity IDs - /// The number of `PartialWorld`s that have this entity loaded. - /// (this is reference counting) - pub(crate) entity_reference_count: HashMap, /// An index of entities by their UUIDs pub(crate) entity_by_uuid: HashMap, - - /// The canonical number of updates we've gotten for every entity. - pub updates_received: HashMap, } impl EntityInfos { pub fn new() -> Self { Self { - entity_reference_count: HashMap::default(), entity_by_uuid: HashMap::default(), - updates_received: HashMap::default(), } } - - /// Call this if a [`PartialEntityStorage`] just removed an entity. - /// - /// It'll decrease the reference count and remove the entity from the - /// storage if there's no more references to it. - /// - /// Returns whether the entity was removed. - pub fn remove_entity_if_unused( - &mut self, - entity: Entity, - uuid: Uuid, - chunk: ChunkPos, - world: &mut World, - ) -> bool { - if let Some(count) = self.entity_reference_count.get_mut(&entity) { - *count -= 1; - if *count == 0 { - self.entity_reference_count.remove(&entity); - return true; - } - } else { - warn!("Tried to remove entity but it was not found."); - return false; - } - if let Some(entities_in_chunk) = world.entities_by_chunk.get_mut(&chunk) { - if entities_in_chunk.remove(&entity) { - // remove the chunk if there's no entities in it anymore - if entities_in_chunk.is_empty() { - world.entities_by_chunk.remove(&chunk); - } - } else { - warn!("Tried to remove entity from chunk {chunk:?} but the entity was not there."); - } - } else { - warn!("Tried to remove entity from chunk {chunk:?} but the chunk was not found."); - } - if self.entity_by_uuid.remove(&uuid).is_none() { - warn!("Tried to remove entity from uuid {uuid:?} but it was not found."); - } - if self.updates_received.remove(&entity).is_none() { - // if this happens it means we weren't tracking the updates_received for the - // client (bad) - warn!("Tried to remove entity from updates_received but it was not found."); - } - true - } - - /// Whether the entity is in the shared storage. To check if a Minecraft - /// entity ID is in the storage, you'll have to use - /// [`PartialEntityInfo::limited_contains_id`]. - #[inline] - pub fn contains_entity(&self, id: Entity) -> bool { - self.entity_reference_count.contains_key(&id) - } - - /// Get an [`Entity`] by its UUID. - /// - /// If you want to get an entity by its protocol ID, use - /// [`PartialEntityInfos::entity_by_id`]. - /// - /// Also note that if you're using a shared world (i.e. a client swarm), - /// this function might return the wrong entity if there's multiple - /// entities with the same uuid in different worlds. - pub fn entity_by_uuid(&self, uuid: &Uuid) -> Option<&Entity> { - self.entity_by_uuid.get(uuid) - } } /// Update the chunk position indexes in [`EntityInfos`]. @@ -263,28 +216,89 @@ fn update_entity_chunk_positions( } } } +/// A component that lists all the local player entities that have this entity +/// loaded. If this is empty, the entity will be removed from the ECS. +#[derive(Component, Clone, Deref, DerefMut)] +pub struct LoadedBy(pub HashSet); -pub fn remove_despawned_entities_from_indexes( - mut entity_infos: ResMut, - world_container: Res, +/// A component that counts the number of times this entity has been modified. +/// This is used for making sure two clients don't do the same relative update +/// on an entity. +/// +/// If an entity is local (i.e. it's a client/localplayer), this component +/// should NOT be present in the entity. +#[derive(Component, Debug, Deref, DerefMut)] +pub struct UpdatesReceived(u32); + +pub fn add_updates_received( + mut commands: Commands, query: Query< + Entity, ( - Entity, - &entity::EntityUuid, - &entity::Position, - &entity::WorldName, + Changed, + (Without, Without), ), - &MaybeRemovedEntity, >, ) { - for (entity, uuid, position, world_name) in &query { - let world = world_container.get(world_name).unwrap(); - entity_infos.remove_entity_if_unused( - entity, - **uuid, - (*position).into(), - world.write().deref_mut(), - ); + for entity in query.iter() { + // entities always start with 1 update received + commands.entity(entity).insert(UpdatesReceived(1)); + } +} + +/// A marker component that signifies that this entity is "local" and shouldn't +/// be updated by other clients. +#[derive(Component)] +pub struct Local; + +/// The [`UpdatesReceived`] component should never be on [`Local`] entities. +/// This warns if an entity has both components. +fn debug_detect_updates_received_on_local_entities( + query: Query, With)>, +) { + for entity in &query { + warn!("Entity {:?} has both Local and UpdatesReceived", entity); + } +} + +/// Despawn entities that aren't being loaded by anything. +fn remove_despawned_entities_from_indexes( + mut commands: Commands, + mut entity_infos: ResMut, + world_container: Res, + query: Query<(Entity, &EntityUuid, &Position, &WorldName, &LoadedBy), Changed>, +) { + for (entity, uuid, position, world_name, loaded_by) in &query { + let world_lock = world_container.get(world_name).unwrap(); + let mut world = world_lock.write(); + + // if the entity has no references left, despawn it + if !loaded_by.is_empty() { + continue; + } + + // remove the entity from the chunk index + let chunk = ChunkPos::from(*position); + if let Some(entities_in_chunk) = world.entities_by_chunk.get_mut(&chunk) { + if entities_in_chunk.remove(&entity) { + // remove the chunk if there's no entities in it anymore + if entities_in_chunk.is_empty() { + world.entities_by_chunk.remove(&chunk); + } + } else { + warn!("Tried to remove entity from chunk {chunk:?} but the entity was not there."); + } + } else { + warn!("Tried to remove entity from chunk {chunk:?} but the chunk was not found."); + } + // remove it from the uuid index + if entity_infos.entity_by_uuid.remove(&*uuid).is_none() { + warn!("Tried to remove entity {entity:?} from the uuid index but it was not there."); + } + // and now remove the entity from the ecs + commands.entity(entity).despawn(); + debug!("Despawned entity {entity:?} because it was not loaded by anything."); + return; } } diff --git a/azalea-world/src/lib.rs b/azalea-world/src/lib.rs index ddf4cb59..5401dcd7 100644 --- a/azalea-world/src/lib.rs +++ b/azalea-world/src/lib.rs @@ -15,7 +15,9 @@ use std::backtrace::Backtrace; pub use bit_storage::BitStorage; pub use chunk_storage::{Chunk, ChunkStorage, PartialChunkStorage}; pub use container::*; -pub use entity_info::{EntityInfos, PartialEntityInfos, EntityPlugin}; +pub use entity_info::{ + EntityInfos, EntityPlugin, LoadedBy, Local, PartialEntityInfos, RelativeEntityUpdate, +}; use thiserror::Error; pub use world::*; diff --git a/azalea-world/src/world.rs b/azalea-world/src/world.rs index cd04e628..2a1f9a2b 100644 --- a/azalea-world/src/world.rs +++ b/azalea-world/src/world.rs @@ -1,17 +1,24 @@ use crate::{ - entity::{self, Entity, MinecraftEntityId, WorldName}, + entity::{self, Entity, EntityUuid, MinecraftEntityId, Position, WorldName}, + entity_info::LoadedBy, ChunkStorage, EntityInfos, PartialChunkStorage, PartialEntityInfos, WorldContainer, }; use azalea_core::ChunkPos; use bevy_ecs::{ component::Component, - system::{Commands, Query}, + prelude::Bundle, + query::{Changed, Without}, + system::{Commands, Query, Res, ResMut}, }; -use std::fmt::Formatter; +use derive_more::{Deref, DerefMut}; +use log::{debug, error, info, warn}; +use nohash_hasher::IntMap; +use parking_lot::RwLock; use std::{ collections::{HashMap, HashSet}, fmt::Debug, }; +use std::{fmt::Formatter, sync::Arc}; /// PartialWorlds are usually owned by clients, and hold strong references to /// chunks and entities in [`WeakWorld`]s. @@ -29,100 +36,112 @@ pub struct PartialWorld { } impl PartialWorld { - pub fn new( - chunk_radius: u32, - owner_entity: Option, - entity_infos: &mut EntityInfos, - ) -> Self { + pub fn new(chunk_radius: u32, owner_entity: Option) -> Self { PartialWorld { chunks: PartialChunkStorage::new(chunk_radius), - entity_infos: PartialEntityInfos::new(owner_entity, entity_infos), + entity_infos: PartialEntityInfos::new(owner_entity), } } +} - /// Add an entity to the storage. - #[inline] - pub fn add_entity( - &mut self, - commands: &mut Commands, - bundle: impl bevy_ecs::bundle::Bundle, - entity_infos: &mut EntityInfos, - world: &mut World, - query: Query<(&entity::Position, &MinecraftEntityId, &entity::EntityUuid)>, - id_query: Query<&MinecraftEntityId>, - ) { - let mut entity_commands = commands.spawn(bundle); - let entity = entity_commands.id(); - let (position, &id, uuid) = query.get(entity).unwrap(); - let chunk_pos = ChunkPos::from(*position); +/// Remove new entities that have the same id as an existing entity, and +/// increase the reference counts. +/// +/// This is the reason why spawning entities into the ECS when you get a spawn +/// entity packet is okay. This system will make sure the new entity gets +/// combined into the old one. +pub fn deduplicate_entities( + mut commands: Commands, + mut query: Query< + (Entity, &MinecraftEntityId, &WorldName, &mut Position), + Changed, + >, + mut id_query: Query<&MinecraftEntityId>, + mut loaded_by_query: Query<&mut LoadedBy>, + mut entity_infos: ResMut, + mut world_container: ResMut, +) { + // if this entity already exists, remove it + for (entity, id, world_name, mut position) in query.iter_mut() { + let entity_chunk = ChunkPos::from(*position); + if let Some(world_lock) = world_container.get(world_name) { + let world = world_lock.write(); + if let Some(entities_in_chunk) = world.entities_by_chunk.get(&entity_chunk) { + for other_entity in entities_in_chunk { + // if it's the same entity, skip it + if other_entity == &entity { + continue; + } - // check every entity in this entitys chunk to make sure it doesn't already - // exist there - if let Some(entities_in_chunk) = world.entities_by_chunk.get(&chunk_pos) { - for entity in entities_in_chunk { - if id_query.get(*entity).unwrap() == &id { - // the entity is already in the world, so remove that extra entity we just made - entity_commands.despawn(); - return; + let other_entity_id = id_query + .get(*other_entity) + .expect("Entities should always have ids"); + if other_entity_id == id { + // this entity already exists!!! remove the one we just added but increase + // the reference count + let new_loaded_by = loaded_by_query + .get(entity) + .expect("Entities should always have the LoadedBy component") + .clone(); + let mut other_loaded_by = loaded_by_query + .get_mut(*other_entity) + .expect("Entities should always have the LoadedBy component"); + // merge them + other_loaded_by.extend(new_loaded_by.iter()); + commands.entity(entity).despawn(); + info!( + "Entity with id {id:?} already existed in the world, overwriting it with entity {entity:?}", + id = id, + entity = entity, + ); + break; + } } } - } - - let partial_entity_infos = &mut self.entity_infos; - partial_entity_infos.loaded_entity_ids.insert(id); - - // add the entity to the indexes - world - .entities_by_chunk - .entry(chunk_pos) - .or_default() - .insert(entity); - entity_infos.entity_by_uuid.insert(**uuid, entity); - // set our updates_received to the shared updates_received, unless it's - // not there in which case set both to 1 - if let Some(&shared_updates_received) = entity_infos.updates_received.get(&entity) { - // 0 means we're never tracking updates for this entity - if shared_updates_received != 0 || Some(entity) == partial_entity_infos.owner_entity { - partial_entity_infos - .updates_received - .insert(id, shared_updates_received); - } } else { - entity_infos.updates_received.insert(entity, 1); - partial_entity_infos.updates_received.insert(id, 1); + error!("Entity was inserted into a world that doesn't exist.") } } } -/// A component marker signifying that the entity may have been removed from the -/// world, but we're not entirely sure. -#[derive(Component)] -pub struct MaybeRemovedEntity; - -/// Clear all entities in a chunk. This will not clear them from the -/// shared storage unless there are no other references to them. -pub fn clear_entities_in_chunk( - commands: &mut Commands, - partial_entity_infos: &mut PartialEntityInfos, - chunk: &ChunkPos, - world_container: &WorldContainer, - world_name: &WorldName, - query: Query<&MinecraftEntityId>, +pub fn update_uuid_index( + mut entity_infos: ResMut, + query: Query<(Entity, &EntityUuid), Changed>, ) { - let world_lock = world_container.get(world_name).unwrap(); - let world = world_lock.read(); - - if let Some(entities) = world.entities_by_chunk.get(chunk).cloned() { - for &entity in &entities { - let id = query.get(entity).unwrap(); - if partial_entity_infos.loaded_entity_ids.remove(id) { - // maybe remove it from the storage - commands.entity(entity).insert(MaybeRemovedEntity); - } + for (entity, &uuid) in query.iter() { + if let Some(old_entity) = entity_infos.entity_by_uuid.insert(*uuid, entity) { + warn!( + "Entity with UUID {uuid:?} ({old_entity:?}) already existed in the world, overwriting it with entity {entity:?}", + uuid = *uuid, + ); } } } +// /// Clear all entities in a chunk. This will not clear them from the +// /// shared storage unless there are no other references to them. +// pub fn clear_entities_in_chunk( +// mut commands: Commands, +// partial_entity_infos: &mut PartialEntityInfos, +// chunk: &ChunkPos, +// world_container: &WorldContainer, +// world_name: &WorldName, +// mut query: Query<(&MinecraftEntityId, &mut ReferenceCount)>, +// ) { +// let world_lock = world_container.get(world_name).unwrap(); +// let world = world_lock.read(); + +// if let Some(entities) = world.entities_by_chunk.get(chunk).cloned() { +// for &entity in &entities { +// let (id, mut reference_count) = query.get_mut(entity).unwrap(); +// if partial_entity_infos.loaded_entity_ids.remove(id) { +// // decrease the reference count +// **reference_count -= 1; +// } +// } +// } +// } + /// A world where the chunks are stored as weak pointers. This is used for /// shared worlds. #[derive(Default, Debug)] @@ -131,6 +150,16 @@ pub struct World { /// An index of all the entities we know are in the chunks of the world pub entities_by_chunk: HashMap>, + + /// An index of Minecraft entity IDs to Azalea ECS entities. + pub entity_by_id: IntMap, +} + +impl World { + /// Get an ECS [`Entity`] from a Minecraft entity ID. + pub fn entity_by_id(&self, entity_id: &MinecraftEntityId) -> Option { + self.entity_by_id.get(entity_id).copied() + } } impl Debug for PartialWorld { @@ -154,3 +183,16 @@ impl Default for PartialWorld { } } } + +/// System to keep the entity_by_id index up-to-date. +pub fn update_entity_by_id_index( + mut query: Query<(Entity, &MinecraftEntityId, &WorldName), Changed>, + world_container: Res, +) { + for (entity, id, world_name) in query.iter_mut() { + let world_lock = world_container.get(world_name).unwrap(); + let mut world = world_lock.write(); + world.entity_by_id.insert(*id, entity); + debug!("Added {entity:?} to {world_name:?} with {id:?}."); + } +}