1
2
Fork 0
mirror of https://github.com/mat-1/azalea.git synced 2025-08-02 14:26:04 +00:00

add SwarmEvent::Disconnect(Account)

This commit is contained in:
mat 2022-11-20 18:30:19 -06:00
parent 5f3740b64c
commit df8af1c2e1
5 changed files with 116 additions and 75 deletions

5
Cargo.lock generated
View file

@ -128,6 +128,7 @@ dependencies = [
"priority-queue",
"thiserror",
"tokio",
"uuid",
]
[[package]]
@ -2088,9 +2089,9 @@ dependencies = [
[[package]]
name = "uuid"
version = "1.2.1"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "feb41e78f93363bb2df8b0e86a2ca30eed7806ea16ea0c790d757cf93f79be83"
checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c"
[[package]]
name = "vcpkg"

View file

@ -342,7 +342,7 @@ impl Client {
}
/// Disconnect from the server, ending all tasks.
pub async fn shutdown(self) -> Result<(), std::io::Error> {
pub async fn disconnect(self) -> Result<(), std::io::Error> {
self.write_conn.lock().await.shutdown().await?;
let tasks = self.tasks.lock();
for task in tasks.iter() {
@ -382,10 +382,10 @@ impl Client {
Err(e) => {
if let ReadPacketError::ConnectionClosed = e {
info!("Connection closed");
if let Err(e) = client.shutdown().await {
if let Err(e) = client.disconnect().await {
error!("Error shutting down connection: {:?}", e);
}
return;
break;
}
if IGNORE_ERRORS {
warn!("{}", e);

View file

@ -25,6 +25,7 @@ parking_lot = {version = "^0.12.1", features = ["deadlock_detection"]}
priority-queue = "1.3.0"
thiserror = "^1.0.37"
tokio = "^1.21.2"
uuid = "1.2.2"
[dev-dependencies]
anyhow = "^1.0.65"

View file

@ -15,6 +15,7 @@ use parking_lot::{Mutex, RwLock};
use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration};
use thiserror::Error;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use uuid::Uuid;
/// A helper macro that generates a [`Plugins`] struct from a list of objects
/// that implement [`Plugin`].
@ -39,10 +40,10 @@ macro_rules! swarm_plugins {
/// being able to control bots at an individual level when desired.
///
/// The `S` type parameter is the type of the state for individual bots.
/// It's used to make the [`Swarm::add_account`] function work.
/// It's used to make the [`Swarm::add`] function work.
#[derive(Clone)]
pub struct Swarm<S> {
bots: Arc<Mutex<Vec<SwarmBotData<S>>>>,
bot_datas: Arc<Mutex<Vec<(Client, S)>>>,
resolved_address: SocketAddr,
address: ServerAddress,
@ -52,15 +53,10 @@ pub struct Swarm<S> {
/// A single receiver that combines all the receivers of all the bots.
/// (bot index, event)
bots_rx: Arc<Mutex<UnboundedReceiver<(Option<Event>, SwarmBotData<S>)>>>,
bots_tx: Arc<Mutex<UnboundedSender<(Option<Event>, SwarmBotData<S>)>>>,
}
bots_rx: Arc<Mutex<UnboundedReceiver<(Option<Event>, (Client, S))>>>,
bots_tx: UnboundedSender<(Option<Event>, (Client, S))>,
/// The data stored for each bot in the swarm.
#[derive(Clone)]
pub struct SwarmBotData<S> {
pub bot: Client,
pub state: S,
swarm_tx: UnboundedSender<SwarmEvent>,
}
/// An event about something that doesn't have to do with a single bot.
@ -68,6 +64,11 @@ pub struct SwarmBotData<S> {
pub enum SwarmEvent {
/// All the bots in the swarm have successfully joined the server.
Login,
/// A bot got disconnected from the server.
///
/// You can implement an auto-reconnect by calling [`Swarm::add`]
/// with the account from this event.
Disconnect(Account),
}
pub type SwarmHandleFn<Fut, S, SS> = fn(Swarm<S>, SwarmEvent, SS) -> Fut;
@ -147,9 +148,10 @@ pub async fn start_swarm<
plugins.add(bot::Plugin::default());
let (bots_tx, mut bots_rx) = mpsc::unbounded_channel();
let (swarm_tx, mut swarm_rx) = mpsc::unbounded_channel();
let mut swarm = Swarm {
bots: Arc::new(Mutex::new(Vec::new())),
bot_datas: Arc::new(Mutex::new(Vec::new())),
resolved_address,
address,
@ -157,7 +159,9 @@ pub async fn start_swarm<
plugins,
bots_rx: Arc::new(Mutex::new(bots_rx)),
bots_tx: Arc::new(Mutex::new(bots_tx)),
bots_tx: bots_tx,
swarm_tx: swarm_tx.clone(),
};
let mut swarm_clone = swarm.clone();
@ -167,13 +171,9 @@ pub async fn start_swarm<
for (account, state) in options.accounts.iter().zip(options.states) {
// exponential backoff
let mut disconnects = 0;
while let Err(e) = swarm_clone.add_account(account, state.clone()).await {
disconnects += 1;
let delay = (Duration::from_secs(5) * 2u32.pow(disconnects))
.min(Duration::from_secs(120));
error!("Error joining account: {e}. Waiting {delay:?} and trying again.");
tokio::time::sleep(delay).await;
}
swarm_clone
.add_with_exponential_backoff(account, state.clone())
.await;
tokio::time::sleep(join_delay).await;
}
} else {
@ -182,17 +182,10 @@ pub async fn start_swarm<
async move |(account, state)| -> Result<(), JoinError> {
// exponential backoff
let mut disconnects = 0;
while let Err(e) = swarm_borrow
swarm_borrow
.clone()
.add_account(account, state.clone())
.await
{
disconnects += 1;
let delay = (Duration::from_secs(5) * 2u32.pow(disconnects))
.min(Duration::from_secs(120));
error!("Error joining account: {e}. Waiting {delay:?} and trying again.");
tokio::time::sleep(delay).await;
}
.add_with_exponential_backoff(account, state.clone())
.await;
Ok(())
},
))
@ -203,22 +196,24 @@ pub async fn start_swarm<
let swarm_state = options.swarm_state;
let mut internal_state = InternalSwarmState::default();
// Send an event to the swarm_handle function.
let cloned_swarm = swarm.clone();
let fire_swarm_event = move |event: SwarmEvent| {
let cloned_swarm_plugins = options.swarm_plugins.clone();
for plugin in cloned_swarm_plugins.into_iter() {
tokio::spawn(plugin.handle(event.clone(), cloned_swarm.clone()));
// Watch swarm_rx and send those events to the plugins and swarm_handle.
let swarm_clone = swarm.clone();
tokio::spawn(async move {
while let Some(event) = swarm_rx.recv().await {
let cloned_swarm_plugins = options.swarm_plugins.clone();
for plugin in cloned_swarm_plugins.into_iter() {
tokio::spawn(plugin.handle(event.clone(), swarm_clone.clone()));
}
tokio::spawn((options.swarm_handle)(
swarm_clone.clone(),
event,
swarm_state.clone(),
));
}
tokio::spawn((options.swarm_handle)(
cloned_swarm.clone(),
event,
swarm_state.clone(),
));
};
});
// bot events
while let (Some(event), SwarmBotData { bot, state, .. }) = swarm.bot_recv().await {
while let (Some(event), (bot, state)) = swarm.bot_recv().await {
// bot event handling
let cloned_plugins = (*bot.plugins).clone();
for plugin in cloned_plugins.into_iter() {
@ -229,8 +224,8 @@ pub async fn start_swarm<
match &event {
Event::Login => {
internal_state.bots_joined += 1;
if internal_state.bots_joined == swarm.bots.lock().len() {
fire_swarm_event(SwarmEvent::Login);
if internal_state.bots_joined == swarm.bot_datas.lock().len() {
swarm_tx.send(SwarmEvent::Login).unwrap();
}
}
_ => {}
@ -248,15 +243,15 @@ impl<S> Swarm<S>
where
S: Send + Sync + Clone + 'static,
{
/// Wait for any bot to get an event. We return the event and SwarmBotData
async fn bot_recv(&mut self) -> (Option<Event>, SwarmBotData<S>) {
/// Wait for any bot to get an event. We return the event and (Client, State)
async fn bot_recv(&mut self) -> (Option<Event>, (Client, S)) {
let mut bots_rx = self.bots_rx.lock();
let (event, bot) = bots_rx.recv().await.unwrap();
(event, bot)
}
/// Add a new account as part of the swarm.
pub async fn add_account(&mut self, account: &Account, state: S) -> Result<Client, JoinError> {
/// Add a new account to the swarm. You can remove it later by calling [`Client::disconnect`].
pub async fn add(&mut self, account: &Account, state: S) -> Result<Client, JoinError> {
let conn = Connection::new(&self.resolved_address).await?;
let (conn, game_profile) = Client::handshake(conn, account, &self.address.clone()).await?;
@ -272,29 +267,55 @@ where
let cloned_bots_tx = self.bots_tx.clone();
let cloned_bot = bot.clone();
let cloned_state = state.clone();
let owned_account = account.clone();
let bot_datas = self.bot_datas.clone();
let swarm_tx = self.swarm_tx.clone();
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
// we can't handle events here (since we can't copy the handler),
// they're handled above in start_swarm
if let Err(e) = cloned_bots_tx.lock().send((
Some(event),
SwarmBotData {
bot: cloned_bot.clone(),
state: cloned_state.clone(),
},
)) {
if let Err(e) =
cloned_bots_tx.send((Some(event), (cloned_bot.clone(), cloned_state.clone())))
{
error!("Error sending event to swarm: {e}");
}
}
// the bot disconnected, so we remove it from the swarm
let mut bot_datas = bot_datas.lock();
let index = bot_datas
.iter()
.position(|(b, _)| b.profile.uuid == cloned_bot.profile.uuid)
.expect("bot disconnected but not found in swarm");
bot_datas.remove(index);
swarm_tx
.send(SwarmEvent::Disconnect(owned_account))
.unwrap();
});
self.bots.lock().push(SwarmBotData {
bot: bot.clone(),
state: state.clone(),
});
self.bot_datas.lock().push((bot.clone(), state.clone()));
Ok(bot)
}
/// Add a new account to the swarm, retrying if it couldn't join. This will
/// run forever until the bot joins or the task is aborted.
pub async fn add_with_exponential_backoff(&mut self, account: &Account, state: S) -> Client {
let mut disconnects = 0;
loop {
match self.add(account, state.clone()).await {
Ok(bot) => return bot,
Err(e) => {
disconnects += 1;
let delay = (Duration::from_secs(5) * 2u32.pow(disconnects))
.min(Duration::from_secs(120));
let username = account.username.clone();
error!("Error joining {username}: {e}. Waiting {delay:?} and trying again.");
tokio::time::sleep(delay).await;
}
}
}
}
}
#[derive(Default)]

View file

@ -4,6 +4,7 @@ use azalea::{Account, Client, Event};
use parking_lot::Mutex;
use rand::Rng;
use std::sync::Arc;
use std::time::Duration;
#[derive(Default, Clone)]
struct State {
@ -45,7 +46,7 @@ async fn main() -> anyhow::Result<()> {
let mut accounts = Vec::new();
let mut states = Vec::new();
for i in 0..5 {
for i in 0..10 {
accounts.push(Account::offline(&format!("bot{}", i)));
states.push(State::default());
}
@ -53,7 +54,7 @@ async fn main() -> anyhow::Result<()> {
loop {
let e = azalea::start_swarm(azalea::SwarmOptions {
accounts: accounts.clone(),
address: "92.222.245.59",
address: "localhost",
states: states.clone(),
swarm_state: SwarmState::default(),
@ -64,7 +65,7 @@ async fn main() -> anyhow::Result<()> {
handle,
swarm_handle,
join_delay: Some(std::time::Duration::from_secs(5)),
join_delay: None,
})
.await;
println!("{e:?}");
@ -77,6 +78,11 @@ async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<(
bot.chat("Hello world").await?;
}
Event::Chat(m) => {
if m.content() == bot.profile.name {
bot.chat("Bye").await?;
tokio::time::sleep(Duration::from_millis(50)).await;
bot.disconnect().await?;
}
// println!("{}", m.message().to_ansi(None));
// if m.message().to_string() == "<py5> goto" {
// let target_pos_vec3 = bot
@ -105,17 +111,21 @@ async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<(
*moving = true;
}
let (rotation, duration) = {
let mut rng = rand::thread_rng();
(rng.gen_range(0.0..360.0), rng.gen_range(1..3))
};
let rotation = rand::thread_rng().gen_range(0.0..360.0);
let duration = rand::thread_rng().gen_range(1..3);
let jumping = rand::thread_rng().gen_bool(0.5);
bot.set_rotation(rotation, 0.);
bot.walk(WalkDirection::Forward);
if jumping {
bot.set_jumping(true);
}
tokio::time::sleep(tokio::time::Duration::from_secs(duration)).await;
bot.walk(WalkDirection::None);
if jumping {
bot.set_jumping(false);
}
*state.moving.lock() = false;
// bot.jump();
}
_ => {}
}
@ -123,10 +133,18 @@ async fn handle(mut bot: Client, event: Event, state: State) -> anyhow::Result<(
Ok(())
}
async fn swarm_handle<S>(
mut _swarm: Swarm<S>,
_event: SwarmEvent,
async fn swarm_handle(
mut swarm: Swarm<State>,
event: SwarmEvent,
_state: SwarmState,
) -> anyhow::Result<()> {
match &event {
SwarmEvent::Disconnect(account) => {
println!("bot got kicked! {}", account.username);
tokio::time::sleep(Duration::from_secs(5)).await;
swarm.add(account, State::default()).await?;
}
_ => {}
}
Ok(())
}