mirror of
https://github.com/mat-1/azalea.git
synced 2025-08-02 06:16:04 +00:00
continue switching swarm to builder and fix stuff
This commit is contained in:
parent
228e53adad
commit
f8230e40af
8 changed files with 523 additions and 331 deletions
|
@ -1,12 +1,11 @@
|
|||
pub use crate::chat::ChatPacket;
|
||||
use crate::{
|
||||
events::{Event, LocalPlayerEvents},
|
||||
local_player::{
|
||||
death_event, send_tick_event, update_in_loaded_chunk, GameProfileComponent, LocalPlayer,
|
||||
PhysicsState,
|
||||
death_event, update_in_loaded_chunk, GameProfileComponent, LocalPlayer, PhysicsState,
|
||||
},
|
||||
movement::{local_player_ai_step, send_position, sprint_listener, walk_listener},
|
||||
packet_handling::{self, PacketHandlerPlugin},
|
||||
plugins::PluginStates,
|
||||
Account, PlayerInfo, StartSprintEvent, StartWalkEvent,
|
||||
};
|
||||
|
||||
|
@ -38,53 +37,17 @@ use azalea_world::{
|
|||
entity::Entity, EntityInfos, EntityPlugin, Local, PartialWorld, World, WorldContainer,
|
||||
};
|
||||
use bevy_app::App;
|
||||
use bevy_ecs::{
|
||||
query::{ReadOnlyWorldQuery, WorldQuery},
|
||||
schedule::{IntoSystemDescriptor, Schedule, Stage, SystemSet},
|
||||
world::EntityRef,
|
||||
};
|
||||
use bevy_ecs::schedule::{IntoSystemDescriptor, Schedule, Stage, SystemSet};
|
||||
use bevy_time::TimePlugin;
|
||||
use iyes_loopless::prelude::*;
|
||||
use log::{debug, error};
|
||||
use parking_lot::{Mutex, MutexGuard, RwLock};
|
||||
use std::{fmt::Debug, io, net::SocketAddr, ops::DerefMut, sync::Arc, time::Duration};
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
use std::{fmt::Debug, io, net::SocketAddr, sync::Arc, time::Duration};
|
||||
use thiserror::Error;
|
||||
use tokio::{sync::mpsc, time};
|
||||
|
||||
pub type ClientInformation = ServerboundClientInformationPacket;
|
||||
|
||||
/// Something that happened in-game, such as a tick passing or chat message
|
||||
/// being sent.
|
||||
///
|
||||
/// Note: Events are sent before they're processed, so for example game ticks
|
||||
/// happen at the beginning of a tick before anything has happened.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Event {
|
||||
/// Happens right after the bot switches into the Game state, but before
|
||||
/// it's actually spawned. This can be useful for setting the client
|
||||
/// information with `Client::set_client_information`, so the packet
|
||||
/// doesn't have to be sent twice.
|
||||
Init,
|
||||
/// The client is now in the world. Fired when we receive a login packet.
|
||||
Login,
|
||||
/// A chat message was sent in the game chat.
|
||||
Chat(ChatPacket),
|
||||
/// Happens 20 times per second, but only when the world is loaded.
|
||||
Tick,
|
||||
Packet(Arc<ClientboundGamePacket>),
|
||||
/// A player joined the game (or more specifically, was added to the tab
|
||||
/// list).
|
||||
AddPlayer(PlayerInfo),
|
||||
/// A player left the game (or maybe is still in the game and was just
|
||||
/// removed from the tab list).
|
||||
RemovePlayer(PlayerInfo),
|
||||
/// A player was updated in the tab list (gamemode, display
|
||||
/// name, or latency changed).
|
||||
UpdatePlayer(PlayerInfo),
|
||||
/// The client player died in-game.
|
||||
Death(Option<Arc<ClientboundPlayerCombatKillPacket>>),
|
||||
}
|
||||
|
||||
/// Client has the things that a user interacting with the library will want.
|
||||
/// Things that a player in the world will want to know are in [`LocalPlayer`].
|
||||
#[derive(Clone)]
|
||||
|
@ -101,11 +64,6 @@ pub struct Client {
|
|||
/// The world that this client is in.
|
||||
pub world: Arc<RwLock<PartialWorld>>,
|
||||
|
||||
/// Plugins are a way for other crates to add custom functionality to the
|
||||
/// client and keep state. If you're not making a plugin and you're using
|
||||
/// the `azalea` crate. you can ignore this field.
|
||||
pub plugins: Arc<PluginStates>,
|
||||
|
||||
/// The entity component system. You probably don't need to access this
|
||||
/// directly. Note that if you're using a shared world (i.e. a swarm), this
|
||||
/// will contain all entities in all worlds.
|
||||
|
@ -147,9 +105,6 @@ impl Client {
|
|||
// default our id to 0, it'll be set later
|
||||
entity,
|
||||
world: Arc::new(RwLock::new(PartialWorld::default())),
|
||||
// The plugins can be modified by the user by replacing the plugins
|
||||
// field right after this. No Mutex so the user doesn't need to .lock().
|
||||
plugins: Arc::new(PluginStates::default()),
|
||||
|
||||
ecs,
|
||||
}
|
||||
|
@ -184,12 +139,8 @@ impl Client {
|
|||
|
||||
// An event that causes the schedule to run. This is only used internally.
|
||||
let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1);
|
||||
let (ecs_lock, _app) = start_ecs(run_schedule_receiver, run_schedule_sender.clone());
|
||||
|
||||
{
|
||||
let mut ecs = ecs_lock.lock();
|
||||
ecs.init_resource::<EntityInfos>();
|
||||
}
|
||||
let app = init_ecs_app();
|
||||
let ecs_lock = start_ecs(app, run_schedule_receiver, run_schedule_sender.clone());
|
||||
|
||||
Self::start_client(
|
||||
ecs_lock,
|
||||
|
@ -237,7 +188,6 @@ impl Client {
|
|||
// default to an empty world, it'll be set correctly later when we
|
||||
// get the login packet
|
||||
Arc::new(RwLock::new(World::default())),
|
||||
tx,
|
||||
);
|
||||
|
||||
// start receiving packets
|
||||
|
@ -261,6 +211,7 @@ impl Client {
|
|||
GameProfileComponent(game_profile),
|
||||
PhysicsState::default(),
|
||||
Local,
|
||||
LocalPlayerEvents(tx),
|
||||
));
|
||||
|
||||
// just start up the game loop and we're ready!
|
||||
|
@ -484,11 +435,16 @@ impl Client {
|
|||
}
|
||||
}
|
||||
|
||||
/// Create the [`App`]. This won't actually run anything yet.
|
||||
///
|
||||
/// Note that you usually only need this if you're creating a client manually,
|
||||
/// otherwise use [`Client::join`].
|
||||
///
|
||||
/// Use [`start_ecs`] to actually start running the app and then
|
||||
/// [`Client::start_client`] to add a client to the ECS and make it join a
|
||||
/// server.
|
||||
#[doc(hidden)]
|
||||
pub fn start_ecs(
|
||||
run_schedule_receiver: mpsc::Receiver<()>,
|
||||
run_schedule_sender: mpsc::Sender<()>,
|
||||
) -> (Arc<Mutex<bevy_ecs::world::World>>, App) {
|
||||
pub fn init_ecs_app() -> App {
|
||||
// if you get an error right here that means you're doing something with locks
|
||||
// wrong read the error to see where the issue is
|
||||
// you might be able to just drop the lock or put it in its own scope to fix
|
||||
|
@ -511,8 +467,7 @@ pub fn start_ecs(
|
|||
local_player_ai_step
|
||||
.before("ai_step")
|
||||
.after("sprint_listener"),
|
||||
)
|
||||
.with_system(send_tick_event),
|
||||
),
|
||||
);
|
||||
|
||||
// fire the Death event when the player dies.
|
||||
|
@ -523,7 +478,19 @@ pub fn start_ecs(
|
|||
app.add_plugin(TimePlugin); // from bevy_time
|
||||
|
||||
app.init_resource::<WorldContainer>();
|
||||
app.init_resource::<EntityInfos>();
|
||||
|
||||
app
|
||||
}
|
||||
|
||||
/// Start running the ECS loop! You must create your `App` from [`init_ecs_app`]
|
||||
/// first.
|
||||
#[doc(hidden)]
|
||||
pub fn start_ecs(
|
||||
app: App,
|
||||
run_schedule_receiver: mpsc::Receiver<()>,
|
||||
run_schedule_sender: mpsc::Sender<()>,
|
||||
) -> Arc<Mutex<bevy_ecs::world::World>> {
|
||||
// all resources should have been added by now so we can take the ecs from the
|
||||
// app
|
||||
let ecs = Arc::new(Mutex::new(app.world));
|
||||
|
@ -535,7 +502,7 @@ pub fn start_ecs(
|
|||
));
|
||||
tokio::spawn(tick_run_schedule_loop(run_schedule_sender));
|
||||
|
||||
(ecs, app)
|
||||
ecs
|
||||
}
|
||||
|
||||
async fn run_schedule_loop(
|
||||
|
|
189
azalea-client/src/events.rs
Normal file
189
azalea-client/src/events.rs
Normal file
|
@ -0,0 +1,189 @@
|
|||
//! Defines the [`Event`] enum and makes those events trigger when they're sent
|
||||
//! in the ECS.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use azalea_protocol::packets::game::{
|
||||
clientbound_player_combat_kill_packet::ClientboundPlayerCombatKillPacket, ClientboundGamePacket,
|
||||
};
|
||||
use azalea_world::entity::{Entity, MinecraftEntityId};
|
||||
use bevy_app::Plugin;
|
||||
use bevy_ecs::{
|
||||
prelude::{Component, EventReader},
|
||||
query::{Added, Changed},
|
||||
system::Query,
|
||||
};
|
||||
use derive_more::{Deref, DerefMut};
|
||||
use futures::SinkExt;
|
||||
use iyes_loopless::prelude::*;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::{
|
||||
packet_handling::{
|
||||
AddPlayerEvent, ChatReceivedEvent, DeathEvent, PacketReceiver, RemovePlayerEvent,
|
||||
UpdatePlayerEvent,
|
||||
},
|
||||
ChatPacket, PlayerInfo,
|
||||
};
|
||||
|
||||
// (for contributors):
|
||||
// HOW TO ADD A NEW (packet based) EVENT:
|
||||
// - make a struct that contains an entity field and a data field (look in
|
||||
// packet_handling.rs for examples, also you should end the struct name with
|
||||
// "Event")
|
||||
// - the entity field is the local player entity that's receiving the event
|
||||
// - in packet_handling, you always have a variable called player_entity that
|
||||
// you can use
|
||||
// - add the event struct in the `impl Plugin for PacketHandlerPlugin`
|
||||
// - to get the event writer, you have to get an
|
||||
// EventWriter<SomethingHappenedEvent> from the SystemState (the convention is
|
||||
// to end your variable with the word "events", like "something_events")
|
||||
//
|
||||
// - then here in this file, add it to the Event enum
|
||||
// - and make an event listener system/function like the other ones and put the
|
||||
// function in the `impl Plugin for EventPlugin`
|
||||
|
||||
/// Something that happened in-game, such as a tick passing or chat message
|
||||
/// being sent.
|
||||
///
|
||||
/// Note: Events are sent before they're processed, so for example game ticks
|
||||
/// happen at the beginning of a tick before anything has happened.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Event {
|
||||
/// Happens right after the bot switches into the Game state, but before
|
||||
/// it's actually spawned. This can be useful for setting the client
|
||||
/// information with `Client::set_client_information`, so the packet
|
||||
/// doesn't have to be sent twice.
|
||||
Init,
|
||||
/// The client is now in the world. Fired when we receive a login packet.
|
||||
Login,
|
||||
/// A chat message was sent in the game chat.
|
||||
Chat(ChatPacket),
|
||||
/// Happens 20 times per second, but only when the world is loaded.
|
||||
Tick,
|
||||
Packet(Arc<ClientboundGamePacket>),
|
||||
/// A player joined the game (or more specifically, was added to the tab
|
||||
/// list).
|
||||
AddPlayer(PlayerInfo),
|
||||
/// A player left the game (or maybe is still in the game and was just
|
||||
/// removed from the tab list).
|
||||
RemovePlayer(PlayerInfo),
|
||||
/// A player was updated in the tab list (gamemode, display
|
||||
/// name, or latency changed).
|
||||
UpdatePlayer(PlayerInfo),
|
||||
/// The client player died in-game.
|
||||
Death(Option<Arc<ClientboundPlayerCombatKillPacket>>),
|
||||
}
|
||||
|
||||
/// A component that contains an event sender for events that are only
|
||||
/// received by local players. The receiver for this is returned by
|
||||
/// [`Client::start_client`].
|
||||
///
|
||||
/// [`Client::start_client`]: crate::Client::start_client
|
||||
#[derive(Component, Deref, DerefMut)]
|
||||
pub struct LocalPlayerEvents(pub mpsc::UnboundedSender<Event>);
|
||||
|
||||
pub struct EventPlugin;
|
||||
impl Plugin for EventPlugin {
|
||||
fn build(&self, app: &mut bevy_app::App) {
|
||||
app.add_system(chat_listener)
|
||||
.add_system(login_listener)
|
||||
.add_system(init_listener)
|
||||
.add_system(packet_listener)
|
||||
.add_system(add_player_listener)
|
||||
.add_system(update_player_listener)
|
||||
.add_system(remove_player_listener)
|
||||
.add_system(death_listener)
|
||||
.add_fixed_timestep_system("tick", 0, tick_listener);
|
||||
}
|
||||
}
|
||||
|
||||
// when LocalPlayerEvents is added, it means the client just started
|
||||
fn init_listener(query: Query<&LocalPlayerEvents, Added<LocalPlayerEvents>>) {
|
||||
for local_player_events in &query {
|
||||
local_player_events.send(Event::Init).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// when MinecraftEntityId is added, it means the player is now in the world
|
||||
fn login_listener(query: Query<&LocalPlayerEvents, Added<MinecraftEntityId>>) {
|
||||
for local_player_events in &query {
|
||||
local_player_events.send(Event::Login).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
fn chat_listener(query: Query<&LocalPlayerEvents>, mut events: EventReader<ChatReceivedEvent>) {
|
||||
for event in events.iter() {
|
||||
let local_player_events = query
|
||||
.get(event.entity)
|
||||
.expect("Non-localplayer entities shouldn't be able to receive chat events");
|
||||
local_player_events
|
||||
.send(Event::Chat(event.packet.clone()))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
fn tick_listener(query: Query<&LocalPlayerEvents>) {
|
||||
for local_player_events in &query {
|
||||
local_player_events.send(Event::Tick).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
fn packet_listener(query: Query<(&LocalPlayerEvents, &PacketReceiver), Changed<PacketReceiver>>) {
|
||||
for (local_player_events, packet_receiver) in &query {
|
||||
for packet in packet_receiver.packets.lock().iter() {
|
||||
local_player_events
|
||||
.send(Event::Packet(packet.clone().into()))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn add_player_listener(query: Query<&LocalPlayerEvents>, mut events: EventReader<AddPlayerEvent>) {
|
||||
for event in events.iter() {
|
||||
let local_player_events = query
|
||||
.get(event.entity)
|
||||
.expect("Non-localplayer entities shouldn't be able to receive add player events");
|
||||
local_player_events
|
||||
.send(Event::AddPlayer(event.info.clone()))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
fn update_player_listener(
|
||||
query: Query<&LocalPlayerEvents>,
|
||||
mut events: EventReader<UpdatePlayerEvent>,
|
||||
) {
|
||||
for event in events.iter() {
|
||||
let local_player_events = query
|
||||
.get(event.entity)
|
||||
.expect("Non-localplayer entities shouldn't be able to receive add player events");
|
||||
local_player_events
|
||||
.send(Event::UpdatePlayer(event.info.clone()))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_player_listener(
|
||||
query: Query<&LocalPlayerEvents>,
|
||||
mut events: EventReader<RemovePlayerEvent>,
|
||||
) {
|
||||
for event in events.iter() {
|
||||
let local_player_events = query
|
||||
.get(event.entity)
|
||||
.expect("Non-localplayer entities shouldn't be able to receive add player events");
|
||||
local_player_events
|
||||
.send(Event::RemovePlayer(event.info.clone()))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
fn death_listener(query: Query<&LocalPlayerEvents>, mut events: EventReader<DeathEvent>) {
|
||||
for event in events.iter() {
|
||||
if let Ok(local_player_events) = query.get(event.entity) {
|
||||
local_player_events
|
||||
.send(Event::Death(event.packet.clone().map(|p| p.into())))
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -15,6 +15,7 @@ mod account;
|
|||
mod chat;
|
||||
mod client;
|
||||
mod entity_query;
|
||||
mod events;
|
||||
mod get_mc_dir;
|
||||
mod local_player;
|
||||
mod movement;
|
||||
|
@ -24,7 +25,8 @@ mod player;
|
|||
|
||||
pub use account::Account;
|
||||
pub use bevy_ecs as ecs;
|
||||
pub use client::{start_ecs, ChatPacket, Client, ClientInformation, Event, JoinError};
|
||||
pub use client::{init_ecs_app, start_ecs, ChatPacket, Client, ClientInformation, JoinError};
|
||||
pub use events::Event;
|
||||
pub use local_player::{GameProfileComponent, LocalPlayer};
|
||||
pub use movement::{SprintDirection, StartSprintEvent, StartWalkEvent, WalkDirection};
|
||||
pub use player::PlayerInfo;
|
||||
|
|
|
@ -5,17 +5,16 @@ use azalea_core::{ChunkPos, ResourceLocation};
|
|||
use azalea_protocol::packets::game::ServerboundGamePacket;
|
||||
use azalea_world::{
|
||||
entity::{self, Dead, Entity},
|
||||
EntityInfos, PartialWorld, World,
|
||||
PartialWorld, World,
|
||||
};
|
||||
use bevy_ecs::{component::Component, query::Added, system::Query};
|
||||
use derive_more::{Deref, DerefMut};
|
||||
use log::warn;
|
||||
use parking_lot::RwLock;
|
||||
use thiserror::Error;
|
||||
use tokio::{sync::mpsc, task::JoinHandle};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{ClientInformation, Event, PlayerInfo, WalkDirection};
|
||||
use crate::{events::{Event, LocalPlayerEvents}, ClientInformation, PlayerInfo, WalkDirection};
|
||||
|
||||
/// A player that you control that is currently in a Minecraft server.
|
||||
#[derive(Component)]
|
||||
|
@ -34,8 +33,6 @@ pub struct LocalPlayer {
|
|||
pub world: Arc<RwLock<World>>,
|
||||
pub world_name: Option<ResourceLocation>,
|
||||
|
||||
pub tx: mpsc::UnboundedSender<Event>,
|
||||
|
||||
/// A list of async tasks that are running and will stop running when this
|
||||
/// LocalPlayer is dropped or disconnected with [`Self::disconnect`]
|
||||
pub(crate) tasks: Vec<JoinHandle<()>>,
|
||||
|
@ -80,7 +77,6 @@ impl LocalPlayer {
|
|||
entity: Entity,
|
||||
packet_writer: mpsc::UnboundedSender<ServerboundGamePacket>,
|
||||
world: Arc<RwLock<World>>,
|
||||
tx: mpsc::UnboundedSender<Event>,
|
||||
) -> Self {
|
||||
let client_information = ClientInformation::default();
|
||||
|
||||
|
@ -97,8 +93,6 @@ impl LocalPlayer {
|
|||
))),
|
||||
world_name: None,
|
||||
|
||||
tx,
|
||||
|
||||
tasks: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
@ -121,12 +115,6 @@ impl LocalPlayer {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn send_tick_event(query: Query<&LocalPlayer>) {
|
||||
for local_player in &query {
|
||||
local_player.tx.send(Event::Tick).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
/// Update the [`LocalPlayerInLoadedChunk`] component for all [`LocalPlayer`]s.
|
||||
pub fn update_in_loaded_chunk(
|
||||
mut commands: bevy_ecs::system::Commands,
|
||||
|
@ -149,9 +137,9 @@ pub fn update_in_loaded_chunk(
|
|||
}
|
||||
|
||||
/// Send the "Death" event for [`LocalPlayer`]s that died with no reason.
|
||||
pub fn death_event(query: Query<&LocalPlayer, Added<Dead>>) {
|
||||
for local_player in &query {
|
||||
local_player.tx.send(Event::Death(None)).unwrap();
|
||||
pub fn death_event(query: Query<&LocalPlayerEvents, Added<Dead>>) {
|
||||
for local_player_events in &query {
|
||||
local_player_events.send(Event::Death(None)).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ use azalea_core::{ChunkPos, ResourceLocation, Vec3};
|
|||
use azalea_protocol::{
|
||||
connect::{ReadConnection, WriteConnection},
|
||||
packets::game::{
|
||||
clientbound_player_combat_kill_packet::ClientboundPlayerCombatKillPacket,
|
||||
serverbound_accept_teleportation_packet::ServerboundAcceptTeleportationPacket,
|
||||
serverbound_custom_payload_packet::ServerboundCustomPayloadPacket,
|
||||
serverbound_keep_alive_packet::ServerboundKeepAlivePacket,
|
||||
|
@ -22,10 +23,10 @@ use azalea_world::{
|
|||
use bevy_app::{App, Plugin};
|
||||
use bevy_ecs::{
|
||||
component::Component,
|
||||
prelude::Entity,
|
||||
prelude::{Entity, EventWriter},
|
||||
query::Changed,
|
||||
schedule::{IntoSystemDescriptor, SystemSet},
|
||||
system::{Commands, Query, Res, ResMut, SystemState},
|
||||
system::{Commands, Query, ResMut, SystemState},
|
||||
};
|
||||
use log::{debug, error, trace, warn};
|
||||
use parking_lot::Mutex;
|
||||
|
@ -47,19 +48,46 @@ impl Plugin for PacketHandlerPlugin {
|
|||
)
|
||||
.add_event::<AddPlayerEvent>()
|
||||
.add_event::<RemovePlayerEvent>()
|
||||
.add_event::<UpdatePlayerEvent>();
|
||||
.add_event::<UpdatePlayerEvent>()
|
||||
.add_event::<ChatReceivedEvent>();
|
||||
}
|
||||
}
|
||||
|
||||
/// A player joined the game (or more specifically, was added to the tab
|
||||
/// list).
|
||||
pub struct AddPlayerEvent(PlayerInfo);
|
||||
/// list of a local player).
|
||||
pub struct AddPlayerEvent {
|
||||
/// The local player entity that received this event.
|
||||
pub entity: Entity,
|
||||
pub info: PlayerInfo,
|
||||
}
|
||||
/// A player left the game (or maybe is still in the game and was just
|
||||
/// removed from the tab list).
|
||||
pub struct RemovePlayerEvent(PlayerInfo);
|
||||
/// A player was updated in the tab list (gamemode, display
|
||||
/// removed from the tab list of a local player).
|
||||
pub struct RemovePlayerEvent {
|
||||
/// The local player entity that received this event.
|
||||
pub entity: Entity,
|
||||
pub info: PlayerInfo,
|
||||
}
|
||||
/// A player was updated in the tab list of a local player (gamemode, display
|
||||
/// name, or latency changed).
|
||||
pub struct UpdatePlayerEvent(PlayerInfo);
|
||||
pub struct UpdatePlayerEvent {
|
||||
/// The local player entity that received this event.
|
||||
pub entity: Entity,
|
||||
pub info: PlayerInfo,
|
||||
}
|
||||
|
||||
/// A client received a chat message packet.
|
||||
pub struct ChatReceivedEvent {
|
||||
pub entity: Entity,
|
||||
pub packet: ChatPacket,
|
||||
}
|
||||
|
||||
/// Event for when an entity dies. dies. If it's a local player and there's a
|
||||
/// reason in the death screen, the [`ClientboundPlayerCombatKillPacket`] will
|
||||
/// be included.
|
||||
pub struct DeathEvent {
|
||||
pub entity: Entity,
|
||||
pub packet: Option<ClientboundPlayerCombatKillPacket>,
|
||||
}
|
||||
|
||||
/// Something that receives packets from the server.
|
||||
#[derive(Component, Clone)]
|
||||
|
@ -207,8 +235,6 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) {
|
|||
.get(),
|
||||
);
|
||||
|
||||
local_player.tx.send(Event::Login).unwrap();
|
||||
|
||||
system_state.apply(ecs);
|
||||
}
|
||||
ClientboundGamePacket::SetChunkCacheRadius(p) => {
|
||||
|
@ -338,15 +364,19 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) {
|
|||
ClientboundGamePacket::PlayerInfoUpdate(p) => {
|
||||
debug!("Got player info packet {:?}", p);
|
||||
|
||||
let mut system_state: SystemState<Query<&mut LocalPlayer>> =
|
||||
SystemState::new(ecs);
|
||||
let mut query = system_state.get_mut(ecs);
|
||||
let mut system_state: SystemState<(
|
||||
Query<&mut LocalPlayer>,
|
||||
EventWriter<AddPlayerEvent>,
|
||||
EventWriter<UpdatePlayerEvent>,
|
||||
)> = SystemState::new(ecs);
|
||||
let (mut query, mut add_player_events, mut update_player_events) =
|
||||
system_state.get_mut(ecs);
|
||||
let mut local_player = query.get_mut(player_entity).unwrap();
|
||||
|
||||
for updated_info in &p.entries {
|
||||
// add the new player maybe
|
||||
if p.actions.add_player {
|
||||
let player_info = PlayerInfo {
|
||||
let info = PlayerInfo {
|
||||
profile: updated_info.profile.clone(),
|
||||
uuid: updated_info.profile.uuid,
|
||||
gamemode: updated_info.game_mode,
|
||||
|
@ -355,8 +385,11 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) {
|
|||
};
|
||||
local_player
|
||||
.players
|
||||
.insert(updated_info.profile.uuid, player_info.clone());
|
||||
local_player.tx.send(Event::AddPlayer(player_info)).unwrap();
|
||||
.insert(updated_info.profile.uuid, info.clone());
|
||||
add_player_events.send(AddPlayerEvent {
|
||||
entity: player_entity,
|
||||
info: info.clone(),
|
||||
});
|
||||
} else if let Some(info) =
|
||||
local_player.players.get_mut(&updated_info.profile.uuid)
|
||||
{
|
||||
|
@ -371,8 +404,10 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) {
|
|||
if p.actions.update_display_name {
|
||||
info.display_name = updated_info.display_name.clone();
|
||||
}
|
||||
let info = info.clone();
|
||||
local_player.tx.send(Event::UpdatePlayer(info)).unwrap();
|
||||
update_player_events.send(UpdatePlayerEvent {
|
||||
entity: player_entity,
|
||||
info: info.clone(),
|
||||
});
|
||||
} else {
|
||||
warn!(
|
||||
"Ignoring PlayerInfoUpdate for unknown player {}",
|
||||
|
@ -382,14 +417,19 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) {
|
|||
}
|
||||
}
|
||||
ClientboundGamePacket::PlayerInfoRemove(p) => {
|
||||
let mut system_state: SystemState<Query<&mut LocalPlayer>> =
|
||||
SystemState::new(ecs);
|
||||
let mut query = system_state.get_mut(ecs);
|
||||
let mut system_state: SystemState<(
|
||||
Query<&mut LocalPlayer>,
|
||||
EventWriter<RemovePlayerEvent>,
|
||||
)> = SystemState::new(ecs);
|
||||
let (mut query, mut remove_player_events) = system_state.get_mut(ecs);
|
||||
let mut local_player = query.get_mut(player_entity).unwrap();
|
||||
|
||||
for uuid in &p.profile_ids {
|
||||
if let Some(info) = local_player.players.remove(uuid) {
|
||||
local_player.tx.send(Event::RemovePlayer(info)).unwrap();
|
||||
remove_player_events.send(RemovePlayerEvent {
|
||||
entity: player_entity,
|
||||
info,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -553,10 +593,20 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) {
|
|||
ClientboundGamePacket::SetHealth(p) => {
|
||||
debug!("Got set health packet {:?}", p);
|
||||
|
||||
let mut system_state: SystemState<Query<&mut Health>> = SystemState::new(ecs);
|
||||
let mut query = system_state.get_mut(ecs);
|
||||
let mut system_state: SystemState<(
|
||||
Query<&mut Health>,
|
||||
EventWriter<DeathEvent>,
|
||||
)> = SystemState::new(ecs);
|
||||
let (mut query, mut death_events) = system_state.get_mut(ecs);
|
||||
let mut health = query.get_mut(player_entity).unwrap();
|
||||
|
||||
if p.health == 0. && **health != 0. {
|
||||
death_events.send(DeathEvent {
|
||||
entity: player_entity,
|
||||
packet: None,
|
||||
});
|
||||
}
|
||||
|
||||
**health = p.health;
|
||||
|
||||
// the `Dead` component is added by the `update_dead` system
|
||||
|
@ -674,28 +724,32 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) {
|
|||
ClientboundGamePacket::PlayerChat(p) => {
|
||||
debug!("Got player chat packet {:?}", p);
|
||||
|
||||
let mut system_state: SystemState<Query<&mut LocalPlayer>> =
|
||||
SystemState::new(ecs);
|
||||
let mut query = system_state.get_mut(ecs);
|
||||
let mut system_state: SystemState<(
|
||||
Query<&mut LocalPlayer>,
|
||||
EventWriter<ChatReceivedEvent>,
|
||||
)> = SystemState::new(ecs);
|
||||
let (mut query, mut chat_events) = system_state.get_mut(ecs);
|
||||
let local_player = query.get_mut(player_entity).unwrap();
|
||||
|
||||
local_player
|
||||
.tx
|
||||
.send(Event::Chat(ChatPacket::Player(Arc::new(p.clone()))))
|
||||
.unwrap();
|
||||
chat_events.send(ChatReceivedEvent {
|
||||
entity: player_entity,
|
||||
packet: ChatPacket::Player(Arc::new(p.clone())),
|
||||
});
|
||||
}
|
||||
ClientboundGamePacket::SystemChat(p) => {
|
||||
debug!("Got system chat packet {:?}", p);
|
||||
|
||||
let mut system_state: SystemState<Query<&mut LocalPlayer>> =
|
||||
SystemState::new(ecs);
|
||||
let mut query = system_state.get_mut(ecs);
|
||||
let mut system_state: SystemState<(
|
||||
Query<&mut LocalPlayer>,
|
||||
EventWriter<ChatReceivedEvent>,
|
||||
)> = SystemState::new(ecs);
|
||||
let (mut query, mut chat_events) = system_state.get_mut(ecs);
|
||||
let local_player = query.get_mut(player_entity).unwrap();
|
||||
|
||||
local_player
|
||||
.tx
|
||||
.send(Event::Chat(ChatPacket::System(Arc::new(p.clone()))))
|
||||
.unwrap();
|
||||
chat_events.send(ChatReceivedEvent {
|
||||
entity: player_entity,
|
||||
packet: ChatPacket::System(Arc::new(p.clone())),
|
||||
});
|
||||
}
|
||||
ClientboundGamePacket::Sound(_p) => {
|
||||
// debug!("Got sound packet {:?}", p);
|
||||
|
@ -780,17 +834,17 @@ fn handle_packets(ecs: &mut bevy_ecs::world::World) {
|
|||
let mut system_state: SystemState<(
|
||||
Commands,
|
||||
Query<(&mut LocalPlayer, &MinecraftEntityId, Option<&Dead>)>,
|
||||
EventWriter<DeathEvent>,
|
||||
)> = SystemState::new(ecs);
|
||||
let (mut commands, mut query) = system_state.get_mut(ecs);
|
||||
let (mut commands, mut query, mut death_events) = system_state.get_mut(ecs);
|
||||
let (local_player, entity_id, dead) = query.get_mut(player_entity).unwrap();
|
||||
|
||||
if **entity_id == p.player_id && dead.is_none() {
|
||||
commands.entity(player_entity).insert(Dead);
|
||||
|
||||
local_player
|
||||
.tx
|
||||
.send(Event::Death(Some(Arc::new(p.clone()))))
|
||||
.unwrap();
|
||||
death_events.send(DeathEvent {
|
||||
entity: player_entity,
|
||||
packet: Some(p.clone()),
|
||||
});
|
||||
}
|
||||
|
||||
system_state.apply(ecs);
|
||||
|
|
|
@ -1,24 +1,20 @@
|
|||
use crate::{
|
||||
entity::{self, Entity, EntityUuid, MinecraftEntityId, Position, WorldName},
|
||||
entity::{Entity, EntityUuid, MinecraftEntityId, Position, WorldName},
|
||||
entity_info::LoadedBy,
|
||||
ChunkStorage, EntityInfos, PartialChunkStorage, PartialEntityInfos, WorldContainer,
|
||||
};
|
||||
use azalea_core::ChunkPos;
|
||||
use bevy_ecs::{
|
||||
component::Component,
|
||||
prelude::Bundle,
|
||||
query::{Changed, Without},
|
||||
query::Changed,
|
||||
system::{Commands, Query, Res, ResMut},
|
||||
};
|
||||
use derive_more::{Deref, DerefMut};
|
||||
use log::{debug, error, info, warn};
|
||||
use nohash_hasher::IntMap;
|
||||
use parking_lot::RwLock;
|
||||
use std::fmt::Formatter;
|
||||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
fmt::Debug,
|
||||
};
|
||||
use std::{fmt::Formatter, sync::Arc};
|
||||
|
||||
/// PartialWorlds are usually owned by clients, and hold strong references to
|
||||
/// chunks and entities in [`WeakWorld`]s.
|
||||
|
@ -53,16 +49,15 @@ impl PartialWorld {
|
|||
pub fn deduplicate_entities(
|
||||
mut commands: Commands,
|
||||
mut query: Query<
|
||||
(Entity, &MinecraftEntityId, &WorldName, &mut Position),
|
||||
(Entity, &MinecraftEntityId, &WorldName, &Position),
|
||||
Changed<MinecraftEntityId>,
|
||||
>,
|
||||
mut id_query: Query<&MinecraftEntityId>,
|
||||
id_query: Query<&MinecraftEntityId>,
|
||||
mut loaded_by_query: Query<&mut LoadedBy>,
|
||||
mut entity_infos: ResMut<EntityInfos>,
|
||||
mut world_container: ResMut<WorldContainer>,
|
||||
world_container: Res<WorldContainer>,
|
||||
) {
|
||||
// if this entity already exists, remove it
|
||||
for (entity, id, world_name, mut position) in query.iter_mut() {
|
||||
for (entity, id, world_name, position) in query.iter_mut() {
|
||||
let entity_chunk = ChunkPos::from(*position);
|
||||
if let Some(world_lock) = world_container.get(world_name) {
|
||||
let world = world_lock.write();
|
||||
|
|
|
@ -15,165 +15,168 @@
|
|||
|
||||
use crate::{Swarm, SwarmEvent};
|
||||
use async_trait::async_trait;
|
||||
use azalea_client::{ChatPacket, Client, Event};
|
||||
use azalea_client::{packet_handling::ChatReceivedEvent, ChatPacket, Client, Event, LocalPlayer};
|
||||
use bevy_ecs::{
|
||||
prelude::{Component, Entity, EventReader, EventWriter},
|
||||
query::{Added, Without},
|
||||
schedule::SystemSet,
|
||||
system::{Commands, Query, Res, ResMut, Resource},
|
||||
};
|
||||
use parking_lot::Mutex;
|
||||
use std::{collections::VecDeque, sync::Arc};
|
||||
use tokio::sync::broadcast::{Receiver, Sender};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Plugin {
|
||||
pub swarm_state: SwarmState,
|
||||
pub tx: Sender<ChatPacket>,
|
||||
}
|
||||
|
||||
impl crate::Plugin for Plugin {
|
||||
type State = State;
|
||||
|
||||
fn build(&self) -> State {
|
||||
State {
|
||||
chat_index: Arc::new(Mutex::new(0)),
|
||||
swarm_state: self.swarm_state.clone(),
|
||||
tx: self.tx.clone(),
|
||||
}
|
||||
pub struct Plugin;
|
||||
impl bevy_app::Plugin for Plugin {
|
||||
fn build(&self, app: &mut bevy_app::App) {
|
||||
app.add_system(chat_listener)
|
||||
.add_system(add_default_client_state)
|
||||
.add_system(update_min_index_and_shrink_queue)
|
||||
.insert_resource(GlobalChatState {
|
||||
chat_queue: VecDeque::new(),
|
||||
chat_min_index: 0,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct State {
|
||||
pub chat_index: Arc<Mutex<usize>>,
|
||||
pub tx: Sender<ChatPacket>,
|
||||
pub swarm_state: SwarmState,
|
||||
/// Add a `ClientChatState` when a new client is added to the world.
|
||||
fn add_default_client_state(
|
||||
mut commands: Commands,
|
||||
query: Query<Entity, (Added<LocalPlayer>, Without<ClientChatState>)>,
|
||||
global_chat_state: Res<GlobalChatState>,
|
||||
) {
|
||||
for entity in query.iter() {
|
||||
commands.entity(entity).insert(ClientChatState {
|
||||
chat_index: global_chat_state.chat_min_index,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SwarmState {
|
||||
pub chat_queue: Arc<Mutex<VecDeque<ChatPacket>>>,
|
||||
pub chat_min_index: Arc<Mutex<usize>>,
|
||||
pub rx: Arc<tokio::sync::Mutex<Receiver<ChatPacket>>>,
|
||||
#[derive(Component)]
|
||||
pub struct ClientChatState {
|
||||
pub chat_index: usize,
|
||||
}
|
||||
|
||||
impl State {
|
||||
pub fn handle_chat(&self, message: ChatPacket) {
|
||||
// When a bot receives a chat messages, it looks into the queue to find the
|
||||
// earliest instance of the message content that's after the bot's chat index.
|
||||
// If it finds it, then its personal index is simply updated. Otherwise, fire
|
||||
// the event and add to the queue.
|
||||
/// A chat message that no other bots have seen yet was received by a bot.
|
||||
pub struct NewChatMessageEvent(ChatPacket);
|
||||
|
||||
let mut chat_queue = self.swarm_state.chat_queue.lock();
|
||||
let chat_min_index = self.swarm_state.chat_min_index.lock();
|
||||
let mut chat_index = self.chat_index.lock();
|
||||
#[derive(Resource)]
|
||||
pub struct GlobalChatState {
|
||||
pub chat_queue: VecDeque<ChatPacket>,
|
||||
pub chat_min_index: usize,
|
||||
}
|
||||
|
||||
if *chat_min_index > *chat_index {
|
||||
// if this happens it's because this bot just logged in, so
|
||||
// ignore it and let another bot handle it
|
||||
println!("chat_min_index ({chat_min_index}) > chat_index ({chat_index})");
|
||||
*chat_index = *chat_min_index;
|
||||
return;
|
||||
}
|
||||
let actual_vec_index = *chat_index - *chat_min_index;
|
||||
fn chat_listener(
|
||||
query: Query<&ClientChatState>,
|
||||
events: EventReader<ChatReceivedEvent>,
|
||||
mut global_chat_state: ResMut<GlobalChatState>,
|
||||
new_chat_messages_events: EventWriter<NewChatMessageEvent>,
|
||||
) {
|
||||
for event in events.iter() {
|
||||
if let Ok(client_chat_state) = query.get(event.entity) {
|
||||
// When a bot receives a chat messages, it looks into the queue to find the
|
||||
// earliest instance of the message content that's after the bot's chat index.
|
||||
// If it finds it, then its personal index is simply updated. Otherwise, fire
|
||||
// the event and add to the queue.
|
||||
|
||||
// go through the queue and find the first message that's after the bot's index
|
||||
let mut found = false;
|
||||
for (i, past_message) in chat_queue.iter().enumerate().skip(actual_vec_index) {
|
||||
if past_message == &message {
|
||||
// found the message, update the index
|
||||
*chat_index = i + *chat_min_index + 1;
|
||||
found = true;
|
||||
break;
|
||||
if global_chat_state.chat_min_index > client_chat_state.chat_index {
|
||||
// if this happens it's because this bot just logged in, so
|
||||
// ignore it and let another bot handle it
|
||||
println!(
|
||||
"chat_min_index ({}) > chat_index ({})",
|
||||
global_chat_state.chat_min_index, client_chat_state.chat_index
|
||||
);
|
||||
client_chat_state.chat_index = global_chat_state.chat_min_index;
|
||||
return;
|
||||
}
|
||||
let actual_vec_index = client_chat_state.chat_index - global_chat_state.chat_min_index;
|
||||
|
||||
// go through the queue and find the first message that's after the bot's index
|
||||
let mut found = false;
|
||||
for (i, past_message) in global_chat_state
|
||||
.chat_queue
|
||||
.iter()
|
||||
.enumerate()
|
||||
.skip(actual_vec_index)
|
||||
{
|
||||
if past_message == &event.packet {
|
||||
// found the message, update the index
|
||||
client_chat_state.chat_index = i + global_chat_state.chat_min_index + 1;
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
// didn't find the message, so fire the swarm event and add to the queue
|
||||
new_chat_messages_events.send(NewChatMessageEvent(event.packet.clone()));
|
||||
global_chat_state.chat_queue.push_back(event.packet);
|
||||
client_chat_state.chat_index =
|
||||
global_chat_state.chat_queue.len() + global_chat_state.chat_min_index;
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
// didn't find the message, so fire the swarm event and add to the queue
|
||||
self.tx
|
||||
.send(message.clone())
|
||||
.expect("failed to send chat message to swarm");
|
||||
chat_queue.push_back(message);
|
||||
*chat_index = chat_queue.len() + *chat_min_index;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl crate::PluginState for State {
|
||||
async fn handle(self: Box<Self>, event: Event, _bot: Client) {
|
||||
// we're allowed to access Plugin::swarm_state since it's shared for every bot
|
||||
if let Event::Chat(m) = event {
|
||||
self.handle_chat(m);
|
||||
}
|
||||
}
|
||||
}
|
||||
// impl GlobalChatState {
|
||||
// async fn start<S>(self, swarm: Swarm)
|
||||
// where
|
||||
// S: Send + Sync + Clone + 'static,
|
||||
// {
|
||||
// // it should never be locked unless we reused the same plugin for two
|
||||
// swarms // (bad)
|
||||
// let mut rx = self.rx.lock().await;
|
||||
// while let Ok(m) = rx.recv().await {
|
||||
// swarm.swarm_tx.send(SwarmEvent::Chat(m)).unwrap();
|
||||
// let bot_states = swarm
|
||||
// .bot_datas
|
||||
// .lock()
|
||||
// .iter()
|
||||
// .map(|(bot, _)| {
|
||||
// bot.plugins
|
||||
// .get::<ClientChatState>()
|
||||
// .expect("Chat plugin not installed")
|
||||
// .clone()
|
||||
// })
|
||||
// .collect::<Vec<_>>();
|
||||
// self.handle_new_chat_message(&bot_states);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
impl SwarmState {
|
||||
pub fn new<S>(swarm: Swarm<S>) -> (Self, Sender<ChatPacket>)
|
||||
where
|
||||
S: Send + Sync + Clone + 'static,
|
||||
{
|
||||
let (tx, rx) = tokio::sync::broadcast::channel(1);
|
||||
|
||||
let swarm_state = SwarmState {
|
||||
chat_queue: Arc::new(Mutex::new(VecDeque::new())),
|
||||
chat_min_index: Arc::new(Mutex::new(0)),
|
||||
rx: Arc::new(tokio::sync::Mutex::new(rx)),
|
||||
};
|
||||
tokio::spawn(swarm_state.clone().start(swarm));
|
||||
|
||||
(swarm_state, tx)
|
||||
}
|
||||
async fn start<S>(self, swarm: Swarm<S>)
|
||||
where
|
||||
S: Send + Sync + Clone + 'static,
|
||||
{
|
||||
// it should never be locked unless we reused the same plugin for two swarms
|
||||
// (bad)
|
||||
let mut rx = self.rx.lock().await;
|
||||
while let Ok(m) = rx.recv().await {
|
||||
swarm.swarm_tx.send(SwarmEvent::Chat(m)).unwrap();
|
||||
let bot_states = swarm
|
||||
.bot_datas
|
||||
.lock()
|
||||
.iter()
|
||||
.map(|(bot, _)| {
|
||||
bot.plugins
|
||||
.get::<State>()
|
||||
.expect("Chat plugin not installed")
|
||||
.clone()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
self.handle_new_chat_message(&bot_states);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SwarmState {
|
||||
pub fn handle_new_chat_message(&self, bot_states: &[State]) {
|
||||
fn update_min_index_and_shrink_queue(
|
||||
query: Query<&ClientChatState>,
|
||||
mut global_chat_state: ResMut<GlobalChatState>,
|
||||
events: EventReader<NewChatMessageEvent>,
|
||||
) {
|
||||
for event in events.iter() {
|
||||
// To make sure the queue doesn't grow too large, we keep a `chat_min_index`
|
||||
// in Swarm that's set to the smallest index of all the bots, and we remove all
|
||||
// messages from the queue that are before that index.
|
||||
|
||||
let chat_min_index = *self.chat_min_index.lock();
|
||||
let mut new_chat_min_index = usize::MAX;
|
||||
for bot_state in bot_states {
|
||||
let this_chat_index = *bot_state.chat_index.lock();
|
||||
for client_chat_state in query.iter() {
|
||||
let this_chat_index = client_chat_state.chat_index;
|
||||
if this_chat_index < new_chat_min_index {
|
||||
new_chat_min_index = this_chat_index;
|
||||
}
|
||||
}
|
||||
|
||||
let mut chat_queue = self.chat_queue.lock();
|
||||
if chat_min_index > new_chat_min_index {
|
||||
if global_chat_state.chat_min_index > new_chat_min_index {
|
||||
println!(
|
||||
"chat_min_index ({chat_min_index}) > new_chat_min_index ({new_chat_min_index})"
|
||||
"chat_min_index ({chat_min_index}) > new_chat_min_index ({new_chat_min_index})",
|
||||
chat_min_index = global_chat_state.chat_min_index,
|
||||
);
|
||||
return;
|
||||
}
|
||||
// remove all messages from the queue that are before the min index
|
||||
for _ in 0..(new_chat_min_index - chat_min_index) {
|
||||
chat_queue.pop_front();
|
||||
for _ in 0..(new_chat_min_index - global_chat_state.chat_min_index) {
|
||||
global_chat_state.chat_queue.pop_front();
|
||||
}
|
||||
|
||||
// update the min index
|
||||
*self.chat_min_index.lock() = new_chat_min_index;
|
||||
self.chat_min_index = new_chat_min_index;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -184,18 +187,18 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_swarm_chat() {
|
||||
let (tx, mut rx) = tokio::sync::broadcast::channel(1);
|
||||
let swarm_state = SwarmState {
|
||||
let swarm_state = GlobalChatState {
|
||||
chat_queue: Arc::new(Mutex::new(VecDeque::new())),
|
||||
chat_min_index: Arc::new(Mutex::new(0)),
|
||||
rx: Arc::new(tokio::sync::Mutex::new(rx)),
|
||||
};
|
||||
let mut bot_states = vec![];
|
||||
let bot0 = State {
|
||||
let bot0 = ClientChatState {
|
||||
swarm_state: swarm_state.clone(),
|
||||
chat_index: Arc::new(Mutex::new(0)),
|
||||
tx: tx.clone(),
|
||||
};
|
||||
let bot1 = State {
|
||||
let bot1 = ClientChatState {
|
||||
swarm_state: swarm_state.clone(),
|
||||
chat_index: Arc::new(Mutex::new(0)),
|
||||
tx: tx.clone(),
|
||||
|
@ -234,13 +237,13 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_new_bot() {
|
||||
let (tx, mut rx) = tokio::sync::broadcast::channel(1);
|
||||
let swarm_state = SwarmState {
|
||||
let swarm_state = GlobalChatState {
|
||||
chat_queue: Arc::new(Mutex::new(VecDeque::new())),
|
||||
chat_min_index: Arc::new(Mutex::new(0)),
|
||||
rx: Arc::new(tokio::sync::Mutex::new(rx)),
|
||||
};
|
||||
let mut bot_states = vec![];
|
||||
let bot0 = State {
|
||||
let bot0 = ClientChatState {
|
||||
swarm_state: swarm_state.clone(),
|
||||
chat_index: Arc::new(Mutex::new(0)),
|
||||
tx: tx.clone(),
|
||||
|
@ -254,7 +257,7 @@ mod tests {
|
|||
Ok(ChatPacket::new("a"))
|
||||
);
|
||||
// now a second bot joined and got a different chat message
|
||||
let bot1 = State {
|
||||
let bot1 = ClientChatState {
|
||||
swarm_state: swarm_state.clone(),
|
||||
chat_index: Arc::new(Mutex::new(0)),
|
||||
tx: tx.clone(),
|
||||
|
|
|
@ -1,16 +1,16 @@
|
|||
/// Swarms are a way to conveniently control many bots.
|
||||
mod chat;
|
||||
mod plugins;
|
||||
|
||||
pub use self::plugins::*;
|
||||
use crate::HandleFn;
|
||||
use azalea_client::{start_ecs, Account, ChatPacket, Client, Event, JoinError};
|
||||
use azalea_client::{init_ecs_app, start_ecs, Account, ChatPacket, Client, Event, JoinError};
|
||||
use azalea_protocol::{
|
||||
connect::ConnectionError,
|
||||
resolver::{self, ResolverError},
|
||||
ServerAddress,
|
||||
};
|
||||
use azalea_world::WorldContainer;
|
||||
use bevy_app::Plugin;
|
||||
use bevy_ecs::system::Resource;
|
||||
use futures::future::join_all;
|
||||
use log::error;
|
||||
use parking_lot::{Mutex, RwLock};
|
||||
|
@ -27,25 +27,23 @@ use tokio::sync::mpsc;
|
|||
/// It's used to make the [`Swarm::add`] function work.
|
||||
///
|
||||
/// [`azalea::start_swarm`]: fn.start_swarm.html
|
||||
#[derive(Clone)]
|
||||
pub struct Swarm<S> {
|
||||
bot_datas: Arc<Mutex<Vec<(Client, S)>>>,
|
||||
|
||||
#[derive(Clone, Resource)]
|
||||
pub struct Swarm {
|
||||
pub ecs: Arc<Mutex<bevy_ecs::world::World>>,
|
||||
// bot_datas: Arc<Mutex<Vec<(Client, S)>>>,
|
||||
resolved_address: SocketAddr,
|
||||
address: ServerAddress,
|
||||
pub world_container: Arc<RwLock<WorldContainer>>,
|
||||
pub ecs_lock: Arc<Mutex<bevy_ecs::world::World>>,
|
||||
/// Plugins that are set for new bots
|
||||
plugins: Plugins,
|
||||
|
||||
bots_tx: mpsc::UnboundedSender<(Option<Event>, (Client, S))>,
|
||||
bots_tx: mpsc::UnboundedSender<(Option<Event>, Client)>,
|
||||
swarm_tx: mpsc::UnboundedSender<SwarmEvent>,
|
||||
|
||||
run_schedule_sender: mpsc::Sender<()>,
|
||||
}
|
||||
|
||||
/// Create a new [`Swarm`].
|
||||
pub struct SwarmBuilder<S, SS, A, Fut, SwarmFut>
|
||||
pub struct SwarmBuilder<S, SS, Fut, SwarmFut>
|
||||
where
|
||||
Fut: Future<Output = Result<(), anyhow::Error>>,
|
||||
SwarmFut: Future<Output = Result<(), anyhow::Error>>,
|
||||
|
@ -61,10 +59,10 @@ where
|
|||
/// The state for the overall swarm.
|
||||
swarm_state: SS,
|
||||
/// The function that's called every time a bot receives an [`Event`].
|
||||
handler: HandleFn<Fut, S>,
|
||||
handler: Option<HandleFn<Fut, S>>,
|
||||
/// The function that's called every time the swarm receives a
|
||||
/// [`SwarmEvent`].
|
||||
swarm_handler: SwarmHandleFn<SwarmFut, S, SS>,
|
||||
swarm_handler: Option<SwarmHandleFn<SwarmFut, SS>>,
|
||||
|
||||
/// How long we should wait between each bot joining the server. Set to
|
||||
/// None to have every bot connect at the same time. None is different than
|
||||
|
@ -72,19 +70,25 @@ where
|
|||
/// the previous one to be ready.
|
||||
join_delay: Option<std::time::Duration>,
|
||||
}
|
||||
impl SwarmBuilder<S>
|
||||
impl<S, SS, Fut, SwarmFut> SwarmBuilder<S, SS, Fut, SwarmFut>
|
||||
where
|
||||
Fut: Future<Output = Result<(), anyhow::Error>>,
|
||||
SwarmFut: Future<Output = Result<(), anyhow::Error>>,
|
||||
S: Default + Send + Sync + Clone + 'static,
|
||||
SS: Default + Send + Sync + Clone + 'static,
|
||||
{
|
||||
/// Start creating the swarm.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
// we create the app here so plugins can add onto it.
|
||||
// the schedules won't run until [`Self::start`] is called.
|
||||
app: init_ecs_app(),
|
||||
|
||||
accounts: Vec::new(),
|
||||
states: Vec::new(),
|
||||
swarm_state: SS::default(),
|
||||
handler: |_, _, _| {},
|
||||
swarm_handler: |_, _, _| {},
|
||||
handler: None,
|
||||
swarm_handler: None,
|
||||
join_delay: None,
|
||||
}
|
||||
}
|
||||
|
@ -104,30 +108,42 @@ where
|
|||
///
|
||||
/// This will make the state for this client be the default, use
|
||||
/// [`Self::add_account_with_state`] to avoid that.
|
||||
pub fn add_account(&mut self, account: Vec<Account>) {
|
||||
pub fn add_account(&mut self, account: Account) {
|
||||
self.accounts.push(account);
|
||||
self.states.push(S::default());
|
||||
}
|
||||
/// Add an account with a custom initial state. Use just
|
||||
/// [`Self::add_account`] to use the Default implementation for the state.
|
||||
pub fn add_account_with_state(&mut self, account: Vec<Account>, state: S) {
|
||||
self.accounts.push(accounts);
|
||||
pub fn add_account_with_state(&mut self, account: Account, state: S) {
|
||||
self.accounts.push(account);
|
||||
self.states.push(state);
|
||||
}
|
||||
|
||||
/// Set the function that's called every time a bot receives an [`Event`].
|
||||
/// This is the way to handle normal per-bot events.
|
||||
///
|
||||
/// You can only have one client handler, calling this again will replace
|
||||
/// the old client handler function (you can have a client handler and swarm
|
||||
/// handler separately though).
|
||||
pub fn set_handler(&mut self, handler: HandleFn<Fut, S>) {
|
||||
self.handler = handler;
|
||||
self.handler = Some(handler);
|
||||
}
|
||||
/// Set the function that's called every time the swarm receives a
|
||||
/// [`SwarmEvent`]. This is the way to handle global swarm events.
|
||||
pub fn set_swarm_handler(&mut self, handler: SwarmHandleFn<SwarmFut, S, SS>) {
|
||||
self.swarm_handler = handler;
|
||||
///
|
||||
/// You can only have one swarm handler, calling this again will replace
|
||||
/// the old swarm handler function (you can have a client handler and swarm
|
||||
/// handler separately though).
|
||||
pub fn set_swarm_handler(&mut self, handler: SwarmHandleFn<SwarmFut, SS>) {
|
||||
self.swarm_handler = Some(handler);
|
||||
}
|
||||
|
||||
/// TODO: write plugin docs probably here
|
||||
fn add_plugin(&mut self) {}
|
||||
///
|
||||
/// notes: ECS entities might not be Minecraft entities, filter by MinecraftEntityId to make sure
|
||||
fn add_plugin<T: Plugin>(&mut self, plugin: T) {
|
||||
self.app.add_plugin(plugin);
|
||||
}
|
||||
|
||||
/// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
|
||||
/// server.
|
||||
|
@ -136,15 +152,15 @@ where
|
|||
/// that implements `TryInto<ServerAddress>`.
|
||||
///
|
||||
/// [`ServerAddress`]: azalea_protocol::ServerAddress
|
||||
pub async fn start(self, address: TryInto<ServerAddress>) {
|
||||
pub async fn start(self, address: impl TryInto<ServerAddress>) -> Result<(), SwarmStartError>{
|
||||
assert_eq!(
|
||||
options.accounts.len(),
|
||||
options.states.len(),
|
||||
self.accounts.len(),
|
||||
self.states.len(),
|
||||
"There must be exactly one state per bot."
|
||||
);
|
||||
|
||||
// convert the TryInto<ServerAddress> into a ServerAddress
|
||||
let address: ServerAddress = match options.address.try_into() {
|
||||
let address: ServerAddress = match address.try_into() {
|
||||
Ok(address) => address,
|
||||
Err(_) => return Err(SwarmStartError::InvalidAddress),
|
||||
};
|
||||
|
@ -154,22 +170,18 @@ where
|
|||
|
||||
let world_container = Arc::new(RwLock::new(WorldContainer::default()));
|
||||
|
||||
let plugins = options.plugins;
|
||||
|
||||
// we can't modify the swarm plugins after this
|
||||
let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
|
||||
let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
|
||||
|
||||
let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1);
|
||||
let ecs_lock = start_ecs(run_schedule_receiver, run_schedule_sender.clone());
|
||||
let ecs_lock = start_ecs(self.app, run_schedule_receiver, run_schedule_sender.clone());
|
||||
|
||||
let mut swarm = Swarm {
|
||||
bot_datas: Arc::new(Mutex::new(Vec::new())),
|
||||
|
||||
// bot_datas: Arc::new(Mutex::new(Vec::new())),
|
||||
resolved_address,
|
||||
address,
|
||||
world_container,
|
||||
plugins,
|
||||
|
||||
bots_tx,
|
||||
|
||||
|
@ -178,21 +190,15 @@ where
|
|||
ecs_lock,
|
||||
run_schedule_sender,
|
||||
};
|
||||
self.app.insert_resource(swarm.clone());
|
||||
|
||||
{
|
||||
// the chat plugin is hacky and needs the swarm to be passed like this
|
||||
let (chat_swarm_state, chat_tx) = chat::SwarmState::new(swarm.clone());
|
||||
swarm.plugins.add(chat::Plugin {
|
||||
swarm_state: chat_swarm_state,
|
||||
tx: chat_tx,
|
||||
});
|
||||
}
|
||||
swarm.plugins.add(chat::Plugin);
|
||||
|
||||
let mut swarm_clone = swarm.clone();
|
||||
let join_task = tokio::spawn(async move {
|
||||
if let Some(join_delay) = options.join_delay {
|
||||
if let Some(join_delay) = self.join_delay {
|
||||
// if there's a join delay, then join one by one
|
||||
for (account, state) in options.accounts.iter().zip(options.states) {
|
||||
for (account, state) in self.accounts.iter().zip(self.states) {
|
||||
swarm_clone
|
||||
.add_with_exponential_backoff(account, state.clone())
|
||||
.await;
|
||||
|
@ -200,7 +206,7 @@ where
|
|||
}
|
||||
} else {
|
||||
let swarm_borrow = &swarm_clone;
|
||||
join_all(options.accounts.iter().zip(options.states).map(
|
||||
join_all(self.accounts.iter().zip(self.states).map(
|
||||
async move |(account, state)| -> Result<(), JoinError> {
|
||||
swarm_borrow
|
||||
.clone()
|
||||
|
@ -213,14 +219,14 @@ where
|
|||
}
|
||||
});
|
||||
|
||||
let swarm_state = options.swarm_state;
|
||||
let swarm_state = self.swarm_state;
|
||||
let mut internal_state = InternalSwarmState::default();
|
||||
|
||||
// Watch swarm_rx and send those events to the swarm_handle.
|
||||
let swarm_clone = swarm.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(event) = swarm_rx.recv().await {
|
||||
tokio::spawn((options.swarm_handle)(
|
||||
tokio::spawn((self.swarm_handle)(
|
||||
swarm_clone.clone(),
|
||||
event,
|
||||
swarm_state.clone(),
|
||||
|
@ -242,7 +248,7 @@ where
|
|||
}
|
||||
_ => {}
|
||||
}
|
||||
tokio::spawn((options.handle)(bot, event, state));
|
||||
tokio::spawn((self.handle)(bot, event, state));
|
||||
}
|
||||
|
||||
join_task.abort();
|
||||
|
@ -268,7 +274,7 @@ pub enum SwarmEvent {
|
|||
Chat(ChatPacket),
|
||||
}
|
||||
|
||||
pub type SwarmHandleFn<Fut, S, SS> = fn(Swarm<S>, SwarmEvent, SS) -> Fut;
|
||||
pub type SwarmHandleFn<Fut, SS> = fn(Swarm, SwarmEvent, SS) -> Fut;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum SwarmStartError {
|
||||
|
@ -363,13 +369,10 @@ pub enum SwarmStartError {
|
|||
// ) -> Result<(), SwarmStartError> {
|
||||
// }
|
||||
|
||||
impl<S> Swarm<S>
|
||||
where
|
||||
S: Send + Sync + Clone + 'static,
|
||||
{
|
||||
impl Swarm {
|
||||
/// Add a new account to the swarm. You can remove it later by calling
|
||||
/// [`Client::disconnect`].
|
||||
pub async fn add(&mut self, account: &Account, state: S) -> Result<Client, JoinError> {
|
||||
pub async fn add<S: Clone>(&mut self, account: &Account, state: S) -> Result<Client, JoinError> {
|
||||
// tx is moved to the bot so it can send us events
|
||||
// rx is used to receive events from the bot
|
||||
// An event that causes the schedule to run. This is only used internally.
|
||||
|
@ -388,7 +391,6 @@ where
|
|||
let cloned_bot = bot.clone();
|
||||
let cloned_state = state.clone();
|
||||
let owned_account = account.clone();
|
||||
let bot_datas = self.bot_datas.clone();
|
||||
let swarm_tx = self.swarm_tx.clone();
|
||||
// send the init event immediately so it's the first thing we get
|
||||
swarm_tx.send(SwarmEvent::Init).unwrap();
|
||||
|
@ -402,20 +404,11 @@ where
|
|||
error!("Error sending event to swarm: {e}");
|
||||
}
|
||||
}
|
||||
// the bot disconnected, so we remove it from the swarm
|
||||
let mut bot_datas = bot_datas.lock();
|
||||
let index = bot_datas
|
||||
.iter()
|
||||
.position(|(b, _)| b.profile.uuid == cloned_bot.profile.uuid)
|
||||
.expect("bot disconnected but not found in swarm");
|
||||
bot_datas.remove(index);
|
||||
|
||||
swarm_tx
|
||||
.send(SwarmEvent::Disconnect(owned_account))
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
self.bot_datas.lock().push((bot.clone(), state.clone()));
|
||||
|
||||
Ok(bot)
|
||||
}
|
||||
|
@ -443,7 +436,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<S> IntoIterator for Swarm<S>
|
||||
impl IntoIterator for Swarm
|
||||
where
|
||||
S: Send + Sync + Clone + 'static,
|
||||
{
|
||||
|
@ -458,7 +451,8 @@ where
|
|||
/// }
|
||||
/// ```
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
self.bot_datas.lock().clone().into_iter()
|
||||
self.
|
||||
// self.bot_datas.lock().clone().into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue