1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 14:26:04 +00:00

add_account works

This commit is contained in:
mat 2022-11-20 15:38:50 -06:00
parent 3c59b156a9
commit af1bb7e86f

View file

@ -2,26 +2,19 @@ mod plugins;
pub use self::plugins::*;
use crate::{bot, HandleFn};
use async_trait::async_trait;
use azalea_client::{
Account, Client, ClientInformation, Event, JoinError, PhysicsState, Player, Plugin, Plugins,
};
use azalea_client::{Account, Client, Event, JoinError, Plugins};
use azalea_protocol::{
connect::{Connection, ConnectionError},
resolver::{self, ResolverError},
ServerAddress,
};
use azalea_world::WeakWorldContainer;
use azalea_world::World;
use futures::{
future::{join_all, select_all, try_join_all},
FutureExt,
};
use futures::future::join_all;
use log::error;
use parking_lot::{Mutex, RwLock};
use std::{any::Any, future::Future, net::SocketAddr, sync::Arc, time::Duration};
use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
use thiserror::Error;
use tokio::sync::mpsc::{self, UnboundedReceiver};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
/// A helper macro that generates a [`Plugins`] struct from a list of objects
/// that implement [`Plugin`].
@ -49,15 +42,25 @@ macro_rules! swarm_plugins {
/// It's used to make the [`Swarm::add_account`] function work.
#[derive(Clone)]
pub struct Swarm<S> {
bots: Arc<Mutex<Vec<Client>>>,
receivers: Arc<Mutex<Vec<UnboundedReceiver<Event>>>>,
states: Arc<Mutex<Vec<S>>>,
bots: Arc<Mutex<Vec<SwarmBotData<S>>>>,
resolved_address: SocketAddr,
address: ServerAddress,
world_container: Arc<RwLock<WeakWorldContainer>>,
/// Plugins that are set for new bots
plugins: Plugins,
/// A single receiver that combines all the receivers of all the bots.
/// (bot index, event)
bots_rx: Arc<Mutex<UnboundedReceiver<(Option<Event>, SwarmBotData<S>)>>>,
bots_tx: Arc<Mutex<UnboundedSender<(Option<Event>, SwarmBotData<S>)>>>,
}
/// The data stored for each bot in the swarm.
#[derive(Clone)]
pub struct SwarmBotData<S> {
pub bot: Client,
pub state: S,
}
/// An event about something that doesn't have to do with a single bot.
@ -143,14 +146,18 @@ pub async fn start_swarm<
let mut plugins = options.plugins;
plugins.add(bot::Plugin::default());
let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
let mut swarm = Swarm {
bots: Arc::new(Mutex::new(Vec::new())),
receivers: Arc::new(Mutex::new(Vec::new())),
states: Arc::new(Mutex::new(Vec::new())),
resolved_address,
address,
world_container,
plugins,
bots_rx: Arc::new(Mutex::new(bots_rx)),
bots_tx: Arc::new(Mutex::new(bots_tx)),
};
let mut swarm_clone = swarm.clone();
@ -211,9 +218,8 @@ pub async fn start_swarm<
};
// bot events
while let (Some(event), bot_index) = swarm.bot_recv().await {
let bot = swarm.bots.lock()[bot_index].clone();
let bot_state = swarm.states.lock()[bot_index].clone();
while let (Some(event), SwarmBotData { bot, state, .. }) = swarm.bot_recv().await {
// bot event handling
let cloned_plugins = (*bot.plugins).clone();
for plugin in cloned_plugins.into_iter() {
tokio::spawn(plugin.handle(event.clone(), bot.clone()));
@ -230,7 +236,7 @@ pub async fn start_swarm<
_ => {}
}
tokio::spawn((options.handle)(bot, event, bot_state));
tokio::spawn((options.handle)(bot, event, state));
}
let _ = join_task.abort();
@ -242,36 +248,52 @@ impl<S> Swarm<S>
where
S: Send + Sync + Clone + 'static,
{
/// Wait for any bot to get an event. We return the event and index (so we
/// can get the state and bot from that index)
async fn bot_recv(&mut self) -> (Option<Event>, usize) {
let mut receivers = self.receivers.lock();
if receivers.is_empty() {
// TODO
}
let mut futures = Vec::with_capacity(receivers.len());
for rx in receivers.iter_mut() {
futures.push(rx.recv().boxed());
}
let (event, index, _remaining) = select_all(futures).await;
(event, index)
/// Wait for any bot to get an event. We return the event and SwarmBotData
async fn bot_recv(&mut self) -> (Option<Event>, SwarmBotData<S>) {
let mut bots_rx = self.bots_rx.lock();
let (event, bot) = bots_rx.recv().await.unwrap();
(event, bot)
}
/// Add a new account as part of the swarm.
pub async fn add_account(&mut self, account: &Account, state: S) -> Result<Client, JoinError> {
let conn = Connection::new(&self.resolved_address).await?;
let (conn, game_profile) = Client::handshaw ake(conn, account, &self.address.clone()).await?;
let (conn, game_profile) = Client::handshake(conn, account, &self.address.clone()).await?;
let (tx, rx) = mpsc::unbounded_channel();
// tx is moved to the client so it can send us events
// rx is used to receive events from the client
let (tx, mut rx) = mpsc::unbounded_channel();
let mut client = Client::new(game_profile, conn, Some(self.world_container.clone()));
tx.send(Event::Initialize).unwrap();
client.start_tasks(tx);
client.plugins = Arc::new(self.plugins.clone());
self.bots.lock().push(client.clone());
self.receivers.lock().push(rx);
self.states.lock().push(state.clone());
let cloned_bots_tx = self.bots_tx.clone();
let cloned_client = client.clone();
let cloned_state = state.clone();
let cloned_plugins = (*client.plugins).clone();
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
// we can't handle events here (since we can't copy the handler),
// they're handled above in start_swarm
if let Err(e) = cloned_bots_tx.lock().send((
Some(event),
SwarmBotData {
bot: cloned_client.clone(),
state: cloned_state.clone(),
},
)) {
error!("Error sending event to swarm: {e}");
}
}
});
self.bots.lock().push(SwarmBotData {
bot: client.clone(),
state: state.clone(),
});
let uuid = client.player.read().uuid;
Ok(client)
}