Internal messaging - client to client chat works

This commit is contained in:
Garen Tyler 2021-03-20 10:47:25 -06:00
parent 0f531f20cf
commit 897d2dee2c
7 changed files with 228 additions and 52 deletions

117
Cargo.lock generated
View File

@ -96,6 +96,7 @@ dependencies = [
"chrono",
"ctrlc",
"fern",
"futures",
"lazy_static",
"log",
"radix64",
@ -125,6 +126,98 @@ dependencies = [
"log",
]
[[package]]
name = "futures"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f55667319111d593ba876406af7c409c0ebb44dc4be6132a783ccf163ea14c1"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c2dd2df839b57db9ab69c2c9d8f3e8c81984781937fe2807dc6dcf3b2ad2939"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15496a72fabf0e62bdc3df11a59a3787429221dd0710ba8ef163d6f7a9112c94"
[[package]]
name = "futures-executor"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "891a4b7b96d84d5940084b2a37632dd65deeae662c114ceaa2c879629c9c0ad1"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71c2c65c57704c32f5241c1223167c2c3294fd34ac020c807ddbe6db287ba59"
[[package]]
name = "futures-macro"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea405816a5139fb39af82c2beb921d52143f556038378d6db21183a5c37fbfb7"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85754d98985841b7d4f5e8e6fbfa4a4ac847916893ec511a2917ccd8525b8bb3"
[[package]]
name = "futures-task"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa189ef211c15ee602667a6fcfe1c1fd9e07d42250d2156382820fba33c9df80"
[[package]]
name = "futures-util"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1812c7ab8aedf8d6f2701a43e1243acdbcc2b36ab26e2ad421eb99ac963d96d1"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
[[package]]
name = "hermit-abi"
version = "0.1.18"
@ -295,6 +388,24 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439697af366c49a6d0a010c56a0d97685bc140ce0d377b13a2ea2aa42d64a827"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro-nested"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
[[package]]
name = "proc-macro2"
version = "1.0.24"
@ -384,6 +495,12 @@ dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]]
name = "smallvec"
version = "1.6.1"

View File

@ -16,6 +16,7 @@ tokio = { version = "1", features = ["full"] }
async-trait = "0.1.48"
lazy_static = "1.4.0"
ctrlc = "3.1.8"
futures = "0.3.13"
# colorful = "0.2.1"
# ozelot = "0.9.0" # Ozelot 0.9.0 supports protocol version 578 (1.15.2)
# toml = "0.5.6"

View File

@ -10,7 +10,7 @@ pub mod server;
/// The data types for blocks, chunks, dimensions, and world files.
pub mod world;
use log::warn;
use log::*;
pub use mctypes::*;
use serde::{Deserialize, Serialize};
use std::sync::mpsc::{self, Receiver};

View File

@ -1,4 +1,4 @@
use log::info;
use log::*;
use std::sync::mpsc::TryRecvError;
use std::time::Duration;

14
src/server/messages.rs Normal file
View File

@ -0,0 +1,14 @@
#[derive(Debug, PartialEq, Clone)]
pub enum ServerboundMessage {
Chat(String), // The chat message.
PlayerJoin(String, String), // UUID, then username
}
#[derive(Debug, PartialEq, Clone)]
pub enum ClientboundMessage {}
#[derive(Debug, PartialEq, Clone)]
pub enum BroadcastMessage {
Chat(String), // The chat message.
Disconnect(String), // The reason for disconnecting.
}

View File

@ -1,8 +1,11 @@
/// Internal messaging for the server.
pub mod messages;
/// Put the network client struct in its own file.
pub mod net;
use crate::entity::player::Player;
use log::info;
use log::*;
use messages::*;
use net::*;
use std::sync::mpsc::{self, Receiver, TryRecvError};
use tokio::net::{TcpListener, ToSocketAddrs};
@ -11,13 +14,15 @@ use tokio::net::{TcpListener, ToSocketAddrs};
pub struct Server {
network_clients: Vec<NetworkClient>,
network_receiver: Receiver<NetworkClient>,
message_receiver: Receiver<ServerboundMessage>,
// message_sender: Bus<BroadcastMessage>,
pub players: Vec<Player>,
pub packets_read: usize,
pub packets_sent: usize,
}
impl Server {
pub fn new<A: 'static + ToSocketAddrs + Send>(addr: A) -> Server {
let (tx, rx) = mpsc::channel();
let (network_client_tx, network_client_rx) = mpsc::channel();
let (serverbound_message_tx, serverbound_message_rx) = mpsc::channel();
// let mut broadcast_message_tx = Bus::new(1_000); // Hold up to 1,000 messages in the queue.
tokio::task::spawn(async move {
let listener = TcpListener::bind(addr)
.await
@ -28,18 +33,23 @@ impl Server {
.accept()
.await
.expect("Network receiver disconnected");
tx.send(NetworkClient::new(stream, id as u128))
network_client_tx
.send(NetworkClient::new(
stream,
id as u128,
serverbound_message_tx.clone(),
// broadcast_message_tx.add_rx(),
))
.expect("Network receiver disconnected");
id += 1;
}
});
info!("Network server started!");
Server {
network_receiver: rx,
network_receiver: network_client_rx,
network_clients: vec![],
message_receiver: serverbound_message_rx,
players: vec![],
packets_read: 0,
packets_sent: 0,
}
}
@ -47,26 +57,21 @@ impl Server {
///
/// Disconnects all clients.
pub async fn shutdown(&mut self) {
info!("Server shutting down.");
for client in self.network_clients.iter_mut() {
// We don't care if it doesn't succeed in sending the packet.
let _ = client.disconnect(Some("The server is shutting down")).await;
// Count the number of packets from the remaining clients.
self.packets_read += client.packets_read;
self.packets_sent += client.packets_sent;
}
info!(
"{} packets read and {} packets sent in {:?}",
self.packets_read,
self.packets_sent,
"Server shutting down. Uptime: {:?}",
crate::START_TIME.elapsed()
);
self.broadcast_message(BroadcastMessage::Disconnect(
"The server is shutting down".into(),
))
.await;
}
/// Update the network server.
///
/// Update each client in `self.network_clients`.
async fn update_network(&mut self) -> tokio::io::Result<()> {
// Read new clients from the network.
loop {
match self.network_receiver.try_recv() {
Ok(client) => {
@ -80,6 +85,7 @@ impl Server {
Err(TryRecvError::Disconnected) => panic!("Network sender disconnected"),
}
}
// Count the number of players in the Play state.
let num_players = self.network_clients.iter().fold(0, |acc, nc| {
if nc.state == NetworkClientState::Play {
acc + 1
@ -87,32 +93,45 @@ impl Server {
acc
}
});
// Update each client, disconnecting those with errors.
for client in self.network_clients.iter_mut() {
if client.update(num_players).await.is_err() {
client.force_disconnect();
}
}
// Remove disconnected clients.
let mut index = 0;
self.network_clients
.retain(|nc| nc.state != NetworkClientState::Disconnected);
// Read new messages from the clients.
loop {
if index >= self.network_clients.len() {
break;
match self.message_receiver.try_recv() {
Ok(message) => match message {
ServerboundMessage::Chat(msg) => {
self.broadcast_message(BroadcastMessage::Chat(msg)).await;
}
ServerboundMessage::PlayerJoin(_uuid, username) => {
self.broadcast_message(BroadcastMessage::Chat(format!(
"Welcome {} to the server!",
username
)))
.await;
}
},
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => panic!("Message sender disconnected"),
}
if self.network_clients[index].state == NetworkClientState::Disconnected {
// Count the number of packets before removing it.
self.packets_read += self.network_clients[index].packets_read;
self.packets_sent += self.network_clients[index].packets_sent;
self.network_clients.remove(index);
continue;
}
index += 1;
}
// self.network_clients
// .retain(|nc| nc.state != NetworkClientState::Disconnected);
Ok(())
}
pub async fn broadcast_message(&mut self, message: BroadcastMessage) {
let mut v = Vec::new();
for client in self.network_clients.iter_mut() {
v.push(client.handle_broadcast_message(message.clone()));
}
futures::future::join_all(v).await;
}
/// Update the game server.
///
/// Start by updating the network.

View File

@ -1,10 +1,12 @@
/// Definitions for all the packets in the Minecraft protocol.
pub mod packets;
use super::messages::*;
use crate::{mctypes::*, CONFIG, FAVICON};
use log::{debug, info};
use log::*;
use packets::*;
use serde_json::json;
use std::sync::mpsc::Sender;
use std::time::{Duration, Instant};
use tokio::net::TcpStream;
@ -30,12 +32,15 @@ pub struct NetworkClient {
pub uuid: Option<String>,
pub username: Option<String>,
pub last_keep_alive: Instant,
pub packets_read: usize,
pub packets_sent: usize,
pub message_sender: Sender<ServerboundMessage>,
}
impl NetworkClient {
/// Create a new `NetworkClient`
pub fn new(stream: TcpStream, id: u128) -> NetworkClient {
pub fn new(
stream: TcpStream,
id: u128,
message_sender: Sender<ServerboundMessage>,
) -> NetworkClient {
NetworkClient {
id,
connected: true,
@ -44,8 +49,7 @@ impl NetworkClient {
uuid: None,
username: None,
last_keep_alive: Instant::now(),
packets_read: 0,
packets_sent: 0,
message_sender,
}
}
@ -160,11 +164,18 @@ impl NetworkClient {
// TODO: C->S Player Position and Look
// TODO: C->S Client Status
// TODO: S->C inventories, entities, etc.
self.send_chat_message(format!(
"Welcome {} to the server!",
self.username.as_ref().unwrap_or(&"unknown".to_owned())
))
.await?;
self.message_sender
.send(ServerboundMessage::PlayerJoin(
self.uuid
.as_ref()
.unwrap_or(&"00000000-0000-3000-0000-000000000000".to_owned())
.to_string(),
self.username
.as_ref()
.unwrap_or(&"unknown".to_owned())
.to_string(),
))
.expect("Message receiver disconnected");
}
NetworkClientState::Play => {
if self.last_keep_alive.elapsed() > Duration::from_millis(1000) {
@ -188,7 +199,9 @@ impl NetworkClient {
self.get_packet::<ServerboundChatMessage>().await?;
let reply = format!("<{}> {}", self.get_name(), serverboundchatmessage.text);
info!("{}", reply);
self.send_chat_message(reply).await?;
self.message_sender
.send(ServerboundMessage::Chat(reply))
.expect("Message receiver disconnected");
} else {
let _ = read_bytes(&mut self.stream, Into::<i32>::into(packet_length) as usize)
.await?;
@ -196,7 +209,7 @@ impl NetworkClient {
}
NetworkClientState::Disconnected => {
if self.connected {
self.disconnect(None).await?;
self.disconnect("Disconnected").await?;
}
}
}
@ -205,14 +218,12 @@ impl NetworkClient {
/// Send a generic packet to the client.
pub async fn send_packet<P: PacketCommon>(&mut self, packet: P) -> tokio::io::Result<()> {
self.packets_sent += 1;
debug!("Sent {:?} {:#04X?} {:?}", self.state, P::id(), packet);
packet.write(&mut self.stream).await
}
/// Read a generic packet from the network.
pub async fn get_packet<T: PacketCommon>(&mut self) -> tokio::io::Result<T> {
self.packets_read += 1;
let packet = T::read(&mut self.stream).await?;
debug!("Got {:?} {:#04X?} {:?}", self.state, T::id(), packet);
Ok(packet)
@ -232,9 +243,9 @@ impl NetworkClient {
/// Disconnect the client.
///
/// Sends `0x40 Disconnect` then waits 10 seconds before forcing the connection closed.
pub async fn disconnect(&mut self, reason: Option<&str>) -> tokio::io::Result<()> {
pub async fn disconnect<S: Into<MCString>>(&mut self, reason: S) -> tokio::io::Result<()> {
let mut disconnect = Disconnect::new();
disconnect.reason.text = reason.unwrap_or("Disconnected").into();
disconnect.reason.text = reason.into();
self.send_packet(disconnect).await?;
self.force_disconnect();
Ok(())
@ -258,10 +269,24 @@ impl NetworkClient {
Ok(())
}
/// Helper function to get the name of the player.
pub fn get_name(&self) -> String {
self.username
.as_ref()
.unwrap_or(&"unknown".to_owned())
.to_string()
}
/// Receives broadcast messages from the server.
pub async fn handle_broadcast_message(
&mut self,
message: BroadcastMessage,
) -> tokio::io::Result<()> {
use BroadcastMessage::*;
match message {
Chat(s) => self.send_chat_message(s).await?,
Disconnect(reason) => self.disconnect(reason).await?,
}
Ok(())
}
}