diff --git a/Cargo.lock b/Cargo.lock index a69b62d..ff52592 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,6 +96,7 @@ dependencies = [ "chrono", "ctrlc", "fern", + "futures", "lazy_static", "log", "radix64", @@ -125,6 +126,98 @@ dependencies = [ "log", ] +[[package]] +name = "futures" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f55667319111d593ba876406af7c409c0ebb44dc4be6132a783ccf163ea14c1" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c2dd2df839b57db9ab69c2c9d8f3e8c81984781937fe2807dc6dcf3b2ad2939" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15496a72fabf0e62bdc3df11a59a3787429221dd0710ba8ef163d6f7a9112c94" + +[[package]] +name = "futures-executor" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891a4b7b96d84d5940084b2a37632dd65deeae662c114ceaa2c879629c9c0ad1" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71c2c65c57704c32f5241c1223167c2c3294fd34ac020c807ddbe6db287ba59" + +[[package]] +name = "futures-macro" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea405816a5139fb39af82c2beb921d52143f556038378d6db21183a5c37fbfb7" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85754d98985841b7d4f5e8e6fbfa4a4ac847916893ec511a2917ccd8525b8bb3" + +[[package]] +name = "futures-task" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa189ef211c15ee602667a6fcfe1c1fd9e07d42250d2156382820fba33c9df80" + +[[package]] +name = "futures-util" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1812c7ab8aedf8d6f2701a43e1243acdbcc2b36ab26e2ad421eb99ac963d96d1" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] + [[package]] name = "hermit-abi" version = "0.1.18" @@ -295,6 +388,24 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439697af366c49a6d0a010c56a0d97685bc140ce0d377b13a2ea2aa42d64a827" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" + [[package]] name = "proc-macro2" version = "1.0.24" @@ -384,6 +495,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" + [[package]] name = "smallvec" version = "1.6.1" diff --git a/Cargo.toml b/Cargo.toml index 679776d..d01653b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ tokio = { version = "1", features = ["full"] } async-trait = "0.1.48" lazy_static = "1.4.0" ctrlc = "3.1.8" +futures = "0.3.13" # colorful = "0.2.1" # ozelot = "0.9.0" # Ozelot 0.9.0 supports protocol version 578 (1.15.2) # toml = "0.5.6" diff --git a/src/lib.rs b/src/lib.rs index 71329bf..69a3860 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ pub mod server; /// The data types for blocks, chunks, dimensions, and world files. pub mod world; -use log::warn; +use log::*; pub use mctypes::*; use serde::{Deserialize, Serialize}; use std::sync::mpsc::{self, Receiver}; diff --git a/src/main.rs b/src/main.rs index ef9b371..fd16e2d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use log::info; +use log::*; use std::sync::mpsc::TryRecvError; use std::time::Duration; diff --git a/src/server/messages.rs b/src/server/messages.rs new file mode 100644 index 0000000..a91d51b --- /dev/null +++ b/src/server/messages.rs @@ -0,0 +1,14 @@ +#[derive(Debug, PartialEq, Clone)] +pub enum ServerboundMessage { + Chat(String), // The chat message. + PlayerJoin(String, String), // UUID, then username +} + +#[derive(Debug, PartialEq, Clone)] +pub enum ClientboundMessage {} + +#[derive(Debug, PartialEq, Clone)] +pub enum BroadcastMessage { + Chat(String), // The chat message. + Disconnect(String), // The reason for disconnecting. +} diff --git a/src/server/mod.rs b/src/server/mod.rs index 17d3b23..d134231 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,8 +1,11 @@ +/// Internal messaging for the server. +pub mod messages; /// Put the network client struct in its own file. pub mod net; use crate::entity::player::Player; -use log::info; +use log::*; +use messages::*; use net::*; use std::sync::mpsc::{self, Receiver, TryRecvError}; use tokio::net::{TcpListener, ToSocketAddrs}; @@ -11,13 +14,15 @@ use tokio::net::{TcpListener, ToSocketAddrs}; pub struct Server { network_clients: Vec, network_receiver: Receiver, + message_receiver: Receiver, + // message_sender: Bus, pub players: Vec, - pub packets_read: usize, - pub packets_sent: usize, } impl Server { pub fn new(addr: A) -> Server { - let (tx, rx) = mpsc::channel(); + let (network_client_tx, network_client_rx) = mpsc::channel(); + let (serverbound_message_tx, serverbound_message_rx) = mpsc::channel(); + // let mut broadcast_message_tx = Bus::new(1_000); // Hold up to 1,000 messages in the queue. tokio::task::spawn(async move { let listener = TcpListener::bind(addr) .await @@ -28,18 +33,23 @@ impl Server { .accept() .await .expect("Network receiver disconnected"); - tx.send(NetworkClient::new(stream, id as u128)) + network_client_tx + .send(NetworkClient::new( + stream, + id as u128, + serverbound_message_tx.clone(), + // broadcast_message_tx.add_rx(), + )) .expect("Network receiver disconnected"); id += 1; } }); info!("Network server started!"); Server { - network_receiver: rx, + network_receiver: network_client_rx, network_clients: vec![], + message_receiver: serverbound_message_rx, players: vec![], - packets_read: 0, - packets_sent: 0, } } @@ -47,26 +57,21 @@ impl Server { /// /// Disconnects all clients. pub async fn shutdown(&mut self) { - info!("Server shutting down."); - for client in self.network_clients.iter_mut() { - // We don't care if it doesn't succeed in sending the packet. - let _ = client.disconnect(Some("The server is shutting down")).await; - // Count the number of packets from the remaining clients. - self.packets_read += client.packets_read; - self.packets_sent += client.packets_sent; - } info!( - "{} packets read and {} packets sent in {:?}", - self.packets_read, - self.packets_sent, + "Server shutting down. Uptime: {:?}", crate::START_TIME.elapsed() ); + self.broadcast_message(BroadcastMessage::Disconnect( + "The server is shutting down".into(), + )) + .await; } /// Update the network server. /// /// Update each client in `self.network_clients`. async fn update_network(&mut self) -> tokio::io::Result<()> { + // Read new clients from the network. loop { match self.network_receiver.try_recv() { Ok(client) => { @@ -80,6 +85,7 @@ impl Server { Err(TryRecvError::Disconnected) => panic!("Network sender disconnected"), } } + // Count the number of players in the Play state. let num_players = self.network_clients.iter().fold(0, |acc, nc| { if nc.state == NetworkClientState::Play { acc + 1 @@ -87,32 +93,45 @@ impl Server { acc } }); + // Update each client, disconnecting those with errors. for client in self.network_clients.iter_mut() { if client.update(num_players).await.is_err() { client.force_disconnect(); } } // Remove disconnected clients. - let mut index = 0; + self.network_clients + .retain(|nc| nc.state != NetworkClientState::Disconnected); + // Read new messages from the clients. loop { - if index >= self.network_clients.len() { - break; + match self.message_receiver.try_recv() { + Ok(message) => match message { + ServerboundMessage::Chat(msg) => { + self.broadcast_message(BroadcastMessage::Chat(msg)).await; + } + ServerboundMessage::PlayerJoin(_uuid, username) => { + self.broadcast_message(BroadcastMessage::Chat(format!( + "Welcome {} to the server!", + username + ))) + .await; + } + }, + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => panic!("Message sender disconnected"), } - if self.network_clients[index].state == NetworkClientState::Disconnected { - // Count the number of packets before removing it. - self.packets_read += self.network_clients[index].packets_read; - self.packets_sent += self.network_clients[index].packets_sent; - self.network_clients.remove(index); - continue; - } - index += 1; } - // self.network_clients - // .retain(|nc| nc.state != NetworkClientState::Disconnected); - Ok(()) } + pub async fn broadcast_message(&mut self, message: BroadcastMessage) { + let mut v = Vec::new(); + for client in self.network_clients.iter_mut() { + v.push(client.handle_broadcast_message(message.clone())); + } + futures::future::join_all(v).await; + } + /// Update the game server. /// /// Start by updating the network. diff --git a/src/server/net/mod.rs b/src/server/net/mod.rs index 8ae3f48..9d16b46 100644 --- a/src/server/net/mod.rs +++ b/src/server/net/mod.rs @@ -1,10 +1,12 @@ /// Definitions for all the packets in the Minecraft protocol. pub mod packets; +use super::messages::*; use crate::{mctypes::*, CONFIG, FAVICON}; -use log::{debug, info}; +use log::*; use packets::*; use serde_json::json; +use std::sync::mpsc::Sender; use std::time::{Duration, Instant}; use tokio::net::TcpStream; @@ -30,12 +32,15 @@ pub struct NetworkClient { pub uuid: Option, pub username: Option, pub last_keep_alive: Instant, - pub packets_read: usize, - pub packets_sent: usize, + pub message_sender: Sender, } impl NetworkClient { /// Create a new `NetworkClient` - pub fn new(stream: TcpStream, id: u128) -> NetworkClient { + pub fn new( + stream: TcpStream, + id: u128, + message_sender: Sender, + ) -> NetworkClient { NetworkClient { id, connected: true, @@ -44,8 +49,7 @@ impl NetworkClient { uuid: None, username: None, last_keep_alive: Instant::now(), - packets_read: 0, - packets_sent: 0, + message_sender, } } @@ -160,11 +164,18 @@ impl NetworkClient { // TODO: C->S Player Position and Look // TODO: C->S Client Status // TODO: S->C inventories, entities, etc. - self.send_chat_message(format!( - "Welcome {} to the server!", - self.username.as_ref().unwrap_or(&"unknown".to_owned()) - )) - .await?; + self.message_sender + .send(ServerboundMessage::PlayerJoin( + self.uuid + .as_ref() + .unwrap_or(&"00000000-0000-3000-0000-000000000000".to_owned()) + .to_string(), + self.username + .as_ref() + .unwrap_or(&"unknown".to_owned()) + .to_string(), + )) + .expect("Message receiver disconnected"); } NetworkClientState::Play => { if self.last_keep_alive.elapsed() > Duration::from_millis(1000) { @@ -188,7 +199,9 @@ impl NetworkClient { self.get_packet::().await?; let reply = format!("<{}> {}", self.get_name(), serverboundchatmessage.text); info!("{}", reply); - self.send_chat_message(reply).await?; + self.message_sender + .send(ServerboundMessage::Chat(reply)) + .expect("Message receiver disconnected"); } else { let _ = read_bytes(&mut self.stream, Into::::into(packet_length) as usize) .await?; @@ -196,7 +209,7 @@ impl NetworkClient { } NetworkClientState::Disconnected => { if self.connected { - self.disconnect(None).await?; + self.disconnect("Disconnected").await?; } } } @@ -205,14 +218,12 @@ impl NetworkClient { /// Send a generic packet to the client. pub async fn send_packet(&mut self, packet: P) -> tokio::io::Result<()> { - self.packets_sent += 1; debug!("Sent {:?} {:#04X?} {:?}", self.state, P::id(), packet); packet.write(&mut self.stream).await } /// Read a generic packet from the network. pub async fn get_packet(&mut self) -> tokio::io::Result { - self.packets_read += 1; let packet = T::read(&mut self.stream).await?; debug!("Got {:?} {:#04X?} {:?}", self.state, T::id(), packet); Ok(packet) @@ -232,9 +243,9 @@ impl NetworkClient { /// Disconnect the client. /// /// Sends `0x40 Disconnect` then waits 10 seconds before forcing the connection closed. - pub async fn disconnect(&mut self, reason: Option<&str>) -> tokio::io::Result<()> { + pub async fn disconnect>(&mut self, reason: S) -> tokio::io::Result<()> { let mut disconnect = Disconnect::new(); - disconnect.reason.text = reason.unwrap_or("Disconnected").into(); + disconnect.reason.text = reason.into(); self.send_packet(disconnect).await?; self.force_disconnect(); Ok(()) @@ -258,10 +269,24 @@ impl NetworkClient { Ok(()) } + /// Helper function to get the name of the player. pub fn get_name(&self) -> String { self.username .as_ref() .unwrap_or(&"unknown".to_owned()) .to_string() } + + /// Receives broadcast messages from the server. + pub async fn handle_broadcast_message( + &mut self, + message: BroadcastMessage, + ) -> tokio::io::Result<()> { + use BroadcastMessage::*; + match message { + Chat(s) => self.send_chat_message(s).await?, + Disconnect(reason) => self.disconnect(reason).await?, + } + Ok(()) + } }