mirror of
https://github.com/mat-1/azalea.git
synced 2025-08-02 06:16:04 +00:00
start converting swarm to use builder
This commit is contained in:
parent
8ea3d728c1
commit
228e53adad
10 changed files with 236 additions and 494 deletions
|
@ -1,7 +1,8 @@
|
|||
pub use crate::chat::ChatPacket;
|
||||
use crate::{
|
||||
local_player::{
|
||||
death_event, send_tick_event, update_in_loaded_chunk, LocalPlayer, PhysicsState, GameProfileComponent,
|
||||
death_event, send_tick_event, update_in_loaded_chunk, GameProfileComponent, LocalPlayer,
|
||||
PhysicsState,
|
||||
},
|
||||
movement::{local_player_ai_step, send_position, sprint_listener, walk_listener},
|
||||
packet_handling::{self, PacketHandlerPlugin},
|
||||
|
@ -183,7 +184,7 @@ impl Client {
|
|||
|
||||
// An event that causes the schedule to run. This is only used internally.
|
||||
let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1);
|
||||
let ecs_lock = start_ecs(run_schedule_receiver, run_schedule_sender.clone());
|
||||
let (ecs_lock, _app) = start_ecs(run_schedule_receiver, run_schedule_sender.clone());
|
||||
|
||||
{
|
||||
let mut ecs = ecs_lock.lock();
|
||||
|
@ -487,7 +488,7 @@ impl Client {
|
|||
pub fn start_ecs(
|
||||
run_schedule_receiver: mpsc::Receiver<()>,
|
||||
run_schedule_sender: mpsc::Sender<()>,
|
||||
) -> Arc<Mutex<bevy_ecs::world::World>> {
|
||||
) -> (Arc<Mutex<bevy_ecs::world::World>>, App) {
|
||||
// if you get an error right here that means you're doing something with locks
|
||||
// wrong read the error to see where the issue is
|
||||
// you might be able to just drop the lock or put it in its own scope to fix
|
||||
|
@ -534,7 +535,7 @@ pub fn start_ecs(
|
|||
));
|
||||
tokio::spawn(tick_run_schedule_loop(run_schedule_sender));
|
||||
|
||||
ecs
|
||||
(ecs, app)
|
||||
}
|
||||
|
||||
async fn run_schedule_loop(
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::sync::Arc;
|
|||
use azalea_world::entity::Entity;
|
||||
use bevy_ecs::{
|
||||
prelude::Component,
|
||||
query::{QueryItem, ROQueryItem, ReadOnlyWorldQuery, WorldQuery},
|
||||
query::{ROQueryItem, ReadOnlyWorldQuery, WorldQuery},
|
||||
};
|
||||
use parking_lot::Mutex;
|
||||
|
||||
|
|
|
@ -21,7 +21,6 @@ mod movement;
|
|||
pub mod packet_handling;
|
||||
pub mod ping;
|
||||
mod player;
|
||||
mod plugins;
|
||||
|
||||
pub use account::Account;
|
||||
pub use bevy_ecs as ecs;
|
||||
|
@ -29,4 +28,3 @@ pub use client::{start_ecs, ChatPacket, Client, ClientInformation, Event, JoinEr
|
|||
pub use local_player::{GameProfileComponent, LocalPlayer};
|
||||
pub use movement::{SprintDirection, StartSprintEvent, StartWalkEvent, WalkDirection};
|
||||
pub use player::PlayerInfo;
|
||||
pub use plugins::{Plugin, PluginState, PluginStates, Plugins};
|
||||
|
|
|
@ -1,144 +0,0 @@
|
|||
use crate::{client::Client, Event};
|
||||
use async_trait::async_trait;
|
||||
use nohash_hasher::NoHashHasher;
|
||||
use std::{
|
||||
any::{Any, TypeId},
|
||||
collections::HashMap,
|
||||
hash::BuildHasherDefault,
|
||||
};
|
||||
|
||||
type U64Hasher = BuildHasherDefault<NoHashHasher<u64>>;
|
||||
|
||||
// kind of based on https://docs.rs/http/latest/src/http/extensions.rs.html
|
||||
#[derive(Clone, Default)]
|
||||
pub struct PluginStates {
|
||||
map: Option<HashMap<TypeId, Box<dyn PluginState>, U64Hasher>>,
|
||||
}
|
||||
|
||||
/// A map of PluginState TypeIds to AnyPlugin objects. This can then be built
|
||||
/// into a [`PluginStates`] object to get a fresh new state based on this
|
||||
/// plugin.
|
||||
///
|
||||
/// If you're using the azalea crate, you should generate this from the
|
||||
/// `plugins!` macro.
|
||||
#[derive(Clone, Default)]
|
||||
pub struct Plugins {
|
||||
map: Option<HashMap<TypeId, Box<dyn AnyPlugin>, U64Hasher>>,
|
||||
}
|
||||
|
||||
impl PluginStates {
|
||||
pub fn get<T: PluginState>(&self) -> Option<&T> {
|
||||
self.map
|
||||
.as_ref()
|
||||
.and_then(|map| map.get(&TypeId::of::<T>()))
|
||||
.and_then(|boxed| (boxed.as_ref() as &dyn Any).downcast_ref::<T>())
|
||||
}
|
||||
}
|
||||
|
||||
impl Plugins {
|
||||
/// Create a new empty set of plugins.
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Add a new plugin to this set.
|
||||
pub fn add<T: Plugin + Clone>(&mut self, plugin: T) {
|
||||
if self.map.is_none() {
|
||||
self.map = Some(HashMap::with_hasher(BuildHasherDefault::default()));
|
||||
}
|
||||
self.map
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.insert(TypeId::of::<T::State>(), Box::new(plugin));
|
||||
}
|
||||
|
||||
/// Build our plugin states from this set of plugins. Note that if you're
|
||||
/// using `azalea` you'll probably never need to use this as it's called
|
||||
/// for you.
|
||||
pub fn build(self) -> PluginStates {
|
||||
let mut map = HashMap::with_hasher(BuildHasherDefault::default());
|
||||
for (id, plugin) in self.map.unwrap().into_iter() {
|
||||
map.insert(id, plugin.build());
|
||||
}
|
||||
PluginStates { map: Some(map) }
|
||||
}
|
||||
}
|
||||
|
||||
impl IntoIterator for PluginStates {
|
||||
type Item = Box<dyn PluginState>;
|
||||
type IntoIter = std::vec::IntoIter<Self::Item>;
|
||||
|
||||
/// Iterate over the plugin states.
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
self.map
|
||||
.map(|map| map.into_values().collect::<Vec<_>>())
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
/// A `PluginState` keeps the current state of a plugin for a client. All the
|
||||
/// fields must be atomic. Unique `PluginState`s are built from [`Plugin`]s.
|
||||
#[async_trait]
|
||||
pub trait PluginState: Send + Sync + PluginStateClone + Any + 'static {
|
||||
async fn handle(self: Box<Self>, event: Event, bot: Client);
|
||||
}
|
||||
|
||||
/// Plugins can keep their own personal state, listen to [`Event`]s, and add
|
||||
/// new functions to [`Client`].
|
||||
pub trait Plugin: Send + Sync + Any + 'static {
|
||||
type State: PluginState;
|
||||
|
||||
fn build(&self) -> Self::State;
|
||||
}
|
||||
|
||||
/// AnyPlugin is basically a Plugin but without the State associated type
|
||||
/// it has to exist so we can do a hashmap with Box<dyn AnyPlugin>
|
||||
#[doc(hidden)]
|
||||
pub trait AnyPlugin: Send + Sync + Any + AnyPluginClone + 'static {
|
||||
fn build(&self) -> Box<dyn PluginState>;
|
||||
}
|
||||
|
||||
impl<S: PluginState, B: Plugin<State = S> + Clone> AnyPlugin for B {
|
||||
fn build(&self) -> Box<dyn PluginState> {
|
||||
Box::new(self.build())
|
||||
}
|
||||
}
|
||||
|
||||
/// An internal trait that allows PluginState to be cloned.
|
||||
#[doc(hidden)]
|
||||
pub trait PluginStateClone {
|
||||
fn clone_box(&self) -> Box<dyn PluginState>;
|
||||
}
|
||||
impl<T> PluginStateClone for T
|
||||
where
|
||||
T: 'static + PluginState + Clone,
|
||||
{
|
||||
fn clone_box(&self) -> Box<dyn PluginState> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
impl Clone for Box<dyn PluginState> {
|
||||
fn clone(&self) -> Self {
|
||||
self.clone_box()
|
||||
}
|
||||
}
|
||||
|
||||
/// An internal trait that allows AnyPlugin to be cloned.
|
||||
#[doc(hidden)]
|
||||
pub trait AnyPluginClone {
|
||||
fn clone_box(&self) -> Box<dyn AnyPlugin>;
|
||||
}
|
||||
impl<T> AnyPluginClone for T
|
||||
where
|
||||
T: 'static + Plugin + Clone,
|
||||
{
|
||||
fn clone_box(&self) -> Box<dyn AnyPlugin> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
impl Clone for Box<dyn AnyPlugin> {
|
||||
fn clone(&self) -> Self {
|
||||
self.clone_box()
|
||||
}
|
||||
}
|
|
@ -15,8 +15,7 @@ pub struct ServerboundSetJigsawBlockPacket {
|
|||
pub target: ResourceLocation,
|
||||
pub pool: ResourceLocation,
|
||||
pub final_state: String,
|
||||
pub joint: String, /* TODO: Does JigsawBlockEntity$JointType::getSerializedName, may not be
|
||||
* implemented */
|
||||
pub joint: String,
|
||||
}
|
||||
|
||||
pub enum JointType {
|
||||
|
|
|
@ -166,8 +166,6 @@ impl Command for RelativeEntityUpdate {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: optimization: switch out the `HashMap<Entity, _>`s for `IntMap`s
|
||||
|
||||
/// Things that are shared between all the partial worlds.
|
||||
#[derive(Resource, Default)]
|
||||
pub struct EntityInfos {
|
||||
|
|
|
@ -85,9 +85,10 @@ pub mod prelude;
|
|||
mod start;
|
||||
mod swarm;
|
||||
|
||||
pub use azalea_block::*;
|
||||
pub use azalea_block as blocks;
|
||||
pub use azalea_client::*;
|
||||
pub use azalea_core::{BlockPos, Vec3};
|
||||
pub use azalea_protocol as protocol;
|
||||
pub use azalea_registry::EntityKind;
|
||||
pub use azalea_world::{entity, World};
|
||||
pub use start::{start, Options};
|
||||
|
|
|
@ -4,7 +4,7 @@ mod plugins;
|
|||
|
||||
pub use self::plugins::*;
|
||||
use crate::HandleFn;
|
||||
use azalea_client::{start_ecs, Account, ChatPacket, Client, Event, JoinError, Plugins};
|
||||
use azalea_client::{start_ecs, Account, ChatPacket, Client, Event, JoinError};
|
||||
use azalea_protocol::{
|
||||
connect::ConnectionError,
|
||||
resolver::{self, ResolverError},
|
||||
|
@ -18,25 +18,6 @@ use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
|
|||
use thiserror::Error;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
/// A helper macro that generates a [`SwarmPlugins`] struct from a list of
|
||||
/// objects that implement [`SwarmPlugin`].
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// swarm_plugins![azalea_pathfinder::Plugin];
|
||||
/// ```
|
||||
#[macro_export]
|
||||
macro_rules! swarm_plugins {
|
||||
($($plugin:expr),*) => {
|
||||
{
|
||||
let mut plugins = azalea::SwarmPlugins::new();
|
||||
$(
|
||||
plugins.add($plugin);
|
||||
)*
|
||||
plugins
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// A swarm is a way to conveniently control many bots at once, while also
|
||||
/// being able to control bots at an individual level when desired.
|
||||
///
|
||||
|
@ -52,7 +33,7 @@ pub struct Swarm<S> {
|
|||
|
||||
resolved_address: SocketAddr,
|
||||
address: ServerAddress,
|
||||
pub worlds: Arc<RwLock<WorldContainer>>,
|
||||
pub world_container: Arc<RwLock<WorldContainer>>,
|
||||
pub ecs_lock: Arc<Mutex<bevy_ecs::world::World>>,
|
||||
/// Plugins that are set for new bots
|
||||
plugins: Plugins,
|
||||
|
@ -63,6 +44,213 @@ pub struct Swarm<S> {
|
|||
run_schedule_sender: mpsc::Sender<()>,
|
||||
}
|
||||
|
||||
/// Create a new [`Swarm`].
|
||||
pub struct SwarmBuilder<S, SS, A, Fut, SwarmFut>
|
||||
where
|
||||
Fut: Future<Output = Result<(), anyhow::Error>>,
|
||||
SwarmFut: Future<Output = Result<(), anyhow::Error>>,
|
||||
S: Default + Send + Sync + Clone + 'static,
|
||||
SS: Default + Send + Sync + Clone + 'static,
|
||||
{
|
||||
app: bevy_app::App,
|
||||
/// The accounts that are going to join the server.
|
||||
accounts: Vec<Account>,
|
||||
/// The individual bot states. This must be the same length as `accounts`,
|
||||
/// since each bot gets one state.
|
||||
states: Vec<S>,
|
||||
/// The state for the overall swarm.
|
||||
swarm_state: SS,
|
||||
/// The function that's called every time a bot receives an [`Event`].
|
||||
handler: HandleFn<Fut, S>,
|
||||
/// The function that's called every time the swarm receives a
|
||||
/// [`SwarmEvent`].
|
||||
swarm_handler: SwarmHandleFn<SwarmFut, S, SS>,
|
||||
|
||||
/// How long we should wait between each bot joining the server. Set to
|
||||
/// None to have every bot connect at the same time. None is different than
|
||||
/// a duration of 0, since if a duration is present the bots will wait for
|
||||
/// the previous one to be ready.
|
||||
join_delay: Option<std::time::Duration>,
|
||||
}
|
||||
impl SwarmBuilder<S>
|
||||
where
|
||||
S: Default + Send + Sync + Clone + 'static,
|
||||
SS: Default + Send + Sync + Clone + 'static,
|
||||
{
|
||||
/// Start creating the swarm.
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
accounts: Vec::new(),
|
||||
states: Vec::new(),
|
||||
swarm_state: SS::default(),
|
||||
handler: |_, _, _| {},
|
||||
swarm_handler: |_, _, _| {},
|
||||
join_delay: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a vec of [`Account`]s to the swarm.
|
||||
///
|
||||
/// Use [`Self::add_account`] to only add one account. If you want the
|
||||
/// clients to have different default states, add them one at a time with
|
||||
/// [`Self::add_account_with_state`].
|
||||
pub fn add_accounts(&mut self, accounts: Vec<Account>) {
|
||||
for account in accounts {
|
||||
self.add_account(account);
|
||||
}
|
||||
}
|
||||
/// Add a single new [`Account`] to the swarm. Use [`add_accounts`] to add
|
||||
/// multiple accounts at a time.
|
||||
///
|
||||
/// This will make the state for this client be the default, use
|
||||
/// [`Self::add_account_with_state`] to avoid that.
|
||||
pub fn add_account(&mut self, account: Vec<Account>) {
|
||||
self.accounts.push(account);
|
||||
self.states.push(S::default());
|
||||
}
|
||||
/// Add an account with a custom initial state. Use just
|
||||
/// [`Self::add_account`] to use the Default implementation for the state.
|
||||
pub fn add_account_with_state(&mut self, account: Vec<Account>, state: S) {
|
||||
self.accounts.push(accounts);
|
||||
self.states.push(state);
|
||||
}
|
||||
|
||||
/// Set the function that's called every time a bot receives an [`Event`].
|
||||
/// This is the way to handle normal per-bot events.
|
||||
pub fn set_handler(&mut self, handler: HandleFn<Fut, S>) {
|
||||
self.handler = handler;
|
||||
}
|
||||
/// Set the function that's called every time the swarm receives a
|
||||
/// [`SwarmEvent`]. This is the way to handle global swarm events.
|
||||
pub fn set_swarm_handler(&mut self, handler: SwarmHandleFn<SwarmFut, S, SS>) {
|
||||
self.swarm_handler = handler;
|
||||
}
|
||||
|
||||
/// TODO: write plugin docs probably here
|
||||
fn add_plugin(&mut self) {}
|
||||
|
||||
/// Build this `SwarmBuilder` into an actual [`Swarm`] and join the given
|
||||
/// server.
|
||||
///
|
||||
/// The `address` argumentcan be a `&str`, [`ServerAddress`], or anything
|
||||
/// that implements `TryInto<ServerAddress>`.
|
||||
///
|
||||
/// [`ServerAddress`]: azalea_protocol::ServerAddress
|
||||
pub async fn start(self, address: TryInto<ServerAddress>) {
|
||||
assert_eq!(
|
||||
options.accounts.len(),
|
||||
options.states.len(),
|
||||
"There must be exactly one state per bot."
|
||||
);
|
||||
|
||||
// convert the TryInto<ServerAddress> into a ServerAddress
|
||||
let address: ServerAddress = match options.address.try_into() {
|
||||
Ok(address) => address,
|
||||
Err(_) => return Err(SwarmStartError::InvalidAddress),
|
||||
};
|
||||
|
||||
// resolve the address
|
||||
let resolved_address = resolver::resolve_address(&address).await?;
|
||||
|
||||
let world_container = Arc::new(RwLock::new(WorldContainer::default()));
|
||||
|
||||
let plugins = options.plugins;
|
||||
|
||||
// we can't modify the swarm plugins after this
|
||||
let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
|
||||
let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
|
||||
|
||||
let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1);
|
||||
let ecs_lock = start_ecs(run_schedule_receiver, run_schedule_sender.clone());
|
||||
|
||||
let mut swarm = Swarm {
|
||||
bot_datas: Arc::new(Mutex::new(Vec::new())),
|
||||
|
||||
resolved_address,
|
||||
address,
|
||||
world_container,
|
||||
plugins,
|
||||
|
||||
bots_tx,
|
||||
|
||||
swarm_tx: swarm_tx.clone(),
|
||||
|
||||
ecs_lock,
|
||||
run_schedule_sender,
|
||||
};
|
||||
|
||||
{
|
||||
// the chat plugin is hacky and needs the swarm to be passed like this
|
||||
let (chat_swarm_state, chat_tx) = chat::SwarmState::new(swarm.clone());
|
||||
swarm.plugins.add(chat::Plugin {
|
||||
swarm_state: chat_swarm_state,
|
||||
tx: chat_tx,
|
||||
});
|
||||
}
|
||||
|
||||
let mut swarm_clone = swarm.clone();
|
||||
let join_task = tokio::spawn(async move {
|
||||
if let Some(join_delay) = options.join_delay {
|
||||
// if there's a join delay, then join one by one
|
||||
for (account, state) in options.accounts.iter().zip(options.states) {
|
||||
swarm_clone
|
||||
.add_with_exponential_backoff(account, state.clone())
|
||||
.await;
|
||||
tokio::time::sleep(join_delay).await;
|
||||
}
|
||||
} else {
|
||||
let swarm_borrow = &swarm_clone;
|
||||
join_all(options.accounts.iter().zip(options.states).map(
|
||||
async move |(account, state)| -> Result<(), JoinError> {
|
||||
swarm_borrow
|
||||
.clone()
|
||||
.add_with_exponential_backoff(account, state.clone())
|
||||
.await;
|
||||
Ok(())
|
||||
},
|
||||
))
|
||||
.await;
|
||||
}
|
||||
});
|
||||
|
||||
let swarm_state = options.swarm_state;
|
||||
let mut internal_state = InternalSwarmState::default();
|
||||
|
||||
// Watch swarm_rx and send those events to the swarm_handle.
|
||||
let swarm_clone = swarm.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(event) = swarm_rx.recv().await {
|
||||
tokio::spawn((options.swarm_handle)(
|
||||
swarm_clone.clone(),
|
||||
event,
|
||||
swarm_state.clone(),
|
||||
));
|
||||
}
|
||||
});
|
||||
|
||||
// bot events
|
||||
while let Some((Some(event), (bot, state))) = bots_rx.recv().await {
|
||||
// remove this #[allow] when more checks are added
|
||||
// TODO: actually it'd be better to just have this in a system
|
||||
#[allow(clippy::single_match)]
|
||||
match &event {
|
||||
Event::Login => {
|
||||
internal_state.bots_joined += 1;
|
||||
if internal_state.bots_joined == swarm.bot_datas.lock().len() {
|
||||
swarm_tx.send(SwarmEvent::Login).unwrap();
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
tokio::spawn((options.handle)(bot, event, state));
|
||||
}
|
||||
|
||||
join_task.abort();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// An event about something that doesn't have to do with a single bot.
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum SwarmEvent {
|
||||
|
@ -82,49 +270,6 @@ pub enum SwarmEvent {
|
|||
|
||||
pub type SwarmHandleFn<Fut, S, SS> = fn(Swarm<S>, SwarmEvent, SS) -> Fut;
|
||||
|
||||
/// The options that are passed to [`azalea::start_swarm`].
|
||||
///
|
||||
/// [`azalea::start_swarm`]: crate::start_swarm()
|
||||
pub struct SwarmOptions<S, SS, A, Fut, SwarmFut>
|
||||
where
|
||||
A: TryInto<ServerAddress>,
|
||||
Fut: Future<Output = Result<(), anyhow::Error>>,
|
||||
SwarmFut: Future<Output = Result<(), anyhow::Error>>,
|
||||
{
|
||||
/// The address of the server that we're connecting to. This can be a
|
||||
/// `&str`, [`ServerAddress`], or anything that implements
|
||||
/// `TryInto<ServerAddress>`.
|
||||
///
|
||||
/// [`ServerAddress`]: azalea_protocol::ServerAddress
|
||||
pub address: A,
|
||||
/// The accounts that are going to join the server.
|
||||
pub accounts: Vec<Account>,
|
||||
/// The plugins that are going to be used for all the bots.
|
||||
///
|
||||
/// You can usually leave this as `plugins![]`.
|
||||
pub plugins: Plugins,
|
||||
/// The plugins that are going to be used for the swarm.
|
||||
///
|
||||
/// You can usually leave this as `swarm_plugins![]`.
|
||||
pub swarm_plugins: SwarmPlugins<S>,
|
||||
/// The individual bot states. This must be the same length as `accounts`,
|
||||
/// since each bot gets one state.
|
||||
pub states: Vec<S>,
|
||||
/// The state for the overall swarm.
|
||||
pub swarm_state: SS,
|
||||
/// The function that's called every time a bot receives an [`Event`].
|
||||
pub handle: HandleFn<Fut, S>,
|
||||
/// The function that's called every time the swarm receives a
|
||||
/// [`SwarmEvent`].
|
||||
pub swarm_handle: SwarmHandleFn<SwarmFut, S, SS>,
|
||||
|
||||
/// How long we should wait between each bot joining the server. Set to
|
||||
/// None to have every bot connect at the same time. None is different than
|
||||
/// a duration of 0, since if a duration is present the bots will wait for
|
||||
/// the previous one to be ready.
|
||||
pub join_delay: Option<std::time::Duration>,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum SwarmStartError {
|
||||
#[error("Invalid address")]
|
||||
|
@ -207,127 +352,16 @@ pub enum SwarmStartError {
|
|||
/// }
|
||||
/// Ok(())
|
||||
/// }
|
||||
pub async fn start_swarm<
|
||||
S: Send + Sync + Clone + 'static,
|
||||
SS: Send + Sync + Clone + 'static,
|
||||
A: Send + TryInto<ServerAddress>,
|
||||
Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
|
||||
SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
|
||||
>(
|
||||
options: SwarmOptions<S, SS, A, Fut, SwarmFut>,
|
||||
) -> Result<(), SwarmStartError> {
|
||||
assert_eq!(
|
||||
options.accounts.len(),
|
||||
options.states.len(),
|
||||
"There must be exactly one state per bot."
|
||||
);
|
||||
|
||||
// convert the TryInto<ServerAddress> into a ServerAddress
|
||||
let address: ServerAddress = match options.address.try_into() {
|
||||
Ok(address) => address,
|
||||
Err(_) => return Err(SwarmStartError::InvalidAddress),
|
||||
};
|
||||
|
||||
// resolve the address
|
||||
let resolved_address = resolver::resolve_address(&address).await?;
|
||||
|
||||
let world_container = Arc::new(RwLock::new(WorldContainer::default()));
|
||||
|
||||
let plugins = options.plugins;
|
||||
|
||||
// we can't modify the swarm plugins after this
|
||||
let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
|
||||
let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
|
||||
|
||||
let (run_schedule_sender, run_schedule_receiver) = mpsc::channel(1);
|
||||
let ecs_lock = start_ecs(run_schedule_receiver, run_schedule_sender.clone());
|
||||
|
||||
let mut swarm = Swarm {
|
||||
bot_datas: Arc::new(Mutex::new(Vec::new())),
|
||||
|
||||
resolved_address,
|
||||
address,
|
||||
worlds: world_container,
|
||||
plugins,
|
||||
|
||||
bots_tx,
|
||||
|
||||
swarm_tx: swarm_tx.clone(),
|
||||
|
||||
ecs_lock,
|
||||
run_schedule_sender,
|
||||
};
|
||||
|
||||
{
|
||||
// the chat plugin is hacky and needs the swarm to be passed like this
|
||||
let (chat_swarm_state, chat_tx) = chat::SwarmState::new(swarm.clone());
|
||||
swarm.plugins.add(chat::Plugin {
|
||||
swarm_state: chat_swarm_state,
|
||||
tx: chat_tx,
|
||||
});
|
||||
}
|
||||
|
||||
let mut swarm_clone = swarm.clone();
|
||||
let join_task = tokio::spawn(async move {
|
||||
if let Some(join_delay) = options.join_delay {
|
||||
// if there's a join delay, then join one by one
|
||||
for (account, state) in options.accounts.iter().zip(options.states) {
|
||||
swarm_clone
|
||||
.add_with_exponential_backoff(account, state.clone())
|
||||
.await;
|
||||
tokio::time::sleep(join_delay).await;
|
||||
}
|
||||
} else {
|
||||
let swarm_borrow = &swarm_clone;
|
||||
join_all(options.accounts.iter().zip(options.states).map(
|
||||
async move |(account, state)| -> Result<(), JoinError> {
|
||||
swarm_borrow
|
||||
.clone()
|
||||
.add_with_exponential_backoff(account, state.clone())
|
||||
.await;
|
||||
Ok(())
|
||||
},
|
||||
))
|
||||
.await;
|
||||
}
|
||||
});
|
||||
|
||||
let swarm_state = options.swarm_state;
|
||||
let mut internal_state = InternalSwarmState::default();
|
||||
|
||||
// Watch swarm_rx and send those events to the swarm_handle.
|
||||
let swarm_clone = swarm.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(event) = swarm_rx.recv().await {
|
||||
tokio::spawn((options.swarm_handle)(
|
||||
swarm_clone.clone(),
|
||||
event,
|
||||
swarm_state.clone(),
|
||||
));
|
||||
}
|
||||
});
|
||||
|
||||
// bot events
|
||||
while let Some((Some(event), (bot, state))) = bots_rx.recv().await {
|
||||
// remove this #[allow] when more checks are added
|
||||
// TODO: actually it'd be better to just have this in a system
|
||||
#[allow(clippy::single_match)]
|
||||
match &event {
|
||||
Event::Login => {
|
||||
internal_state.bots_joined += 1;
|
||||
if internal_state.bots_joined == swarm.bot_datas.lock().len() {
|
||||
swarm_tx.send(SwarmEvent::Login).unwrap();
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
tokio::spawn((options.handle)(bot, event, state));
|
||||
}
|
||||
|
||||
join_task.abort();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
// pub async fn start_swarm<
|
||||
// S: Send + Sync + Clone + 'static,
|
||||
// SS: Send + Sync + Clone + 'static,
|
||||
// A: Send + TryInto<ServerAddress>,
|
||||
// Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
|
||||
// SwarmFut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
|
||||
// >(
|
||||
// options: SwarmOptions<S, SS, A, Fut, SwarmFut>,
|
||||
// ) -> Result<(), SwarmStartError> {
|
||||
// }
|
||||
|
||||
impl<S> Swarm<S>
|
||||
where
|
||||
|
@ -373,7 +407,7 @@ where
|
|||
let index = bot_datas
|
||||
.iter()
|
||||
.position(|(b, _)| b.profile.uuid == cloned_bot.profile.uuid)
|
||||
.expect("bot disconnected but not found in swarm");
|
||||
.expect("bot disconnected but not found in swarm");
|
||||
bot_datas.remove(index);
|
||||
|
||||
swarm_tx
|
||||
|
|
|
@ -1,135 +0,0 @@
|
|||
use crate::{Swarm, SwarmEvent};
|
||||
use async_trait::async_trait;
|
||||
use nohash_hasher::NoHashHasher;
|
||||
use std::{
|
||||
any::{Any, TypeId},
|
||||
collections::HashMap,
|
||||
hash::BuildHasherDefault,
|
||||
};
|
||||
|
||||
type U64Hasher = BuildHasherDefault<NoHashHasher<u64>>;
|
||||
|
||||
// kind of based on https://docs.rs/http/latest/src/http/extensions.rs.html
|
||||
/// A map of plugin ids to [`SwarmPlugin`] trait objects. The client stores
|
||||
/// this so we can keep the state for our [`Swarm`] plugins.
|
||||
///
|
||||
/// If you're using azalea, you should generate this from the `swarm_plugins!`
|
||||
/// macro.
|
||||
#[derive(Clone, Default)]
|
||||
pub struct SwarmPlugins<S> {
|
||||
map: Option<HashMap<TypeId, Box<dyn SwarmPlugin<S>>, U64Hasher>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SwarmPluginStates<S> {
|
||||
map: Option<HashMap<TypeId, Box<dyn SwarmPluginState<S>>, U64Hasher>>,
|
||||
}
|
||||
|
||||
impl<S> SwarmPluginStates<S> {
|
||||
pub fn get<T: SwarmPluginState<S>>(&self) -> Option<&T> {
|
||||
self.map
|
||||
.as_ref()
|
||||
.and_then(|map| map.get(&TypeId::of::<T>()))
|
||||
.and_then(|boxed| (boxed.as_ref() as &dyn Any).downcast_ref::<T>())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> SwarmPlugins<S>
|
||||
where
|
||||
S: 'static,
|
||||
{
|
||||
/// Create a new empty set of plugins.
|
||||
pub fn new() -> Self {
|
||||
Self { map: None }
|
||||
}
|
||||
|
||||
/// Add a new plugin to this set.
|
||||
pub fn add<T: SwarmPlugin<S>>(&mut self, plugin: T) {
|
||||
if self.map.is_none() {
|
||||
self.map = Some(HashMap::with_hasher(BuildHasherDefault::default()));
|
||||
}
|
||||
self.map
|
||||
.as_mut()
|
||||
.unwrap()
|
||||
.insert(TypeId::of::<T>(), Box::new(plugin));
|
||||
}
|
||||
|
||||
/// Build our plugin states from this set of plugins. Note that if you're
|
||||
/// using `azalea` you'll probably never need to use this as it's called
|
||||
/// for you.
|
||||
pub fn build(self) -> SwarmPluginStates<S> {
|
||||
if self.map.is_none() {
|
||||
return SwarmPluginStates { map: None };
|
||||
}
|
||||
let mut map = HashMap::with_hasher(BuildHasherDefault::default());
|
||||
for (id, plugin) in self.map.unwrap().into_iter() {
|
||||
map.insert(id, plugin.build());
|
||||
}
|
||||
SwarmPluginStates { map: Some(map) }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> IntoIterator for SwarmPluginStates<S> {
|
||||
type Item = Box<dyn SwarmPluginState<S>>;
|
||||
type IntoIter = std::vec::IntoIter<Self::Item>;
|
||||
|
||||
/// Iterate over the plugin states.
|
||||
fn into_iter(self) -> Self::IntoIter {
|
||||
self.map
|
||||
.map(|map| map.into_values().collect::<Vec<_>>())
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
/// A `SwarmPluginState` keeps the current state of a plugin for a client. All
|
||||
/// the fields must be atomic. Unique `SwarmPluginState`s are built from
|
||||
/// [`SwarmPlugin`]s.
|
||||
#[async_trait]
|
||||
pub trait SwarmPluginState<S>: Send + Sync + SwarmPluginStateClone<S> + Any + 'static {
|
||||
async fn handle(self: Box<Self>, event: SwarmEvent, swarm: Swarm<S>);
|
||||
}
|
||||
|
||||
/// Swarm plugins can keep their own personal state ([`SwarmPluginState`]),
|
||||
/// listen to [`SwarmEvent`]s, and add new functions to [`Swarm`].
|
||||
pub trait SwarmPlugin<S>: Send + Sync + SwarmPluginClone<S> + Any + 'static {
|
||||
fn build(&self) -> Box<dyn SwarmPluginState<S>>;
|
||||
}
|
||||
|
||||
/// An internal trait that allows SwarmPluginState to be cloned.
|
||||
#[doc(hidden)]
|
||||
pub trait SwarmPluginStateClone<S> {
|
||||
fn clone_box(&self) -> Box<dyn SwarmPluginState<S>>;
|
||||
}
|
||||
impl<T, S> SwarmPluginStateClone<S> for T
|
||||
where
|
||||
T: 'static + SwarmPluginState<S> + Clone,
|
||||
{
|
||||
fn clone_box(&self) -> Box<dyn SwarmPluginState<S>> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
impl<S> Clone for Box<dyn SwarmPluginState<S>> {
|
||||
fn clone(&self) -> Self {
|
||||
self.clone_box()
|
||||
}
|
||||
}
|
||||
|
||||
/// An internal trait that allows SwarmPlugin to be cloned.
|
||||
#[doc(hidden)]
|
||||
pub trait SwarmPluginClone<S> {
|
||||
fn clone_box(&self) -> Box<dyn SwarmPlugin<S>>;
|
||||
}
|
||||
impl<T, S> SwarmPluginClone<S> for T
|
||||
where
|
||||
T: 'static + SwarmPlugin<S> + Clone,
|
||||
{
|
||||
fn clone_box(&self) -> Box<dyn SwarmPlugin<S>> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
impl<S> Clone for Box<dyn SwarmPlugin<S>> {
|
||||
fn clone(&self) -> Self {
|
||||
self.clone_box()
|
||||
}
|
||||
}
|
|
@ -53,23 +53,13 @@ async fn main() -> anyhow::Result<()> {
|
|||
}
|
||||
|
||||
loop {
|
||||
let e = azalea::start_swarm(azalea::SwarmOptions {
|
||||
accounts: accounts.clone(),
|
||||
address: "localhost",
|
||||
|
||||
states: states.clone(),
|
||||
swarm_state: SwarmState::default(),
|
||||
|
||||
plugins: plugins![],
|
||||
swarm_plugins: swarm_plugins![],
|
||||
|
||||
handle,
|
||||
swarm_handle,
|
||||
|
||||
join_delay: Some(Duration::from_millis(1000)),
|
||||
// join_delay: None,
|
||||
})
|
||||
.await;
|
||||
let e = azalea::SwarmBuilder::new()
|
||||
.add_accounts(accounts.clone())
|
||||
.set_handler(handle)
|
||||
.swarm_handle(swarm_handle)
|
||||
.join_delay(Some(Duration::from_millis(1000)))
|
||||
.start("localhost")
|
||||
.await;
|
||||
println!("{e:?}");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue