1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 23:44:38 +00:00

start adding swarm

This commit is contained in:
Ubuntu 2022-11-01 20:58:36 +00:00
commit 70dd62c785
8 changed files with 315 additions and 144 deletions

37
Cargo.lock generated
View file

@ -108,6 +108,7 @@ dependencies = [
"azalea-core",
"azalea-protocol",
"env_logger",
"futures",
"parking_lot 0.12.1",
"thiserror",
"tokio",
@ -737,9 +738,9 @@ dependencies = [
[[package]]
name = "futures"
version = "0.3.24"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c"
checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0"
dependencies = [
"futures-channel",
"futures-core",
@ -752,9 +753,9 @@ dependencies = [
[[package]]
name = "futures-channel"
version = "0.3.24"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050"
checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed"
dependencies = [
"futures-core",
"futures-sink",
@ -762,15 +763,15 @@ dependencies = [
[[package]]
name = "futures-core"
version = "0.3.24"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf"
checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac"
[[package]]
name = "futures-executor"
version = "0.3.24"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab"
checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2"
dependencies = [
"futures-core",
"futures-task",
@ -779,15 +780,15 @@ dependencies = [
[[package]]
name = "futures-io"
version = "0.3.24"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68"
checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"
[[package]]
name = "futures-macro"
version = "0.3.24"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17"
checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d"
dependencies = [
"proc-macro2",
"quote",
@ -796,21 +797,21 @@ dependencies = [
[[package]]
name = "futures-sink"
version = "0.3.24"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56"
checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9"
[[package]]
name = "futures-task"
version = "0.3.24"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1"
checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea"
[[package]]
name = "futures-util"
version = "0.3.24"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90"
checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6"
dependencies = [
"futures-channel",
"futures-core",

View file

@ -159,7 +159,6 @@ impl Client {
address: impl TryInto<ServerAddress>,
) -> Result<(Self, UnboundedReceiver<Event>), JoinError> {
let address: ServerAddress = address.try_into().map_err(|_| JoinError::InvalidAddress)?;
let resolved_address = resolver::resolve_address(&address).await?;
let mut conn = Connection::new(&resolved_address).await?;

View file

@ -11,7 +11,7 @@
#![feature(error_generic_member_access)]
#![feature(provide_any)]
use std::str::FromStr;
use std::{net::SocketAddr, str::FromStr};
#[cfg(feature = "connecting")]
pub mod connect;
@ -33,13 +33,12 @@ pub mod write;
/// assert_eq!(addr.host, "localhost");
/// assert_eq!(addr.port, 25565);
/// ```
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct ServerAddress {
pub host: String,
pub port: u16,
}
// impl try_from for ServerAddress
impl<'a> TryFrom<&'a str> for ServerAddress {
type Error = String;
@ -57,6 +56,18 @@ impl<'a> TryFrom<&'a str> for ServerAddress {
}
}
impl From<SocketAddr> for ServerAddress {
/// Convert an existing SocketAddr into a ServerAddress. This just converts
/// the ip to a string and passes along the port. The resolver will realize
/// it's already an IP address and not do any DNS requests.
fn from(addr: SocketAddr) -> Self {
ServerAddress {
host: addr.ip().to_string(),
port: addr.port(),
}
}
}
#[cfg(feature = "connecting")]
pub async fn connect(address: ServerAddress) -> Result<(), Box<dyn std::error::Error>> {
use log::debug;

View file

@ -17,6 +17,7 @@ azalea-core = { version = "0.3.0", path = "../azalea-core" }
parking_lot = "^0.12.1"
thiserror = "^1.0.37"
tokio = "^1.21.1"
futures = "0.3.25"
[dev-dependencies]
anyhow = "^1.0.65"

View file

@ -1,13 +1,14 @@
use azalea::{Account, Accounts, Client, Event, Swarm};
use azalea::prelude::*;
use parking_lot::Mutex;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let accounts = Accounts::new();
let mut accounts = Vec::new();
let mut states = Vec::new();
for i in 0..10 {
accounts.add(Account::offline(&format!("bot{}", i)));
accounts.push(Account::offline(&format!("bot{}", i)));
}
azalea::start_swarm(azalea::SwarmOptions {
@ -15,13 +16,13 @@ async fn main() {
address: "localhost",
swarm_state: State::default(),
state: State::default(),
states,
swarm_plugins: plugins![azalea_pathfinder::Plugin::default()],
plugins: plugins![],
handle: Box::new(handle),
swarm_handle: Box::new(swarm_handle),
handle,
swarm_handle,
})
.await
.unwrap();

View file

@ -60,128 +60,16 @@
mod bot;
pub mod prelude;
mod start;
mod swarm;
pub use azalea_client::*;
pub use azalea_core::{BlockPos, Vec3};
use azalea_protocol::ServerAddress;
use std::{future::Future, sync::Arc};
use thiserror::Error;
pub use start::{start, Options};
pub use swarm::*;
pub type HandleFn<Fut, S> = fn(Client, Event, S) -> Fut;
/// The options that are passed to [`azalea::start`].
///
/// [`azalea::start`]: fn.start.html
pub struct Options<S, A, Fut>
where
A: TryInto<ServerAddress>,
Fut: Future<Output = Result<(), anyhow::Error>>,
{
/// The address of the server that we're connecting to. This can be a
/// `&str`, [`ServerAddress`], or anything that implements
/// `TryInto<ServerAddress>`.
///
/// [`ServerAddress`]: azalea_protocol::ServerAddress
pub address: A,
/// The account that's going to join the server.
pub account: Account,
/// The plugins that are going to be used. Plugins are external crates that
/// add extra functionality to Azalea. You should use the [`plugins`] macro
/// for this field.
///
/// ```rust,no_run
/// plugins![azalea_pathfinder::Plugin::default()]
/// ```
pub plugins: Plugins,
/// A struct that contains the data that you want your bot to remember
/// across events.
///
/// # Examples
///
/// ```rust
/// use parking_lot::Mutex;
/// use std::sync::Arc;
///
/// #[derive(Default, Clone)]
/// struct State {
/// farming: Arc<Mutex<bool>>,
/// }
/// ```
pub state: S,
/// The function that's called whenever we get an event.
///
/// # Examples
///
/// ```rust
/// use azalea::prelude::*;
///
/// async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> {
/// Ok(())
/// }
/// ```
pub handle: HandleFn<Fut, S>,
}
#[derive(Error, Debug)]
pub enum Error {
#[error("Invalid address")]
InvalidAddress,
#[error("Join error: {0}")]
Join(#[from] azalea_client::JoinError),
}
/// Join a server and start handling events. This function will run forever until
/// it gets disconnected from the server.
///
/// # Examples
///
/// ```rust,no_run
/// let error = azalea::start(azalea::Options {
/// account,
/// address: "localhost",
/// state: State::default(),
/// plugins: plugins![azalea_pathfinder::Plugin::default()],
/// handle,
/// }).await;
/// ```
pub async fn start<
S: Send + Sync + Clone + 'static,
A: Send + TryInto<ServerAddress>,
Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
>(
options: Options<S, A, Fut>,
) -> Result<(), Error> {
let address = match options.address.try_into() {
Ok(address) => address,
Err(_) => return Err(Error::InvalidAddress),
};
let (mut bot, mut rx) = Client::join(&options.account, address).await?;
let mut plugins = options.plugins;
plugins.add(bot::Plugin::default());
bot.plugins = Arc::new(plugins);
let state = options.state;
while let Some(event) = rx.recv().await {
let cloned_plugins = (*bot.plugins).clone();
for plugin in cloned_plugins.into_iter() {
tokio::spawn(plugin.handle(event.clone(), bot.clone()));
}
let bot_plugin = bot.plugins.get::<bot::Plugin>().unwrap().clone();
tokio::spawn(bot::Plugin::handle(
Box::new(bot_plugin),
event.clone(),
bot.clone(),
));
tokio::spawn((options.handle)(bot.clone(), event.clone(), state.clone()));
}
Ok(())
}
/// A helper macro that generates a [`Plugins`] struct from a list of objects
/// that implement [`Plugin`].
///

118
azalea/src/start.rs Normal file
View file

@ -0,0 +1,118 @@
use crate::{bot, HandleFn};
use azalea_client::{Account, Client, Plugin, Plugins};
use azalea_protocol::ServerAddress;
use std::{future::Future, sync::Arc};
use thiserror::Error;
/// The options that are passed to [`azalea::start`].
///
/// [`azalea::start`]: crate::start
pub struct Options<S, A, Fut>
where
A: TryInto<ServerAddress>,
Fut: Future<Output = Result<(), anyhow::Error>>,
{
/// The address of the server that we're connecting to. This can be a
/// `&str`, [`ServerAddress`], or anything that implements
/// `TryInto<ServerAddress>`.
///
/// [`ServerAddress`]: azalea_protocol::ServerAddress
pub address: A,
/// The account that's going to join the server.
pub account: Account,
/// The plugins that are going to be used. Plugins are external crates that
/// add extra functionality to Azalea. You should use the [`plugins`] macro
/// for this field.
///
/// ```rust,no_run
/// plugins![azalea_pathfinder::Plugin::default()]
/// ```
pub plugins: Plugins,
/// A struct that contains the data that you want your bot to remember
/// across events.
///
/// # Examples
///
/// ```rust
/// use parking_lot::Mutex;
/// use std::sync::Arc;
///
/// #[derive(Default, Clone)]
/// struct State {
/// farming: Arc<Mutex<bool>>,
/// }
/// ```
pub state: S,
/// The function that's called whenever we get an event.
///
/// # Examples
///
/// ```rust
/// use azalea::prelude::*;
///
/// async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> {
/// Ok(())
/// }
/// ```
pub handle: HandleFn<Fut, S>,
}
#[derive(Error, Debug)]
pub enum StartError {
#[error("Invalid address")]
InvalidAddress,
#[error("Join error: {0}")]
Join(#[from] azalea_client::JoinError),
}
/// Join a server and start handling events. This function will run forever until
/// it gets disconnected from the server.
///
/// # Examples
///
/// ```rust,no_run
/// let error = azalea::start(azalea::Options {
/// account,
/// address: "localhost",
/// state: State::default(),
/// plugins: plugins![azalea_pathfinder::Plugin::default()],
/// handle,
/// }).await;
/// ```
pub async fn start<
S: Send + Sync + Clone + 'static,
A: Send + TryInto<ServerAddress>,
Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
>(
options: Options<S, A, Fut>,
) -> Result<(), StartError> {
let address = match options.address.try_into() {
Ok(address) => address,
Err(_) => return Err(StartError::InvalidAddress),
};
let (mut bot, mut rx) = Client::join(&options.account, address).await?;
let mut plugins = options.plugins;
plugins.add(bot::Plugin::default());
bot.plugins = Arc::new(plugins);
let state = options.state;
while let Some(event) = rx.recv().await {
let cloned_plugins = (*bot.plugins).clone();
for plugin in cloned_plugins.into_iter() {
tokio::spawn(plugin.handle(event.clone(), bot.clone()));
}
let bot_plugin = bot.plugins.get::<bot::Plugin>().unwrap().clone();
tokio::spawn(bot::Plugin::handle(
Box::new(bot_plugin),
event.clone(),
bot.clone(),
));
tokio::spawn((options.handle)(bot.clone(), event.clone(), state.clone()));
}
Ok(())
}

152
azalea/src/swarm.rs Normal file
View file

@ -0,0 +1,152 @@
use crate::{bot, HandleFn};
use azalea_client::{Account, Client, Event, JoinError, Plugin, Plugins};
use azalea_protocol::{
resolver::{self, ResolverError},
ServerAddress,
};
use futures::{
future::{select_all, try_join_all},
FutureExt,
};
use parking_lot::Mutex;
use std::{future::Future, sync::Arc};
use thiserror::Error;
use tokio::sync::mpsc::UnboundedReceiver;
/// A swarm is a way to conveniently control many bots at once, while also
/// being able to control bots at an individual level when desired.
#[derive(Clone)]
pub struct Swarm {
bots: Arc<Mutex<Vec<Client>>>,
receivers: Arc<Mutex<Vec<UnboundedReceiver<Event>>>>,
}
/// An event about something that doesn't have to do with a single bot.
#[derive(Clone, Debug)]
pub enum SwarmEvent {
/// All the bots in the swarm have successfully joined the server.
Login,
}
/// The options that are passed to [`azalea::start_swarm`].
///
/// [`azalea::start`]: crate::start_swarm
pub struct SwarmOptions<S, SS, A, Fut>
where
A: TryInto<ServerAddress>,
Fut: Future<Output = Result<(), anyhow::Error>>,
{
/// The address of the server that we're connecting to. This can be a
/// `&str`, [`ServerAddress`], or anything that implements
/// `TryInto<ServerAddress>`.
///
/// [`ServerAddress`]: azalea_protocol::ServerAddress
pub address: A,
/// The accounts that are going to join the server.
pub accounts: Vec<Account>,
pub plugins: Plugins,
pub swarm_plugins: Plugins,
/// The individual bot states. This must be the same length as `accounts`,
/// since each bot gets one state.
pub states: Vec<S>,
pub swarm_state: SS,
pub handle: HandleFn<Fut, S>,
pub swarm_handle: HandleFn<Fut, S>,
}
#[derive(Error, Debug)]
pub enum SwarmStartError {
#[error("Invalid address")]
InvalidAddress,
#[error(transparent)]
ResolveAddress(#[from] ResolverError),
#[error("Join error: {0}")]
Join(#[from] azalea_client::JoinError),
}
/// Make a bot swarm.
pub async fn start_swarm<
S: Send + Sync + Clone + 'static,
SS: Send + Sync + Clone + 'static,
A: Send + TryInto<ServerAddress>,
Fut: Future<Output = Result<(), anyhow::Error>> + Send + 'static,
>(
options: SwarmOptions<S, SS, A, Fut>,
) -> Result<(), SwarmStartError> {
assert_eq!(
options.accounts.len(),
options.states.len(),
"There must be exactly one state per bot."
);
// convert the TryInto<ServerAddress> into a ServerAddress
let address = match options.address.try_into() {
Ok(address) => address,
Err(_) => return Err(SwarmStartError::InvalidAddress),
};
// resolve the address
let address: ServerAddress = address.try_into().map_err(|_| JoinError::InvalidAddress)?;
let resolved_address = resolver::resolve_address(&address).await?;
let mut bots = try_join_all(
options
.accounts
.iter()
.map(|account| Client::join(&account, resolved_address)),
)
.await?;
// extract it into two different vecs
let (mut bots, receivers) = bots
.into_iter()
.unzip::<Client, UnboundedReceiver<Event>, Vec<Client>, Vec<UnboundedReceiver<Event>>>();
for bot in &mut bots {
// each bot has its own plugins instance, they're not shared
let mut plugins = options.plugins.clone();
plugins.add(bot::Plugin::default());
bot.plugins = Arc::new(plugins);
}
let mut swarm = Swarm {
bots: Arc::new(Mutex::new(bots)),
receivers: Arc::new(Mutex::new(receivers)),
};
let states = options.states;
let swarm_state = options.swarm_state;
while let (Some(event), bot_index) = swarm.bot_recv().await {
let bot = swarm.bots.lock()[bot_index].clone();
let bot_state = states[bot_index].clone();
let cloned_plugins = (*bot.plugins).clone();
for plugin in cloned_plugins.into_iter() {
tokio::spawn(plugin.handle(event.clone(), bot.clone()));
}
let bot_plugin = bot.plugins.get::<bot::Plugin>().unwrap().clone();
tokio::spawn(bot::Plugin::handle(
Box::new(bot_plugin),
event.clone(),
bot.clone(),
));
tokio::spawn((options.handle)(bot, event, bot_state));
}
Ok(())
}
impl Swarm {
/// Wait for any bot to get an event. We return the event and index (so we
/// can get the state and bot from that index)
async fn bot_recv(&mut self) -> (Option<Event>, usize) {
let mut receivers = self.receivers.lock();
let mut futures = Vec::with_capacity(receivers.len());
for rx in receivers.iter_mut() {
futures.push(rx.recv().boxed());
}
let (event, index, _remaining) = select_all(futures).await;
(event, index)
}
}