1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 14:26:04 +00:00

start adding Swarm::add_account

This commit is contained in:
mat 2022-11-20 03:32:18 -06:00
parent 141cd227e0
commit 3c59b156a9
10 changed files with 165 additions and 81 deletions

2
Cargo.lock generated
View file

@ -121,6 +121,7 @@ dependencies = [
"azalea-world",
"env_logger",
"futures",
"log",
"nohash-hasher",
"num-traits",
"parking_lot",
@ -403,6 +404,7 @@ dependencies = [
"azalea",
"env_logger",
"parking_lot",
"rand",
"tokio",
"uuid",
]

View file

@ -191,7 +191,7 @@ impl Client {
let resolved_address = resolver::resolve_address(&address).await?;
let conn = Connection::new(&resolved_address).await?;
let (conn, game_profile) = Self::handshake(conn, account, address).await?;
let (conn, game_profile) = Self::handshake(conn, account, &address).await?;
let (tx, rx) = mpsc::unbounded_channel();
@ -211,7 +211,7 @@ impl Client {
pub async fn handshake(
mut conn: Connection<ClientboundHandshakePacket, ServerboundHandshakePacket>,
account: &Account,
address: ServerAddress,
address: &ServerAddress,
) -> Result<
(
Connection<ClientboundGamePacket, ServerboundGamePacket>,

View file

@ -321,6 +321,7 @@ impl Client {
/// Sets your rotation. `y_rot` is yaw (looking to the side), `x_rot` is
/// pitch (looking up and down). You can get these numbers from the vanilla
/// f3 screen.
/// `y_rot` goes from -180 to 180, and `x_rot` goes from -90 to 90.
pub fn set_rotation(&mut self, y_rot: f32, x_rot: f32) {
let mut player_entity = self.entity_mut();
player_entity.set_rotation(y_rot, x_rot);

View file

@ -18,6 +18,7 @@ azalea-physics = {version = "0.3.0", path = "../azalea-physics"}
azalea-protocol = {version = "0.3.0", path = "../azalea-protocol"}
azalea-world = {version = "0.3.0", path = "../azalea-world"}
futures = "0.3.25"
log = "0.4.17"
nohash-hasher = "0.2.0"
num-traits = "0.2.15"
parking_lot = {version = "^0.12.1", features = ["deadlock_detection"]}

View file

@ -40,7 +40,7 @@ async fn handle(bot: Client, event: Event, state: State) -> anyhow::Result<()> {
Ok(())
}
async fn swarm_handle(swarm: Swarm, event: Event, state: SwarmState) -> anyhow::Result<()> {
async fn swarm_handle<S>(swarm: Swarm<S>, event: Event, state: SwarmState) -> anyhow::Result<()> {
match event {
Event::Login => {
swarm.goto(azalea::BlockPos::new(0, 70, 0)).await;

View file

@ -32,7 +32,7 @@ struct State {}
struct SwarmState {}
async fn handle(bot: Client, event: Event, state: State) {}
async fn swarm_handle(swarm: Swarm, event: Event, state: State) {
async fn swarm_handle<S>(swarm: Swarm<S>, event: Event, state: State) {
match event {
Event::Tick => {
// choose an arbitrary player within render distance to target

View file

@ -7,18 +7,19 @@ use azalea_client::{
Account, Client, ClientInformation, Event, JoinError, PhysicsState, Player, Plugin, Plugins,
};
use azalea_protocol::{
connect::Connection,
connect::{Connection, ConnectionError},
resolver::{self, ResolverError},
ServerAddress,
};
use azalea_world::WeakWorldContainer;
use azalea_world::World;
use futures::{
future::{select_all, try_join_all},
future::{join_all, select_all, try_join_all},
FutureExt,
};
use log::error;
use parking_lot::{Mutex, RwLock};
use std::{any::Any, future::Future, sync::Arc};
use std::{any::Any, future::Future, net::SocketAddr, sync::Arc, time::Duration};
use thiserror::Error;
use tokio::sync::mpsc::{self, UnboundedReceiver};
@ -43,10 +44,20 @@ macro_rules! swarm_plugins {
/// A swarm is a way to conveniently control many bots at once, while also
/// being able to control bots at an individual level when desired.
///
/// The `S` type parameter is the type of the state for individual clients.
/// It's used to make the [`Swarm::add_account`] function work.
#[derive(Clone)]
pub struct Swarm {
pub struct Swarm<S> {
bots: Arc<Mutex<Vec<Client>>>,
receivers: Arc<Mutex<Vec<UnboundedReceiver<Event>>>>,
states: Arc<Mutex<Vec<S>>>,
resolved_address: SocketAddr,
address: ServerAddress,
world_container: Arc<RwLock<WeakWorldContainer>>,
/// Plugins that are set for new bots
plugins: Plugins,
}
/// An event about something that doesn't have to do with a single bot.
@ -56,7 +67,7 @@ pub enum SwarmEvent {
Login,
}
pub type SwarmHandleFn<Fut, S> = fn(Swarm, SwarmEvent, S) -> Fut;
pub type SwarmHandleFn<Fut, S, SS> = fn(Swarm<S>, SwarmEvent, SS) -> Fut;
/// The options that are passed to [`azalea::start_swarm`].
///
@ -76,13 +87,19 @@ where
/// The accounts that are going to join the server.
pub accounts: Vec<Account>,
pub plugins: Plugins,
pub swarm_plugins: SwarmPlugins,
pub swarm_plugins: SwarmPlugins<S>,
/// The individual bot states. This must be the same length as `accounts`,
/// since each bot gets one state.
pub states: Vec<S>,
pub swarm_state: SS,
pub handle: HandleFn<Fut, S>,
pub swarm_handle: SwarmHandleFn<SwarmFut, SS>,
pub swarm_handle: SwarmHandleFn<SwarmFut, S, SS>,
/// How long we should wait between each bot joining the server. Set to
/// None to have every bot connect at the same time. None is different than
/// a duration of 0, since if a duration is present the bots will wait for
/// the previous one to be ready.
pub join_delay: Option<std::time::Duration>,
}
#[derive(Error, Debug)]
@ -120,52 +137,62 @@ pub async fn start_swarm<
// resolve the address
let address: ServerAddress = address.try_into().map_err(|_| JoinError::InvalidAddress)?;
let resolved_address = resolver::resolve_address(&address).await?;
let address_borrow = &address;
let shared_world_container = Arc::new(RwLock::new(WeakWorldContainer::default()));
let shared_world_container_borrow = &shared_world_container;
let world_container = Arc::new(RwLock::new(WeakWorldContainer::default()));
let bots: Vec<(Client, UnboundedReceiver<Event>)> = try_join_all(options.accounts.iter().map(
async move |account| -> Result<(Client, UnboundedReceiver<Event>), JoinError> {
let conn = Connection::new(&resolved_address).await?;
let (conn, game_profile) =
Client::handshake(conn, account, address_borrow.clone()).await?;
let (tx, rx) = mpsc::unbounded_channel();
let client = Client::new(
game_profile,
conn,
Some(shared_world_container_borrow.clone()),
);
tx.send(Event::Initialize).unwrap();
client.start_tasks(tx);
Ok((client, rx))
},
))
.await?;
// extract it into two different vecs
let (mut bots, receivers) = bots
.into_iter()
.unzip::<Client, UnboundedReceiver<Event>, Vec<Client>, Vec<UnboundedReceiver<Event>>>();
for bot in &mut bots {
// each bot has its own plugins instance, they're not shared
let mut plugins = options.plugins.clone();
plugins.add(bot::Plugin::default());
bot.plugins = Arc::new(plugins);
}
let mut plugins = options.plugins;
plugins.add(bot::Plugin::default());
let mut swarm = Swarm {
bots: Arc::new(Mutex::new(bots)),
receivers: Arc::new(Mutex::new(receivers)),
bots: Arc::new(Mutex::new(Vec::new())),
receivers: Arc::new(Mutex::new(Vec::new())),
states: Arc::new(Mutex::new(Vec::new())),
resolved_address,
address,
world_container,
plugins,
};
let states = options.states;
let mut swarm_clone = swarm.clone();
let join_task = tokio::spawn(async move {
if let Some(join_delay) = options.join_delay {
// if there's a join delay, then join one by one
for (account, state) in options.accounts.iter().zip(options.states) {
// exponential backoff
let mut disconnects = 0;
while let Err(e) = swarm_clone.add_account(account, state.clone()).await {
disconnects += 1;
let delay = (Duration::from_secs(5) * 2u32.pow(disconnects))
.min(Duration::from_secs(120));
error!("Error joining account: {e}. Waiting {delay:?} and trying again.");
tokio::time::sleep(delay).await;
}
tokio::time::sleep(join_delay).await;
}
} else {
let swarm_borrow = &swarm_clone;
join_all(options.accounts.iter().zip(options.states).map(
async move |(account, state)| -> Result<(), JoinError> {
// exponential backoff
let mut disconnects = 0;
while let Err(e) = swarm_borrow
.clone()
.add_account(account, state.clone())
.await
{
disconnects += 1;
let delay = (Duration::from_secs(5) * 2u32.pow(disconnects))
.min(Duration::from_secs(120));
error!("Error joining account: {e}. Waiting {delay:?} and trying again.");
tokio::time::sleep(delay).await;
}
Ok(())
},
))
.await;
}
});
let swarm_state = options.swarm_state;
let mut internal_state = InternalSwarmState::default();
@ -186,19 +213,12 @@ pub async fn start_swarm<
// bot events
while let (Some(event), bot_index) = swarm.bot_recv().await {
let bot = swarm.bots.lock()[bot_index].clone();
let bot_state = states[bot_index].clone();
let bot_state = swarm.states.lock()[bot_index].clone();
let cloned_plugins = (*bot.plugins).clone();
for plugin in cloned_plugins.into_iter() {
tokio::spawn(plugin.handle(event.clone(), bot.clone()));
}
let bot_plugin = bot.plugins.get::<bot::Plugin>().unwrap().clone();
tokio::spawn(bot::Plugin::handle(
Box::new(bot_plugin),
event.clone(),
bot.clone(),
));
// swarm event handling
match &event {
Event::Login => {
@ -213,14 +233,22 @@ pub async fn start_swarm<
tokio::spawn((options.handle)(bot, event, bot_state));
}
let _ = join_task.abort();
Ok(())
}
impl Swarm {
impl<S> Swarm<S>
where
S: Send + Sync + Clone + 'static,
{
/// Wait for any bot to get an event. We return the event and index (so we
/// can get the state and bot from that index)
async fn bot_recv(&mut self) -> (Option<Event>, usize) {
let mut receivers = self.receivers.lock();
if receivers.is_empty() {
// TODO
}
let mut futures = Vec::with_capacity(receivers.len());
for rx in receivers.iter_mut() {
futures.push(rx.recv().boxed());
@ -228,6 +256,25 @@ impl Swarm {
let (event, index, _remaining) = select_all(futures).await;
(event, index)
}
/// Add a new account as part of the swarm.
pub async fn add_account(&mut self, account: &Account, state: S) -> Result<Client, JoinError> {
let conn = Connection::new(&self.resolved_address).await?;
let (conn, game_profile) = Client::handshaw ake(conn, account, &self.address.clone()).await?;
let (tx, rx) = mpsc::unbounded_channel();
let mut client = Client::new(game_profile, conn, Some(self.world_container.clone()));
tx.send(Event::Initialize).unwrap();
client.start_tasks(tx);
client.plugins = Arc::new(self.plugins.clone());
self.bots.lock().push(client.clone());
self.receivers.lock().push(rx);
self.states.lock().push(state.clone());
Ok(client)
}
}
#[derive(Default)]
@ -235,3 +282,9 @@ struct InternalSwarmState {
/// The number of clients connected to the server
pub clients_joined: usize,
}
impl From<ConnectionError> for SwarmStartError {
fn from(e: ConnectionError) -> Self {
SwarmStartError::from(JoinError::from(e))
}
}

View file

@ -13,16 +13,16 @@ use std::{
///
/// If you're using azalea, you should generate this from the `plugins!` macro.
#[derive(Clone)]
pub struct SwarmPlugins {
map: Option<HashMap<TypeId, Box<dyn SwarmPlugin>, BuildHasherDefault<NoHashHasher<u64>>>>,
pub struct SwarmPlugins<S> {
map: Option<HashMap<TypeId, Box<dyn SwarmPlugin<S>>, BuildHasherDefault<NoHashHasher<u64>>>>,
}
impl SwarmPlugins {
impl<S> SwarmPlugins<S> {
pub fn new() -> Self {
Self { map: None }
}
pub fn add<T: SwarmPlugin>(&mut self, plugin: T) {
pub fn add<T: SwarmPlugin<S>>(&mut self, plugin: T) {
if self.map.is_none() {
self.map = Some(HashMap::with_hasher(BuildHasherDefault::default()));
}
@ -32,7 +32,7 @@ impl SwarmPlugins {
.insert(TypeId::of::<T>(), Box::new(plugin));
}
pub fn get<T: SwarmPlugin>(&self) -> Option<&T> {
pub fn get<T: SwarmPlugin<S>>(&self) -> Option<&T> {
self.map
.as_ref()
.and_then(|map| map.get(&TypeId::of::<T>()))
@ -40,8 +40,8 @@ impl SwarmPlugins {
}
}
impl IntoIterator for SwarmPlugins {
type Item = Box<dyn SwarmPlugin>;
impl<S> IntoIterator for SwarmPlugins<S> {
type Item = Box<dyn SwarmPlugin<S>>;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
@ -54,24 +54,24 @@ impl IntoIterator for SwarmPlugins {
/// Plugins can keep their own personal state, listen to events, and add new functions to Client.
#[async_trait]
pub trait SwarmPlugin: Send + Sync + SwarmPluginClone + Any + 'static {
async fn handle(self: Box<Self>, event: SwarmEvent, swarm: Swarm);
pub trait SwarmPlugin<S>: Send + Sync + SwarmPluginClone<S> + Any + 'static {
async fn handle(self: Box<Self>, event: SwarmEvent, swarm: Swarm<S>);
}
/// An internal trait that allows Plugin to be cloned.
#[doc(hidden)]
pub trait SwarmPluginClone {
fn clone_box(&self) -> Box<dyn SwarmPlugin>;
pub trait SwarmPluginClone<S> {
fn clone_box(&self) -> Box<dyn SwarmPlugin<S>>;
}
impl<T> SwarmPluginClone for T
impl<T, S> SwarmPluginClone<S> for T
where
T: 'static + SwarmPlugin + Clone,
T: 'static + SwarmPlugin<S> + Clone,
{
fn clone_box(&self) -> Box<dyn SwarmPlugin> {
fn clone_box(&self) -> Box<dyn SwarmPlugin<S>> {
Box::new(self.clone())
}
}
impl Clone for Box<dyn SwarmPlugin> {
impl<S> Clone for Box<dyn SwarmPlugin<S>> {
fn clone(&self) -> Self {
self.clone_box()
}

View file

@ -12,5 +12,6 @@ anyhow = "1.0.65"
azalea = {path = "../azalea"}
env_logger = "0.9.1"
parking_lot = {version = "^0.12.1", features = ["deadlock_detection"]}
rand = "0.8.5"
tokio = "1.19.2"
uuid = "1.1.2"

View file

@ -1,9 +1,14 @@
use azalea::pathfinder::BlockPosGoal;
use azalea::{prelude::*, BlockPos, Swarm, SwarmEvent};
use azalea::{prelude::*, BlockPos, Swarm, SwarmEvent, WalkDirection};
use azalea::{Account, Client, Event};
use parking_lot::Mutex;
use rand::Rng;
use std::sync::Arc;
#[derive(Default, Clone)]
struct State {}
struct State {
moving: Arc<Mutex<bool>>,
}
#[derive(Default, Clone)]
struct SwarmState {}
@ -48,7 +53,7 @@ async fn main() -> anyhow::Result<()> {
loop {
let e = azalea::start_swarm(azalea::SwarmOptions {
accounts: accounts.clone(),
address: "localhost",
address: "92.222.245.59",
states: states.clone(),
swarm_state: SwarmState::default(),
@ -58,13 +63,15 @@ async fn main() -> anyhow::Result<()> {
handle,
swarm_handle,
join_delay: Some(std::time::Duration::from_secs(5)),
})
.await;
println!("{:?}", e);
}
}
async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()> {
async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<()> {
match event {
Event::Login => {
bot.chat("Hello world").await?;
@ -89,6 +96,25 @@ async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()>
println!("initialized");
}
Event::Tick => {
// look in a random direction and walk for 1-3 seconds
{
let mut moving = state.moving.lock();
if *moving {
return Ok(());
}
*moving = true;
}
let (rotation, duration) = {
let mut rng = rand::thread_rng();
(rng.gen_range(0.0..360.0), rng.gen_range(1..3))
};
bot.set_rotation(rotation, 0.);
bot.walk(WalkDirection::Forward);
tokio::time::sleep(tokio::time::Duration::from_secs(duration)).await;
bot.walk(WalkDirection::None);
*state.moving.lock() = false;
// bot.jump();
}
_ => {}
@ -97,8 +123,8 @@ async fn handle(bot: Client, event: Event, _state: State) -> anyhow::Result<()>
Ok(())
}
async fn swarm_handle(
mut _swarm: Swarm,
async fn swarm_handle<S>(
mut _swarm: Swarm<S>,
_event: SwarmEvent,
_state: SwarmState,
) -> anyhow::Result<()> {