diff --git a/azalea-client/src/client.rs b/azalea-client/src/client.rs index 7a1c3ae0..7315f197 100644 --- a/azalea-client/src/client.rs +++ b/azalea-client/src/client.rs @@ -54,7 +54,10 @@ use parking_lot::{Mutex, RwLock}; use simdnbt::owned::NbtCompound; use thiserror::Error; use tokio::{ - sync::{broadcast, mpsc}, + sync::{ + broadcast, + mpsc::{self, error::TrySendError}, + }, time, }; use tracing::{debug, error}; @@ -115,7 +118,7 @@ pub struct Client { pub ecs: Arc>, /// Use this to force the client to run the schedule outside of a tick. - pub run_schedule_sender: mpsc::UnboundedSender<()>, + pub run_schedule_sender: mpsc::Sender<()>, } /// An error that happened while joining the server. @@ -145,7 +148,7 @@ pub struct StartClientOpts<'a> { pub address: &'a ServerAddress, pub resolved_address: &'a SocketAddr, pub proxy: Option, - pub run_schedule_sender: mpsc::UnboundedSender<()>, + pub run_schedule_sender: mpsc::Sender<()>, } impl<'a> StartClientOpts<'a> { @@ -155,7 +158,7 @@ impl<'a> StartClientOpts<'a> { resolved_address: &'a SocketAddr, ) -> StartClientOpts<'a> { // An event that causes the schedule to run. This is only used internally. - let (run_schedule_sender, run_schedule_receiver) = mpsc::unbounded_channel(); + let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1); let mut app = App::new(); app.add_plugins(DefaultPlugins); @@ -187,7 +190,7 @@ impl Client { profile: GameProfile, entity: Entity, ecs: Arc>, - run_schedule_sender: mpsc::UnboundedSender<()>, + run_schedule_sender: mpsc::Sender<()>, ) -> Self { Self { profile, @@ -841,8 +844,8 @@ impl Plugin for AzaleaPlugin { #[doc(hidden)] pub fn start_ecs_runner( mut app: App, - run_schedule_receiver: mpsc::UnboundedReceiver<()>, - run_schedule_sender: mpsc::UnboundedSender<()>, + run_schedule_receiver: mpsc::Receiver<()>, + run_schedule_sender: mpsc::Sender<()>, ) -> Arc> { // all resources should have been added by now so we can take the ecs from the // app @@ -861,13 +864,10 @@ pub fn start_ecs_runner( async fn run_schedule_loop( ecs: Arc>, outer_schedule_label: InternedScheduleLabel, - mut run_schedule_receiver: mpsc::UnboundedReceiver<()>, + mut run_schedule_receiver: mpsc::Receiver<()>, ) { let mut last_tick: Option = None; loop { - // get rid of any queued events - while let Ok(()) = run_schedule_receiver.try_recv() {} - // whenever we get an event from run_schedule_receiver, run the schedule run_schedule_receiver.recv().await; @@ -893,15 +893,15 @@ async fn run_schedule_loop( /// Send an event to run the schedule every 50 milliseconds. It will stop when /// the receiver is dropped. -pub async fn tick_run_schedule_loop(run_schedule_sender: mpsc::UnboundedSender<()>) { - let mut game_tick_interval = time::interval(time::Duration::from_millis(50)); +pub async fn tick_run_schedule_loop(run_schedule_sender: mpsc::Sender<()>) { + let mut game_tick_interval = time::interval(Duration::from_millis(50)); // TODO: Minecraft bursts up to 10 ticks and then skips, we should too game_tick_interval.set_missed_tick_behavior(time::MissedTickBehavior::Burst); loop { game_tick_interval.tick().await; - if let Err(e) = run_schedule_sender.send(()) { - println!("tick_run_schedule_loop error: {e}"); + if let Err(TrySendError::Closed(())) = run_schedule_sender.try_send(()) { + error!("tick_run_schedule_loop failed because run_schedule_sender was closed"); // the sender is closed so end the task return; } diff --git a/azalea-client/src/plugins/chat/mod.rs b/azalea-client/src/plugins/chat/mod.rs index 66c77b56..aa5606eb 100644 --- a/azalea-client/src/plugins/chat/mod.rs +++ b/azalea-client/src/plugins/chat/mod.rs @@ -157,7 +157,7 @@ impl Client { content: message.to_string(), kind: ChatKind::Message, }); - self.run_schedule_sender.send(()).unwrap(); + let _ = self.run_schedule_sender.try_send(()); } /// Send a command packet to the server. The `command` argument should not @@ -171,7 +171,7 @@ impl Client { content: command.to_string(), kind: ChatKind::Command, }); - self.run_schedule_sender.send(()).unwrap(); + let _ = self.run_schedule_sender.try_send(()); } /// Send a message in chat. @@ -188,7 +188,7 @@ impl Client { entity: self.entity, content: content.to_string(), }); - self.run_schedule_sender.send(()).unwrap(); + let _ = self.run_schedule_sender.try_send(()); } } diff --git a/azalea-client/src/raw_connection.rs b/azalea-client/src/raw_connection.rs index 4a6587be..97e93f16 100644 --- a/azalea-client/src/raw_connection.rs +++ b/azalea-client/src/raw_connection.rs @@ -10,7 +10,10 @@ use azalea_protocol::{ use bevy_ecs::prelude::*; use parking_lot::Mutex; use thiserror::Error; -use tokio::sync::mpsc::{self, error::SendError}; +use tokio::sync::mpsc::{ + self, + error::{SendError, TrySendError}, +}; use tracing::error; /// A component for clients that can read and write packets to the server. This @@ -34,7 +37,7 @@ pub struct RawConnection { #[derive(Clone)] pub struct RawConnectionReader { pub incoming_packet_queue: Arc>>>, - pub run_schedule_sender: mpsc::UnboundedSender<()>, + pub run_schedule_sender: mpsc::Sender<()>, } #[derive(Clone)] pub struct RawConnectionWriter { @@ -60,7 +63,7 @@ pub enum WritePacketError { impl RawConnection { pub fn new( - run_schedule_sender: mpsc::UnboundedSender<()>, + run_schedule_sender: mpsc::Sender<()>, connection_protocol: ConnectionProtocol, raw_read_connection: RawReadConnection, raw_write_connection: RawWriteConnection, @@ -145,6 +148,9 @@ impl RawConnectionReader { let mut incoming_packet_queue = self.incoming_packet_queue.lock(); incoming_packet_queue.push(raw_packet); + // this makes it so packets received at the same time are guaranteed to be + // handled in the same tick. this is also an attempt at making it so we can't + // receive any packets in the ticks/updates after being disconnected. loop { let raw_packet = match read_conn.try_read() { Ok(p) => p, @@ -158,7 +164,7 @@ impl RawConnectionReader { } // tell the client to run all the systems - if self.run_schedule_sender.send(()).is_err() { + if self.run_schedule_sender.try_send(()) == Err(TrySendError::Closed(())) { // the client was dropped break; } diff --git a/azalea-client/src/test_simulation.rs b/azalea-client/src/test_simulation.rs index 27cf4a57..b9e9b8ec 100644 --- a/azalea-client/src/test_simulation.rs +++ b/azalea-client/src/test_simulation.rs @@ -111,7 +111,7 @@ fn create_local_player_bundle( tokio::runtime::Runtime, ) { // unused since we'll trigger ticks ourselves - let (run_schedule_sender, _run_schedule_receiver) = mpsc::unbounded_channel(); + let (run_schedule_sender, _run_schedule_receiver) = mpsc::channel(1); let (outgoing_packets_sender, mut outgoing_packets_receiver) = mpsc::unbounded_channel(); let incoming_packet_queue = Arc::new(Mutex::new(Vec::new())); diff --git a/azalea/src/swarm/mod.rs b/azalea/src/swarm/mod.rs index cc0d7eb1..c82bea82 100644 --- a/azalea/src/swarm/mod.rs +++ b/azalea/src/swarm/mod.rs @@ -45,7 +45,7 @@ pub struct Swarm { bots_tx: mpsc::UnboundedSender<(Option, Client)>, swarm_tx: mpsc::UnboundedSender, - run_schedule_sender: mpsc::UnboundedSender<()>, + run_schedule_sender: mpsc::Sender<()>, } /// Create a new [`Swarm`]. @@ -385,7 +385,7 @@ where swarm_tx.send(SwarmEvent::Init).unwrap(); - let (run_schedule_sender, run_schedule_receiver) = mpsc::unbounded_channel(); + let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1); let main_schedule_label = self.app.main().update_schedule.unwrap();