1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 06:16:04 +00:00

make run_schedule a bounded channel

This commit is contained in:
mat 2025-02-23 17:10:47 +00:00
parent 21acf4c846
commit 2be4f0f2b6
5 changed files with 31 additions and 25 deletions

View file

@ -54,7 +54,10 @@ use parking_lot::{Mutex, RwLock};
use simdnbt::owned::NbtCompound;
use thiserror::Error;
use tokio::{
sync::{broadcast, mpsc},
sync::{
broadcast,
mpsc::{self, error::TrySendError},
},
time,
};
use tracing::{debug, error};
@ -115,7 +118,7 @@ pub struct Client {
pub ecs: Arc<Mutex<World>>,
/// Use this to force the client to run the schedule outside of a tick.
pub run_schedule_sender: mpsc::UnboundedSender<()>,
pub run_schedule_sender: mpsc::Sender<()>,
}
/// An error that happened while joining the server.
@ -145,7 +148,7 @@ pub struct StartClientOpts<'a> {
pub address: &'a ServerAddress,
pub resolved_address: &'a SocketAddr,
pub proxy: Option<Proxy>,
pub run_schedule_sender: mpsc::UnboundedSender<()>,
pub run_schedule_sender: mpsc::Sender<()>,
}
impl<'a> StartClientOpts<'a> {
@ -155,7 +158,7 @@ impl<'a> StartClientOpts<'a> {
resolved_address: &'a SocketAddr,
) -> StartClientOpts<'a> {
// An event that causes the schedule to run. This is only used internally.
let (run_schedule_sender, run_schedule_receiver) = mpsc::unbounded_channel();
let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1);
let mut app = App::new();
app.add_plugins(DefaultPlugins);
@ -187,7 +190,7 @@ impl Client {
profile: GameProfile,
entity: Entity,
ecs: Arc<Mutex<World>>,
run_schedule_sender: mpsc::UnboundedSender<()>,
run_schedule_sender: mpsc::Sender<()>,
) -> Self {
Self {
profile,
@ -841,8 +844,8 @@ impl Plugin for AzaleaPlugin {
#[doc(hidden)]
pub fn start_ecs_runner(
mut app: App,
run_schedule_receiver: mpsc::UnboundedReceiver<()>,
run_schedule_sender: mpsc::UnboundedSender<()>,
run_schedule_receiver: mpsc::Receiver<()>,
run_schedule_sender: mpsc::Sender<()>,
) -> Arc<Mutex<World>> {
// all resources should have been added by now so we can take the ecs from the
// app
@ -861,13 +864,10 @@ pub fn start_ecs_runner(
async fn run_schedule_loop(
ecs: Arc<Mutex<World>>,
outer_schedule_label: InternedScheduleLabel,
mut run_schedule_receiver: mpsc::UnboundedReceiver<()>,
mut run_schedule_receiver: mpsc::Receiver<()>,
) {
let mut last_tick: Option<Instant> = None;
loop {
// get rid of any queued events
while let Ok(()) = run_schedule_receiver.try_recv() {}
// whenever we get an event from run_schedule_receiver, run the schedule
run_schedule_receiver.recv().await;
@ -893,15 +893,15 @@ async fn run_schedule_loop(
/// Send an event to run the schedule every 50 milliseconds. It will stop when
/// the receiver is dropped.
pub async fn tick_run_schedule_loop(run_schedule_sender: mpsc::UnboundedSender<()>) {
let mut game_tick_interval = time::interval(time::Duration::from_millis(50));
pub async fn tick_run_schedule_loop(run_schedule_sender: mpsc::Sender<()>) {
let mut game_tick_interval = time::interval(Duration::from_millis(50));
// TODO: Minecraft bursts up to 10 ticks and then skips, we should too
game_tick_interval.set_missed_tick_behavior(time::MissedTickBehavior::Burst);
loop {
game_tick_interval.tick().await;
if let Err(e) = run_schedule_sender.send(()) {
println!("tick_run_schedule_loop error: {e}");
if let Err(TrySendError::Closed(())) = run_schedule_sender.try_send(()) {
error!("tick_run_schedule_loop failed because run_schedule_sender was closed");
// the sender is closed so end the task
return;
}

View file

@ -157,7 +157,7 @@ impl Client {
content: message.to_string(),
kind: ChatKind::Message,
});
self.run_schedule_sender.send(()).unwrap();
let _ = self.run_schedule_sender.try_send(());
}
/// Send a command packet to the server. The `command` argument should not
@ -171,7 +171,7 @@ impl Client {
content: command.to_string(),
kind: ChatKind::Command,
});
self.run_schedule_sender.send(()).unwrap();
let _ = self.run_schedule_sender.try_send(());
}
/// Send a message in chat.
@ -188,7 +188,7 @@ impl Client {
entity: self.entity,
content: content.to_string(),
});
self.run_schedule_sender.send(()).unwrap();
let _ = self.run_schedule_sender.try_send(());
}
}

View file

@ -10,7 +10,10 @@ use azalea_protocol::{
use bevy_ecs::prelude::*;
use parking_lot::Mutex;
use thiserror::Error;
use tokio::sync::mpsc::{self, error::SendError};
use tokio::sync::mpsc::{
self,
error::{SendError, TrySendError},
};
use tracing::error;
/// A component for clients that can read and write packets to the server. This
@ -34,7 +37,7 @@ pub struct RawConnection {
#[derive(Clone)]
pub struct RawConnectionReader {
pub incoming_packet_queue: Arc<Mutex<Vec<Box<[u8]>>>>,
pub run_schedule_sender: mpsc::UnboundedSender<()>,
pub run_schedule_sender: mpsc::Sender<()>,
}
#[derive(Clone)]
pub struct RawConnectionWriter {
@ -60,7 +63,7 @@ pub enum WritePacketError {
impl RawConnection {
pub fn new(
run_schedule_sender: mpsc::UnboundedSender<()>,
run_schedule_sender: mpsc::Sender<()>,
connection_protocol: ConnectionProtocol,
raw_read_connection: RawReadConnection,
raw_write_connection: RawWriteConnection,
@ -145,6 +148,9 @@ impl RawConnectionReader {
let mut incoming_packet_queue = self.incoming_packet_queue.lock();
incoming_packet_queue.push(raw_packet);
// this makes it so packets received at the same time are guaranteed to be
// handled in the same tick. this is also an attempt at making it so we can't
// receive any packets in the ticks/updates after being disconnected.
loop {
let raw_packet = match read_conn.try_read() {
Ok(p) => p,
@ -158,7 +164,7 @@ impl RawConnectionReader {
}
// tell the client to run all the systems
if self.run_schedule_sender.send(()).is_err() {
if self.run_schedule_sender.try_send(()) == Err(TrySendError::Closed(())) {
// the client was dropped
break;
}

View file

@ -111,7 +111,7 @@ fn create_local_player_bundle(
tokio::runtime::Runtime,
) {
// unused since we'll trigger ticks ourselves
let (run_schedule_sender, _run_schedule_receiver) = mpsc::unbounded_channel();
let (run_schedule_sender, _run_schedule_receiver) = mpsc::channel(1);
let (outgoing_packets_sender, mut outgoing_packets_receiver) = mpsc::unbounded_channel();
let incoming_packet_queue = Arc::new(Mutex::new(Vec::new()));

View file

@ -45,7 +45,7 @@ pub struct Swarm {
bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
run_schedule_sender: mpsc::UnboundedSender<()>,
run_schedule_sender: mpsc::Sender<()>,
}
/// Create a new [`Swarm`].
@ -385,7 +385,7 @@ where
swarm_tx.send(SwarmEvent::Init).unwrap();
let (run_schedule_sender, run_schedule_receiver) = mpsc::unbounded_channel();
let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1);
let main_schedule_label = self.app.main().update_schedule.unwrap();