mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 14:26:04 +00:00

fix merge issues

mat 2025-02-23 03:37:42 +00:00
parent 1812bfbe79
commit 55ceb63c81
5 changed files with 8 additions and 2208 deletions

View file

@@ -288,21 +288,16 @@ impl<S> CommandDispatcher<S> {
                         next.push(child.copy_for(context.source.clone()));
                     }
                 }
-            } else {
-                match &context.command {
-                    Some(context_command) => {
-                        found_command = true;
+            } else if let Some(context_command) = &context.command {
+                found_command = true;
+                let value = context_command(context);
+                result += value;
+                // consumer.on_command_complete(context, true, value);
+                successful_forks += 1;
-                        let value = context_command(context);
-                        result += value;
-                        // consumer.on_command_complete(context, true, value);
-                        successful_forks += 1;
-                        // TODO: allow context_command to error and handle
-                        // those errors
-                    }
-                    _ => {}
-                }
+                // TODO: allow context_command to error and handle
+                // those errors
             }
         }
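
The fix restores the flattened control flow that the merge had clobbered: a one-armed `match` over `context.command` with an empty `_ => {}` arm becomes a single `else if let`, dropping five lines and two levels of nesting. A minimal, self-contained sketch of the same pattern (the function, types, and counters here are made up for illustration and are not azalea's):

fn execute(command: Option<fn() -> i32>, forked: bool) -> (i32, u32) {
    let mut result = 0;
    let mut successful_forks = 0;
    if forked {
        // fork handling would go here
    } else if let Some(context_command) = command {
        // previously: match &command { Some(context_command) => { ... } _ => {} }
        result += context_command();
        successful_forks += 1;
    }
    (result, successful_forks)
}

fn seven() -> i32 {
    7
}

fn main() {
    assert_eq!(execute(Some(seven), false), (7, 1));
    assert_eq!(execute(None, false), (0, 0));
}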

View file

@@ -1,225 +0,0 @@
//! Used for Minecraft's chunk batching introduced in 23w31a (1.20.2). It's used
//! for making the server spread out how often it sends us chunk packets
//! depending on our receiving speed.
use std::{
io::Cursor,
ops::Deref,
time::{Duration, Instant},
};
use azalea_core::position::ChunkPos;
use azalea_protocol::packets::game::{
c_level_chunk_with_light::ClientboundLevelChunkWithLight,
s_chunk_batch_received::ServerboundChunkBatchReceived,
};
use bevy_app::{App, Plugin, Update};
use bevy_ecs::prelude::*;
use simdnbt::owned::BaseNbt;
use tracing::{error, trace};
use crate::{
InstanceHolder,
interact::handle_block_interact_event,
inventory::InventorySet,
packet::game::{SendPacketEvent, handle_send_packet_event},
respawn::perform_respawn,
};
pub struct ChunkPlugin;
impl Plugin for ChunkPlugin {
fn build(&self, app: &mut App) {
app.add_systems(
Update,
(
handle_chunk_batch_start_event,
handle_receive_chunk_events,
handle_chunk_batch_finished_event,
)
.chain()
.before(handle_send_packet_event)
.before(InventorySet)
.before(handle_block_interact_event)
.before(perform_respawn),
)
.add_event::<ReceiveChunkEvent>()
.add_event::<ChunkBatchStartEvent>()
.add_event::<ChunkBatchFinishedEvent>();
}
}
#[derive(Event)]
pub struct ReceiveChunkEvent {
pub entity: Entity,
pub packet: ClientboundLevelChunkWithLight,
}
#[derive(Component, Clone, Debug)]
pub struct ChunkBatchInfo {
pub start_time: Instant,
pub aggregated_duration_per_chunk: Duration,
pub old_samples_weight: u32,
}
#[derive(Event)]
pub struct ChunkBatchStartEvent {
pub entity: Entity,
}
#[derive(Event)]
pub struct ChunkBatchFinishedEvent {
pub entity: Entity,
pub batch_size: u32,
}
pub fn handle_receive_chunk_events(
mut events: EventReader<ReceiveChunkEvent>,
mut query: Query<&mut InstanceHolder>,
) {
for event in events.read() {
let pos = ChunkPos::new(event.packet.x, event.packet.z);
let local_player = query.get_mut(event.entity).unwrap();
let mut instance = local_player.instance.write();
let mut partial_instance = local_player.partial_instance.write();
// OPTIMIZATION: if we already know about the chunk from the shared world (and
// not ourselves), then we don't need to parse it again. This is only used when
// we have a shared world, since we check that the chunk isn't currently owned
// by this client.
let shared_chunk = instance.chunks.get(&pos);
let this_client_has_chunk = partial_instance.chunks.limited_get(&pos).is_some();
if !this_client_has_chunk {
if let Some(shared_chunk) = shared_chunk {
trace!("Skipping parsing chunk {pos:?} because we already know about it");
partial_instance
.chunks
.limited_set(&pos, Some(shared_chunk));
continue;
}
}
let heightmaps_nbt = &event.packet.chunk_data.heightmaps;
// necessary to make the unwrap_or work
let empty_nbt = BaseNbt::default();
let heightmaps = heightmaps_nbt.unwrap_or(&empty_nbt).deref();
if let Err(e) = partial_instance.chunks.replace_with_packet_data(
&pos,
&mut Cursor::new(&event.packet.chunk_data.data),
heightmaps,
&mut instance.chunks,
) {
error!(
"Couldn't set chunk data: {e}. World height: {}",
instance.chunks.height
);
}
}
}
impl ChunkBatchInfo {
pub fn batch_finished(&mut self, batch_size: u32) {
if batch_size == 0 {
return;
}
let batch_duration = self.start_time.elapsed();
let duration_per_chunk = batch_duration / batch_size;
let clamped_duration = Duration::clamp(
duration_per_chunk,
self.aggregated_duration_per_chunk / 3,
self.aggregated_duration_per_chunk * 3,
);
self.aggregated_duration_per_chunk =
((self.aggregated_duration_per_chunk * self.old_samples_weight) + clamped_duration)
/ (self.old_samples_weight + 1);
self.old_samples_weight = u32::min(49, self.old_samples_weight + 1);
}
pub fn desired_chunks_per_tick(&self) -> f32 {
(7000000. / self.aggregated_duration_per_chunk.as_nanos() as f64) as f32
}
}
pub fn handle_chunk_batch_start_event(
mut query: Query<&mut ChunkBatchInfo>,
mut events: EventReader<ChunkBatchStartEvent>,
) {
for event in events.read() {
if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
chunk_batch_info.start_time = Instant::now();
}
}
}
pub fn handle_chunk_batch_finished_event(
mut query: Query<&mut ChunkBatchInfo>,
mut events: EventReader<ChunkBatchFinishedEvent>,
mut send_packets: EventWriter<SendPacketEvent>,
) {
for event in events.read() {
if let Ok(mut chunk_batch_info) = query.get_mut(event.entity) {
chunk_batch_info.batch_finished(event.batch_size);
let desired_chunks_per_tick = chunk_batch_info.desired_chunks_per_tick();
send_packets.send(SendPacketEvent::new(
event.entity,
ServerboundChunkBatchReceived {
desired_chunks_per_tick,
},
));
}
}
}
#[derive(Clone, Debug)]
pub struct ChunkReceiveSpeedAccumulator {
batch_sizes: Vec<u32>,
/// as milliseconds
batch_durations: Vec<u32>,
index: usize,
filled_size: usize,
}
impl ChunkReceiveSpeedAccumulator {
pub fn new(capacity: usize) -> Self {
Self {
batch_sizes: vec![0; capacity],
batch_durations: vec![0; capacity],
index: 0,
filled_size: 0,
}
}
pub fn accumulate(&mut self, batch_size: u32, batch_duration: Duration) {
self.batch_sizes[self.index] = batch_size;
self.batch_durations[self.index] =
f32::clamp(batch_duration.as_millis() as f32, 0., 15000.) as u32;
self.index = (self.index + 1) % self.batch_sizes.len();
if self.filled_size < self.batch_sizes.len() {
self.filled_size += 1;
}
}
pub fn get_millis_per_chunk(&self) -> f64 {
let mut total_batch_size = 0;
let mut total_batch_duration = 0;
for i in 0..self.filled_size {
total_batch_size += self.batch_sizes[i];
total_batch_duration += self.batch_durations[i];
}
if total_batch_size == 0 {
return 0.;
}
total_batch_duration as f64 / total_batch_size as f64
}
}
impl Default for ChunkBatchInfo {
fn default() -> Self {
Self {
start_time: Instant::now(),
aggregated_duration_per_chunk: Duration::from_millis(2),
old_samples_weight: 1,
}
}
}
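
For context on the deleted chunk-batching code above: `batch_finished` keeps a clamped moving average of how long each chunk took to process, and `desired_chunks_per_tick` divides 7,000,000 ns by that per-chunk average to get the rate reported back to the server. A small standalone sketch of the same arithmetic, starting from the values in the `Default` impl and using a made-up 40 ms batch of 10 chunks:

use std::time::Duration;

// same update rule as ChunkBatchInfo::batch_finished, written as a free
// function so the numbers are easy to trace
fn update(
    aggregated: Duration,
    old_samples_weight: u32,
    batch_duration: Duration,
    batch_size: u32,
) -> (Duration, u32) {
    let duration_per_chunk = batch_duration / batch_size;
    // each sample is clamped to within 3x of the running average, so a single
    // unusually fast or slow batch can't swing the estimate too far
    let clamped = duration_per_chunk.clamp(aggregated / 3, aggregated * 3);
    let new_aggregated =
        ((aggregated * old_samples_weight) + clamped) / (old_samples_weight + 1);
    (new_aggregated, u32::min(49, old_samples_weight + 1))
}

fn main() {
    // starting point from ChunkBatchInfo::default(): 2 ms per chunk, weight 1
    let (agg, weight) = update(Duration::from_millis(2), 1, Duration::from_millis(40), 10);
    // the 4 ms/chunk sample averages with the old 2 ms estimate -> 3 ms
    assert_eq!(agg, Duration::from_millis(3));
    assert_eq!(weight, 2);

    // desired_chunks_per_tick: 7_000_000 ns divided by nanos per chunk
    let desired = 7_000_000. / agg.as_nanos() as f64;
    assert!((desired - 2.33).abs() < 0.01); // ~2.33 chunks per tick at 3 ms/chunk
}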

View file

@@ -1,271 +0,0 @@
use std::io::Cursor;
use azalea_entity::indexing::EntityIdIndex;
use azalea_protocol::packets::config::s_finish_configuration::ServerboundFinishConfiguration;
use azalea_protocol::packets::config::s_keep_alive::ServerboundKeepAlive;
use azalea_protocol::packets::config::s_select_known_packs::ServerboundSelectKnownPacks;
use azalea_protocol::packets::config::{
self, ClientboundConfigPacket, ServerboundConfigPacket, ServerboundCookieResponse,
ServerboundResourcePack,
};
use azalea_protocol::packets::{ConnectionProtocol, Packet};
use azalea_protocol::read::deserialize_packet;
use bevy_ecs::prelude::*;
use bevy_ecs::system::SystemState;
use tracing::{debug, error, warn};
use crate::InstanceHolder;
use crate::client::InConfigState;
use crate::disconnect::DisconnectEvent;
use crate::local_player::Hunger;
use crate::packet::game::KeepAliveEvent;
use crate::raw_connection::RawConnection;
#[derive(Event, Debug, Clone)]
pub struct ConfigurationEvent {
/// The client entity that received the packet.
pub entity: Entity,
/// The packet that was actually received.
pub packet: ClientboundConfigPacket,
}
pub fn send_packet_events(
query: Query<(Entity, &RawConnection), With<InConfigState>>,
mut packet_events: ResMut<Events<ConfigurationEvent>>,
) {
// we manually clear and send the events at the beginning of each update
// since otherwise it'd cause issues with events in process_packet_events
// running twice
packet_events.clear();
for (player_entity, raw_conn) in &query {
let packets_lock = raw_conn.incoming_packet_queue();
let mut packets = packets_lock.lock();
if !packets.is_empty() {
for raw_packet in packets.iter() {
let packet = match deserialize_packet::<ClientboundConfigPacket>(&mut Cursor::new(
raw_packet,
)) {
Ok(packet) => packet,
Err(err) => {
error!("failed to read packet: {err:?}");
debug!("packet bytes: {raw_packet:?}");
continue;
}
};
packet_events.send(ConfigurationEvent {
entity: player_entity,
packet,
});
}
// clear the packets right after we read them
packets.clear();
}
}
}
pub fn process_packet_events(ecs: &mut World) {
let mut events_owned = Vec::new();
let mut system_state: SystemState<EventReader<ConfigurationEvent>> = SystemState::new(ecs);
let mut events = system_state.get_mut(ecs);
for ConfigurationEvent {
entity: player_entity,
packet,
} in events.read()
{
// we do this so `ecs` isn't borrowed for the whole loop
events_owned.push((*player_entity, packet.clone()));
}
for (player_entity, packet) in events_owned {
match packet {
ClientboundConfigPacket::RegistryData(p) => {
let mut system_state: SystemState<Query<&mut InstanceHolder>> =
SystemState::new(ecs);
let mut query = system_state.get_mut(ecs);
let instance_holder = query.get_mut(player_entity).unwrap();
let mut instance = instance_holder.instance.write();
// add the new registry data
instance.registries.append(p.registry_id, p.entries);
}
ClientboundConfigPacket::CustomPayload(p) => {
debug!("Got custom payload packet {p:?}");
}
ClientboundConfigPacket::Disconnect(p) => {
warn!("Got disconnect packet {p:?}");
let mut system_state: SystemState<EventWriter<DisconnectEvent>> =
SystemState::new(ecs);
let mut disconnect_events = system_state.get_mut(ecs);
disconnect_events.send(DisconnectEvent {
entity: player_entity,
reason: Some(p.reason.clone()),
});
}
ClientboundConfigPacket::FinishConfiguration(p) => {
debug!("got FinishConfiguration packet: {p:?}");
let mut system_state: SystemState<Query<&mut RawConnection>> =
SystemState::new(ecs);
let mut query = system_state.get_mut(ecs);
let mut raw_conn = query.get_mut(player_entity).unwrap();
raw_conn
.write_packet(ServerboundFinishConfiguration)
.expect(
"we should be in the right state and encoding this packet shouldn't fail",
);
raw_conn.set_state(ConnectionProtocol::Game);
// these components are added now that we're going to be in the Game state
ecs.entity_mut(player_entity)
.remove::<InConfigState>()
.insert(crate::JoinedClientBundle {
physics_state: crate::PhysicsState::default(),
inventory: crate::inventory::Inventory::default(),
tab_list: crate::local_player::TabList::default(),
current_sequence_number: crate::interact::CurrentSequenceNumber::default(),
last_sent_direction: crate::movement::LastSentLookDirection::default(),
abilities: crate::local_player::PlayerAbilities::default(),
permission_level: crate::local_player::PermissionLevel::default(),
hunger: Hunger::default(),
chunk_batch_info: crate::chunks::ChunkBatchInfo::default(),
entity_id_index: EntityIdIndex::default(),
mining: crate::mining::MineBundle::default(),
attack: crate::attack::AttackBundle::default(),
_local_entity: azalea_entity::LocalEntity,
});
}
ClientboundConfigPacket::KeepAlive(p) => {
debug!("Got keep alive packet (in configuration) {p:?} for {player_entity:?}");
let mut system_state: SystemState<(
Query<&RawConnection>,
EventWriter<KeepAliveEvent>,
)> = SystemState::new(ecs);
let (query, mut keepalive_events) = system_state.get_mut(ecs);
let raw_conn = query.get(player_entity).unwrap();
keepalive_events.send(KeepAliveEvent {
entity: player_entity,
id: p.id,
});
raw_conn
.write_packet(ServerboundKeepAlive { id: p.id })
.unwrap();
}
ClientboundConfigPacket::Ping(p) => {
debug!("Got ping packet {p:?}");
let mut system_state: SystemState<Query<&RawConnection>> = SystemState::new(ecs);
let mut query = system_state.get_mut(ecs);
let raw_conn = query.get_mut(player_entity).unwrap();
raw_conn
.write_packet(config::s_pong::ServerboundPong { id: p.id })
.unwrap();
}
ClientboundConfigPacket::ResourcePackPush(p) => {
debug!("Got resource pack packet {p:?}");
let mut system_state: SystemState<Query<&RawConnection>> = SystemState::new(ecs);
let mut query = system_state.get_mut(ecs);
let raw_conn = query.get_mut(player_entity).unwrap();
// always accept resource pack
raw_conn
.write_packet(ServerboundResourcePack {
id: p.id,
action: config::s_resource_pack::Action::Accepted,
})
.unwrap();
}
ClientboundConfigPacket::ResourcePackPop(_) => {
// we can ignore this
}
ClientboundConfigPacket::UpdateEnabledFeatures(p) => {
debug!("Got update enabled features packet {p:?}");
}
ClientboundConfigPacket::UpdateTags(_p) => {
debug!("Got update tags packet");
}
ClientboundConfigPacket::CookieRequest(p) => {
debug!("Got cookie request packet {p:?}");
let mut system_state: SystemState<Query<&RawConnection>> = SystemState::new(ecs);
let mut query = system_state.get_mut(ecs);
let raw_conn = query.get_mut(player_entity).unwrap();
raw_conn
.write_packet(ServerboundCookieResponse {
key: p.key,
// cookies aren't implemented
payload: None,
})
.unwrap();
}
ClientboundConfigPacket::ResetChat(p) => {
debug!("Got reset chat packet {p:?}");
}
ClientboundConfigPacket::StoreCookie(p) => {
debug!("Got store cookie packet {p:?}");
}
ClientboundConfigPacket::Transfer(p) => {
debug!("Got transfer packet {p:?}");
}
ClientboundConfigPacket::SelectKnownPacks(p) => {
debug!("Got select known packs packet {p:?}");
let mut system_state: SystemState<Query<&RawConnection>> = SystemState::new(ecs);
let mut query = system_state.get_mut(ecs);
let raw_conn = query.get_mut(player_entity).unwrap();
// resource pack management isn't implemented
raw_conn
.write_packet(ServerboundSelectKnownPacks {
known_packs: vec![],
})
.unwrap();
}
ClientboundConfigPacket::ServerLinks(_) => {}
ClientboundConfigPacket::CustomReportDetails(_) => {}
}
}
}
/// An event for sending a packet to the server while we're in the
/// `configuration` state.
#[derive(Event)]
pub struct SendConfigurationEvent {
pub sent_by: Entity,
pub packet: ServerboundConfigPacket,
}
impl SendConfigurationEvent {
pub fn new(sent_by: Entity, packet: impl Packet<ServerboundConfigPacket>) -> Self {
let packet = packet.into_variant();
Self { sent_by, packet }
}
}
pub fn handle_send_packet_event(
mut send_packet_events: EventReader<SendConfigurationEvent>,
mut query: Query<(&mut RawConnection, Option<&InConfigState>)>,
) {
for event in send_packet_events.read() {
if let Ok((raw_conn, in_configuration_state)) = query.get_mut(event.sent_by) {
if in_configuration_state.is_none() {
error!(
"Tried to send a configuration packet {:?} while not in configuration state",
event.packet
);
continue;
}
debug!("Sending packet: {:?}", event.packet);
if let Err(e) = raw_conn.write_packet(event.packet.clone()) {
error!("Failed to send packet: {e}");
}
}
}
}
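
The deleted configuration module routes outgoing packets through `SendConfigurationEvent` so that `handle_send_packet_event` can check the client is still in the configuration state before writing to the connection. A hypothetical consumer system might look like the sketch below; `queue_known_packs` is a made-up name, and picking `ServerboundSelectKnownPacks` is only illustrative:

use azalea_protocol::packets::config::s_select_known_packs::ServerboundSelectKnownPacks;
use bevy_ecs::prelude::*;

// InConfigState and SendConfigurationEvent are the types defined in the
// module above; a hypothetical plugin would import them from azalea.
fn queue_known_packs(
    query: Query<Entity, With<InConfigState>>,
    mut send_packet_events: EventWriter<SendConfigurationEvent>,
) {
    // queue a config-state packet for every client that is currently
    // configuring, instead of writing to RawConnection directly
    for entity in &query {
        send_packet_events.send(SendConfigurationEvent::new(
            entity,
            ServerboundSelectKnownPacks { known_packs: vec![] },
        ));
    }
}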

File diff suppressed because it is too large

View file

@@ -1,114 +0,0 @@
// login packets aren't actually handled here because compression/encryption
// would make packet handling a lot messier
use std::{collections::HashSet, sync::Arc};
use azalea_protocol::packets::{
Packet,
login::{
ClientboundLoginPacket, ServerboundLoginPacket,
s_custom_query_answer::ServerboundCustomQueryAnswer,
},
};
use bevy_ecs::{prelude::*, system::SystemState};
use derive_more::{Deref, DerefMut};
use tokio::sync::mpsc;
use tracing::error;
// this struct is defined here anyway, though, so it's consistent with the
// other packet handling modules
/// An event that's sent when we receive a login packet from the server. Note
/// that if you want to handle this in a system, you must add
/// `.before(azalea::packet::login::process_packet_events)` to it
/// because that system clears the events.
#[derive(Event, Debug, Clone)]
pub struct LoginPacketEvent {
/// The client entity that received the packet.
pub entity: Entity,
/// The packet that was actually received.
pub packet: Arc<ClientboundLoginPacket>,
}
/// Event for sending a login packet to the server.
#[derive(Event)]
pub struct SendLoginPacketEvent {
pub entity: Entity,
pub packet: ServerboundLoginPacket,
}
impl SendLoginPacketEvent {
pub fn new(entity: Entity, packet: impl Packet<ServerboundLoginPacket>) -> Self {
let packet = packet.into_variant();
Self { entity, packet }
}
}
#[derive(Component)]
pub struct LoginSendPacketQueue {
pub tx: mpsc::UnboundedSender<ServerboundLoginPacket>,
}
/// A marker component for local players that are currently in the
/// `login` state.
#[derive(Component, Clone, Debug)]
pub struct InLoginState;
pub fn handle_send_packet_event(
mut send_packet_events: EventReader<SendLoginPacketEvent>,
mut query: Query<&mut LoginSendPacketQueue>,
) {
for event in send_packet_events.read() {
if let Ok(queue) = query.get_mut(event.entity) {
let _ = queue.tx.send(event.packet.clone());
} else {
error!("Sent SendPacketEvent for entity that doesn't have a LoginSendPacketQueue");
}
}
}
/// Plugins can add to this set if they want to handle a custom query packet
/// themselves. This component is removed after the login state ends.
#[derive(Component, Default, Debug, Deref, DerefMut)]
pub struct IgnoreQueryIds(HashSet<u32>);
pub fn process_packet_events(ecs: &mut World) {
let mut events_owned = Vec::new();
let mut system_state: SystemState<ResMut<Events<LoginPacketEvent>>> = SystemState::new(ecs);
let mut events = system_state.get_mut(ecs);
for LoginPacketEvent {
entity: player_entity,
packet,
} in events.drain()
{
// we do this so `ecs` isn't borrowed for the whole loop
events_owned.push((player_entity, packet));
}
for (player_entity, packet) in events_owned {
#[allow(clippy::single_match)]
match packet.as_ref() {
ClientboundLoginPacket::CustomQuery(p) => {
let mut system_state: SystemState<(
EventWriter<SendLoginPacketEvent>,
Query<&IgnoreQueryIds>,
)> = SystemState::new(ecs);
let (mut send_packet_events, query) = system_state.get_mut(ecs);
let ignore_query_ids = query.get(player_entity).ok().map(|x| x.0.clone());
if let Some(ignore_query_ids) = ignore_query_ids {
if ignore_query_ids.contains(&p.transaction_id) {
continue;
}
}
send_packet_events.send(SendLoginPacketEvent::new(
player_entity,
ServerboundCustomQueryAnswer {
transaction_id: p.transaction_id,
data: None,
},
));
}
_ => {}
}
}
}
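
The doc comments above spell out the contract for consuming these events: a handler system must be ordered `.before(azalea::packet::login::process_packet_events)` because that system drains `LoginPacketEvent`, and a plugin that answers a custom query itself should record the transaction id in `IgnoreQueryIds` so the default answer is skipped. A hypothetical plugin following that contract (`MyQueryPlugin` and `handle_my_query` are made-up names; the event and component types are the ones defined above):

use azalea_protocol::packets::login::{
    ClientboundLoginPacket, s_custom_query_answer::ServerboundCustomQueryAnswer,
};
use bevy_app::{App, Plugin, Update};
use bevy_ecs::prelude::*;

// LoginPacketEvent, SendLoginPacketEvent, IgnoreQueryIds, and
// process_packet_events come from the module above (azalea::packet::login).
pub struct MyQueryPlugin;

impl Plugin for MyQueryPlugin {
    fn build(&self, app: &mut App) {
        // must run before process_packet_events, which drains LoginPacketEvent
        app.add_systems(Update, handle_my_query.before(process_packet_events));
    }
}

fn handle_my_query(
    mut events: EventReader<LoginPacketEvent>,
    mut ignore_ids: Query<&mut IgnoreQueryIds>,
    mut send_packet_events: EventWriter<SendLoginPacketEvent>,
) {
    for event in events.read() {
        if let ClientboundLoginPacket::CustomQuery(p) = event.packet.as_ref() {
            // mark the id as handled so process_packet_events doesn't also
            // answer it (assuming the entity has an IgnoreQueryIds component)
            if let Ok(mut ignore) = ignore_ids.get_mut(event.entity) {
                ignore.insert(p.transaction_id);
            }
            send_packet_events.send(SendLoginPacketEvent::new(
                event.entity,
                ServerboundCustomQueryAnswer {
                    transaction_id: p.transaction_id,
                    data: None,
                },
            ));
        }
    }
}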