From 5d8b7b7504069100351757fc575ea80ac3214d7b Mon Sep 17 00:00:00 2001 From: Garen Tyler Date: Thu, 5 Jun 2025 20:05:05 -0600 Subject: [PATCH] Split upstream and downstream connections --- src/net/connection.rs | 308 ----------------------- src/net/connection/downstream/manager.rs | 171 +++++++++++++ src/net/connection/downstream/mod.rs | 86 +++++++ src/net/connection/mod.rs | 92 +++++++ src/net/connection/upstream.rs | 83 ++++++ src/proxy/mod.rs | 17 +- 6 files changed, 442 insertions(+), 315 deletions(-) delete mode 100644 src/net/connection.rs create mode 100644 src/net/connection/downstream/manager.rs create mode 100644 src/net/connection/downstream/mod.rs create mode 100644 src/net/connection/mod.rs create mode 100644 src/net/connection/upstream.rs diff --git a/src/net/connection.rs b/src/net/connection.rs deleted file mode 100644 index 307cbdd..0000000 --- a/src/net/connection.rs +++ /dev/null @@ -1,308 +0,0 @@ -use super::{codec::PacketCodec, error::Error}; -use crate::protocol::{ - encryption::*, - packets::{self, Packet, PacketDirection}, - types::Chat, - ClientState, -}; -use futures::{stream::StreamExt, SinkExt}; -use rand::rngs::StdRng; -use rand::Rng; -use rand::SeedableRng; -use std::{ - collections::HashMap, - time::{Duration, Instant}, -}; -use tokio::{io::BufStream, net::TcpStream, sync::mpsc}; -use tokio::{ - net::{TcpListener, ToSocketAddrs}, - task::JoinHandle, -}; -use tokio_util::codec::{Decoder, Framed}; -use tokio_util::sync::CancellationToken; -use tracing::{error, trace}; - -#[derive(Debug)] -pub struct ConnectionManager { - max_clients: Option, - clients: HashMap, - channel: ( - mpsc::UnboundedSender, - mpsc::UnboundedReceiver, - ), -} -impl ConnectionManager { - pub fn new(max_clients: Option) -> ConnectionManager { - ConnectionManager { - max_clients, - clients: HashMap::new(), - channel: mpsc::unbounded_channel(), - } - } - pub fn client(&self, id: u128) -> Option<&Connection> { - self.clients.get(&id) - } - pub fn client_mut(&mut self, id: u128) -> Option<&mut Connection> { - self.clients.get_mut(&id) - } - pub fn clients(&self) -> impl Iterator { - self.clients.values() - } - pub fn clients_mut(&mut self) -> impl Iterator { - self.clients.values_mut() - } - pub async fn spawn_listener( - &self, - bind_address: A, - running: CancellationToken, - ) -> Result, Error> - where - A: 'static + ToSocketAddrs + Send + std::fmt::Debug, - { - trace!("Starting listener task"); - let fmt_addr = format!("{:?}", bind_address); - let listener = TcpListener::bind(bind_address) - .await - .map_err(Error::Io) - .inspect_err(|_| error!("Could not bind to {}.", fmt_addr))?; - - let sender = self.channel.0.clone(); - - let join_handle = tokio::spawn(async move { - let mut client_id = 0u128; - - loop { - tokio::select! { - _ = running.cancelled() => { - break; - } - result = listener.accept() => { - if let Ok((stream, _)) = result { - trace!("Listener task got connection (id {})", client_id); - let client = Connection::new_client(client_id, stream); - if sender.send(client).is_err() { - trace!("Client receiver disconnected"); - break; - } - client_id += 1; - } - } - } - } - trace!("Listener task shutting down"); - }); - - Ok(join_handle) - } - pub async fn update(&mut self) -> Result<(), Error> { - // Receive new clients from the sender. - loop { - match self.channel.1.try_recv() { - Ok(connection) => { - let id = connection.id(); - - match self.max_clients { - Some(max) => { - if self.clients.len() >= max { - let _ = connection.disconnect(None).await; - } else { - self.clients.insert(id, connection); - } - } - None => { - self.clients.insert(id, connection); - } - } - } - Err(mpsc::error::TryRecvError::Disconnected) => { - return Err(Error::ConnectionChannelDisconnnection) - } - Err(mpsc::error::TryRecvError::Empty) => break, - }; - } - - // Disconnect any clients that have timed out. - // We don't actually care if the disconnections succeed, - // the connection is going to be dropped anyway. - let _ = futures::future::join_all({ - // Workaround until issue #59618 hash_extract_if gets stabilized. - let ids = self - .clients - .iter() - .filter_map(|(id, c)| { - if c.received_elapsed() > Duration::from_secs(10) { - Some(*id) - } else { - None - } - }) - .collect::>(); - ids.into_iter() - .map(|id| self.clients.remove(&id).unwrap()) - .map(|client| client.disconnect(None)) - }) - .await; - - // Remove disconnected clients. - let before = self.clients.len(); - self.clients - .retain(|_id, c| c.client_state() != ClientState::Disconnected); - let after = self.clients.len(); - if before - after > 0 { - trace!("Removed {} disconnected clients", before - after); - } - Ok(()) - } - pub async fn disconnect( - &mut self, - id: u128, - reason: Option, - ) -> Option> { - let client = self.clients.remove(&id)?; - Some(client.disconnect(reason).await) - } - pub async fn shutdown(mut self, reason: Option) -> Result<(), Error> { - let reason = reason.unwrap_or(serde_json::json!({ - "text": "You have been disconnected!" - })); - - let disconnections = self - .clients - .drain() - .map(|(_, c)| c) - .map(|c| c.disconnect(Some(reason.clone()))) - .collect::>(); - - // We don't actually care if the disconnections succeed, - // the connection is going to be dropped anyway. - let _disconnections = futures::future::join_all(disconnections).await; - - Ok(()) - } -} - -#[derive(Debug)] -pub struct Connection { - /// The `Connection`'s unique id. - id: u128, - stream: Framed, PacketCodec>, - last_received_data_time: Instant, - last_sent_data_time: Instant, -} -impl Connection { - fn new(id: u128, receiving_direction: PacketDirection, stream: TcpStream) -> Self { - let codec = PacketCodec::new(ClientState::Handshake, receiving_direction); - - Connection { - id, - stream: codec.framed(BufStream::new(stream)), - last_received_data_time: Instant::now(), - last_sent_data_time: Instant::now(), - } - } - /// Make a Connection from a `TcpStream`, acting as a client talking to a server. - pub fn new_client(id: u128, stream: TcpStream) -> Self { - Self::new(id, PacketDirection::Serverbound, stream) - } - /// Make a Connection from a `TcpStream`, acting as a server talking to a client. - pub fn new_server(id: u128, stream: TcpStream) -> Self { - Self::new(id, PacketDirection::Clientbound, stream) - } - pub fn id(&self) -> u128 { - self.id - } - pub fn client_state(&self) -> ClientState { - self.stream.codec().client_state - } - pub fn client_state_mut(&mut self) -> &mut ClientState { - &mut self.stream.codec_mut().client_state - } - pub fn received_elapsed(&self) -> Duration { - self.last_received_data_time.elapsed() - } - pub fn sent_elapsed(&self) -> Duration { - self.last_sent_data_time.elapsed() - } - pub async fn read_packet(&mut self) -> Option> { - let packet = self.stream.next().await.map(|packet| { - packet.map_err(|mut e| { - // Set the codec error to something more descriptive. - if e.to_string() == "bytes remaining on stream" { - e = Error::Io(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e)); - } - trace!("Error reading packet from connection {}: {:?}", self.id, e); - e - }) - }); - - if let Some(Ok(ref packet)) = packet { - trace!("Received packet from connection {}: {:?}", self.id, packet); - self.last_received_data_time = Instant::now(); - - if let Packet::EncryptionRequest(packet) = packet { - // Extract the public key from the packet. - let public_key = rsa::RsaPublicKey::parse(&packet.public_key) - .expect("Failed to parse RSA public key from packet") - .1; - - // Generate a shared secret. - let mut rng = StdRng::from_entropy(); - let shared_secret: [u8; 16] = rng.gen(); - - // Create the AES stream cipher and initialize it with the shared secret. - let encryptor = - Aes128Cfb8Encryptor::new((&shared_secret).into(), (&shared_secret).into()); - let decryptor = - Aes128Cfb8Decryptor::new((&shared_secret).into(), (&shared_secret).into()); - - // Send the encryption response packet. - self.send_packet(packets::login::serverbound::EncryptionResponse { - shared_secret: public_key - .encrypt(&mut rng, rsa::Pkcs1v15Encrypt, &shared_secret[..]) - .expect("Failed to encrypt shared secret"), - verify_token: public_key - .encrypt(&mut rng, rsa::Pkcs1v15Encrypt, &packet.verify_token[..]) - .expect("Failed to encrypt shared secret"), - }) - .await - .expect("Failed to send encryption response"); - - // Enable encryption on the connection. - self.stream.codec_mut().aes_cipher = Some((encryptor, decryptor, 0)); - } - } - - packet - } - pub async fn send_packet>(&mut self, packet: P) -> Result<(), Error> { - let packet: Packet = packet.into(); - trace!("Sending packet to connection {}: {:?}", self.id, packet); - self.stream.send(packet).await.inspect_err(|e| { - trace!("Error sending packet to connection {}: {:?}", self.id, e); - }) - } - pub async fn disconnect(mut self, reason: Option) -> Result<(), Error> { - trace!("Connection disconnected (id {})", self.id); - use packets::{login::clientbound::LoginDisconnect, play::clientbound::PlayDisconnect}; - - let reason = reason.unwrap_or(serde_json::json!({ - "text": "You have been disconnected!" - })); - - match self.client_state() { - ClientState::Disconnected | ClientState::Handshake | ClientState::Status => { - // Impossible to send a disconnect in these states. - } - ClientState::Login => { - let _ = self.send_packet(LoginDisconnect { reason }).await; - } - ClientState::Play => { - let _ = self.send_packet(PlayDisconnect { reason }).await; - } - } - - self.stream.flush().await?; - self.stream.codec_mut().client_state = ClientState::Disconnected; - Ok(()) - } -} diff --git a/src/net/connection/downstream/manager.rs b/src/net/connection/downstream/manager.rs new file mode 100644 index 0000000..2d8d114 --- /dev/null +++ b/src/net/connection/downstream/manager.rs @@ -0,0 +1,171 @@ +use crate::{ + net::{connection::DownstreamConnection, error::Error}, + protocol::{types::Chat, ClientState}, +}; +use std::{collections::HashMap, time::Duration}; +use tokio::{ + net::{TcpListener, ToSocketAddrs}, + sync::mpsc, + task::JoinHandle, +}; +use tokio_util::sync::CancellationToken; +use tracing::{error, trace}; + +#[derive(Debug)] +pub struct DownstreamConnectionManager { + max_clients: Option, + clients: HashMap, + channel: ( + mpsc::UnboundedSender, + mpsc::UnboundedReceiver, + ), +} +impl DownstreamConnectionManager { + pub fn new(max_clients: Option) -> DownstreamConnectionManager { + DownstreamConnectionManager { + max_clients, + clients: HashMap::new(), + channel: mpsc::unbounded_channel(), + } + } + pub fn client(&self, id: u128) -> Option<&DownstreamConnection> { + self.clients.get(&id) + } + pub fn client_mut(&mut self, id: u128) -> Option<&mut DownstreamConnection> { + self.clients.get_mut(&id) + } + pub fn clients(&self) -> impl Iterator { + self.clients.values() + } + pub fn clients_mut(&mut self) -> impl Iterator { + self.clients.values_mut() + } + pub async fn spawn_listener( + &self, + bind_address: A, + running: CancellationToken, + ) -> Result, Error> + where + A: 'static + ToSocketAddrs + Send + std::fmt::Debug, + { + trace!("Starting listener task"); + let fmt_addr = format!("{:?}", bind_address); + let listener = TcpListener::bind(bind_address) + .await + .map_err(Error::Io) + .inspect_err(|_| error!("Could not bind to {}.", fmt_addr))?; + + let sender = self.channel.0.clone(); + + let join_handle = tokio::spawn(async move { + let mut client_id = 0u128; + + loop { + tokio::select! { + _ = running.cancelled() => { + break; + } + result = listener.accept() => { + if let Ok((stream, _)) = result { + trace!("Listener task got connection (id {})", client_id); + let client = DownstreamConnection::new(client_id, stream); + if sender.send(client).is_err() { + trace!("Client receiver disconnected"); + break; + } + client_id += 1; + } + } + } + } + trace!("Listener task shutting down"); + }); + + Ok(join_handle) + } + pub async fn update(&mut self) -> Result<(), Error> { + // Receive new clients from the sender. + loop { + match self.channel.1.try_recv() { + Ok(connection) => { + let id = connection.id(); + + match self.max_clients { + Some(max) => { + if self.clients.len() >= max { + let _ = connection.disconnect(None).await; + } else { + self.clients.insert(id, connection); + } + } + None => { + self.clients.insert(id, connection); + } + } + } + Err(mpsc::error::TryRecvError::Disconnected) => { + return Err(Error::ConnectionChannelDisconnnection) + } + Err(mpsc::error::TryRecvError::Empty) => break, + }; + } + + // Disconnect any clients that have timed out. + // We don't actually care if the disconnections succeed, + // the connection is going to be dropped anyway. + let _ = futures::future::join_all({ + // Workaround until issue #59618 hash_extract_if gets stabilized. + let ids = self + .clients + .iter() + .filter_map(|(id, c)| { + if c.received_elapsed() > Duration::from_secs(10) { + Some(*id) + } else { + None + } + }) + .collect::>(); + ids.into_iter() + .map(|id| self.clients.remove(&id).unwrap()) + .map(|client| client.disconnect(None)) + }) + .await; + + // Remove disconnected clients. + let before = self.clients.len(); + self.clients + .retain(|_id, c| c.client_state() != ClientState::Disconnected); + let after = self.clients.len(); + if before - after > 0 { + trace!("Removed {} disconnected clients", before - after); + } + Ok(()) + } + pub async fn disconnect( + &mut self, + id: u128, + reason: Option, + ) -> Option> { + let client = self.clients.remove(&id)?; + Some(client.disconnect(reason).await) + } + pub async fn shutdown(mut self, reason: Option) -> Result<(), Error> { + let reason = reason.unwrap_or(serde_json::json!({ + "text": "You have been disconnected!" + })); + + let disconnections = self + .clients + .drain() + .map(|(_, c)| c) + .map(|c| c.disconnect(Some(reason.clone()))) + .collect::>(); + + // We don't actually care if the disconnections succeed, + // the connection is going to be dropped anyway. + let _disconnections = futures::future::join_all(disconnections).await; + + Ok(()) + } +} diff --git a/src/net/connection/downstream/mod.rs b/src/net/connection/downstream/mod.rs new file mode 100644 index 0000000..bec9a86 --- /dev/null +++ b/src/net/connection/downstream/mod.rs @@ -0,0 +1,86 @@ +pub mod manager; + +use crate::{ + net::{connection::GenericConnection, error::Error}, + protocol::{ + packets::{self, Packet, PacketDirection}, + types::Chat, + ClientState, + }, +}; +use tokio::net::TcpStream; + +/// The connection's current state. +/// Similar to crate::protocol::ClientState, +/// but has more fine-grained tracking for packet responses. +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] +pub enum DownstreamConnectionState { + #[default] + Handshake, + StatusRequest, + StatusPing, + LoginStart, + EncryptionResponse, + LoginPluginResponse, + Play, + Disconnected, +} + +#[derive(Debug)] +pub struct DownstreamConnection { + inner: GenericConnection, + state: DownstreamConnectionState, +} +impl DownstreamConnection { + pub fn new(id: u128, stream: TcpStream) -> Self { + DownstreamConnection { + // receiving_direction: PacketDirection::Serverbound + inner: GenericConnection::new(id, PacketDirection::Serverbound, stream), + state: DownstreamConnectionState::Handshake, + } + } + pub async fn read_packet(&mut self) -> Option> { + self.inner.read_packet().await + } + pub async fn send_packet>(&mut self, packet: P) -> Result<(), Error> { + self.inner.send_packet(packet).await + } + pub async fn disconnect(mut self, reason: Option) -> Result<(), Error> { + use packets::{login::clientbound::LoginDisconnect, play::clientbound::PlayDisconnect}; + + let reason = reason.unwrap_or(serde_json::json!({ + "text": "You have been disconnected!" + })); + + match self.client_state() { + ClientState::Disconnected | ClientState::Handshake | ClientState::Status => { + // Impossible to send a disconnect in these states. + } + ClientState::Login => { + let _ = self.send_packet(LoginDisconnect { reason }).await; + } + ClientState::Play => { + let _ = self.send_packet(PlayDisconnect { reason }).await; + } + } + + self.inner.disconnect().await + } +} +impl std::ops::Deref for DownstreamConnection { + type Target = GenericConnection; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} +impl std::ops::DerefMut for DownstreamConnection { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} +impl From for GenericConnection { + fn from(value: DownstreamConnection) -> Self { + value.inner + } +} diff --git a/src/net/connection/mod.rs b/src/net/connection/mod.rs new file mode 100644 index 0000000..29171d4 --- /dev/null +++ b/src/net/connection/mod.rs @@ -0,0 +1,92 @@ +/// Connections where we're the server. +mod downstream; +/// Connections where we're the client. +mod upstream; + +pub use downstream::{manager::DownstreamConnectionManager, DownstreamConnection}; +pub use upstream::UpstreamConnection; + +use crate::{ + net::{codec::PacketCodec, error::Error}, + protocol::{ + packets::{Packet, PacketDirection}, + ClientState, + }, +}; +use futures::{stream::StreamExt, SinkExt}; +use std::time::{Duration, Instant}; +use tokio::{io::BufStream, net::TcpStream}; +use tokio_util::codec::{Decoder, Framed}; +use tracing::trace; + +#[derive(Debug)] +pub struct GenericConnection { + /// The `GenericConnection`'s unique id. + id: u128, + stream: Framed, PacketCodec>, + last_received_data_time: Instant, + last_sent_data_time: Instant, +} +impl GenericConnection { + pub fn new(id: u128, receiving_direction: PacketDirection, stream: TcpStream) -> Self { + let codec = PacketCodec::new(ClientState::Handshake, receiving_direction); + + GenericConnection { + id, + stream: codec.framed(BufStream::new(stream)), + last_received_data_time: Instant::now(), + last_sent_data_time: Instant::now(), + } + } + pub fn id(&self) -> u128 { + self.id + } + pub fn client_state(&self) -> ClientState { + self.stream.codec().client_state + } + pub fn client_state_mut(&mut self) -> &mut ClientState { + &mut self.stream.codec_mut().client_state + } + pub fn received_elapsed(&self) -> Duration { + self.last_received_data_time.elapsed() + } + pub fn sent_elapsed(&self) -> Duration { + self.last_sent_data_time.elapsed() + } + pub async fn read_packet(&mut self) -> Option> { + let packet = self.stream.next().await.map(|packet| { + packet.map_err(|mut e| { + // Set the codec error to something more descriptive. + if e.to_string() == "bytes remaining on stream" { + e = Error::Io(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e)); + } + trace!("Error reading packet from connection {}: {:?}", self.id, e); + e + }) + }); + + if let Some(Ok(ref packet)) = packet { + trace!("Received packet from connection {}: {:?}", self.id, packet); + self.last_received_data_time = Instant::now(); + + if let Some(next_state) = packet.state_change() { + *self.client_state_mut() = next_state; + } + } + + packet + } + pub async fn send_packet>(&mut self, packet: P) -> Result<(), Error> { + let packet: Packet = packet.into(); + trace!("Sending packet to connection {}: {:?}", self.id, packet); + self.stream.send(packet).await.inspect_err(|e| { + trace!("Error sending packet to connection {}: {:?}", self.id, e); + }) + } + pub async fn disconnect(mut self) -> Result<(), Error> { + trace!("Connection disconnected (id {})", self.id); + self.stream.flush().await?; + self.stream.codec_mut().client_state = ClientState::Disconnected; + Ok(()) + } +} diff --git a/src/net/connection/upstream.rs b/src/net/connection/upstream.rs new file mode 100644 index 0000000..9f74b20 --- /dev/null +++ b/src/net/connection/upstream.rs @@ -0,0 +1,83 @@ +use crate::{ + net::{connection::GenericConnection, error::Error}, + protocol::{ + encryption::*, + packets::{self, Packet, PacketDirection}, + }, +}; +use rand::{rngs::StdRng, Rng, SeedableRng}; +use tokio::net::TcpStream; + +#[derive(Debug)] +pub struct UpstreamConnection { + inner: GenericConnection, +} +impl UpstreamConnection { + pub fn new(id: u128, stream: TcpStream) -> Self { + UpstreamConnection { + // receiving_direction: PacketDirection::Clientbound + inner: GenericConnection::new(id, PacketDirection::Clientbound, stream), + } + } + pub async fn read_packet(&mut self) -> Option> { + let packet = self.inner.read_packet().await?.ok()?; + + match packet { + Packet::EncryptionRequest(ref packet) => { + // Extract the public key from the packet. + let public_key = rsa::RsaPublicKey::parse(&packet.public_key) + .expect("Failed to parse RSA public key from packet") + .1; + + // Generate a shared secret. + let mut rng = StdRng::from_entropy(); + let shared_secret: [u8; 16] = rng.gen(); + + // Create the AES stream cipher and initialize it with the shared secret. + let encryptor = + Aes128Cfb8Encryptor::new((&shared_secret).into(), (&shared_secret).into()); + let decryptor = + Aes128Cfb8Decryptor::new((&shared_secret).into(), (&shared_secret).into()); + + // Send the encryption response packet. + self.send_packet(packets::login::serverbound::EncryptionResponse { + shared_secret: public_key + .encrypt(&mut rng, rsa::Pkcs1v15Encrypt, &shared_secret[..]) + .expect("Failed to encrypt shared secret"), + verify_token: public_key + .encrypt(&mut rng, rsa::Pkcs1v15Encrypt, &packet.verify_token[..]) + .expect("Failed to encrypt shared secret"), + }) + .await + .expect("Failed to send encryption response"); + + // Enable encryption on the connection. + self.inner.stream.codec_mut().aes_cipher = Some((encryptor, decryptor, 0)); + } + Packet::SetCompression(_) => todo!(), + _ => {} + } + + Some(Ok(packet)) + } + pub async fn send_packet>(&mut self, packet: P) -> Result<(), Error> { + self.inner.send_packet(packet).await + } +} +impl std::ops::Deref for UpstreamConnection { + type Target = GenericConnection; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} +impl std::ops::DerefMut for UpstreamConnection { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} +impl From for GenericConnection { + fn from(value: UpstreamConnection) -> Self { + value.inner + } +} diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index ed9d47f..f9fa3a1 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -1,11 +1,13 @@ pub mod config; pub mod error; -use crate::net::connection::Connection; use crate::protocol::packets::Packet; use crate::protocol::ClientState; use crate::App; -use crate::{config::Config, net::connection::ConnectionManager}; +use crate::{ + config::Config, + net::connection::{DownstreamConnectionManager, UpstreamConnection}, +}; use config::ProxyConfig; use error::{Error, NetworkError}; use tokio::net::TcpStream; @@ -16,17 +18,18 @@ use tracing::{info, trace}; #[derive(Debug)] pub struct Proxy { running: CancellationToken, - connections: ConnectionManager, + connections: DownstreamConnectionManager, listener: JoinHandle<()>, upstream_address: String, - upstream: Connection, + upstream: UpstreamConnection, } impl Proxy { - pub async fn connect_upstream(upstream_address: &str) -> Result { + pub async fn connect_upstream(upstream_address: &str) -> Result { let upstream = TcpStream::connect(upstream_address) .await .map_err(Error::Io)?; - Ok(Connection::new_server(0, upstream)) + + Ok(UpstreamConnection::new(0, upstream)) } pub fn rewrite_packet(packet: Packet) -> Option { match packet { @@ -62,7 +65,7 @@ impl App for Proxy { let bind_address = format!("0.0.0.0:{}", config.proxy.port); // Only allow one client to join at a time. - let connections = ConnectionManager::new(Some(1)); + let connections = DownstreamConnectionManager::new(Some(1)); let listener = connections .spawn_listener(bind_address, running.child_token()) .await