diff --git a/src/mctypes.rs b/src/mctypes.rs index 4de4bf2..8a1ea93 100644 --- a/src/mctypes.rs +++ b/src/mctypes.rs @@ -3,12 +3,12 @@ pub use functions::*; pub use numbers::*; pub use other::*; -use std::io::prelude::*; -use std::net::TcpStream; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; // /// Make sure all types can serialize and deserialize to/from `Vec`. // pub trait MCType: Into> + TryFrom> + Display { -// pub async fn read(_stream: &mut TcpStream) -> std::io::Result; +// pub async fn read(_stream: &mut TcpStream) -> tokio::io::Result; // } /// Helper functions. @@ -16,14 +16,14 @@ pub mod functions { use super::*; /// Read a single byte from the given `TcpStream`. - pub async fn read_byte(t: &mut TcpStream) -> std::io::Result { + pub async fn read_byte(t: &mut TcpStream) -> tokio::io::Result { let mut buffer = [0u8; 1]; - t.read_exact(&mut buffer)?; + t.read_exact(&mut buffer).await?; Ok(buffer[0]) } /// Write a single byte to the given `TcpStream`. - pub fn write_byte(t: &mut TcpStream, value: u8) -> std::io::Result { - t.write(&[value])?; + pub async fn write_byte(t: &mut TcpStream, value: u8) -> tokio::io::Result { + t.write(&[value]).await?; Ok(value) } /// Take `l` bytes from the given `Vec`. @@ -43,8 +43,8 @@ pub mod functions { a.into_boxed_slice() } /// Makes returning errors shorter. - pub fn io_error(s: &str) -> std::io::Error { - use std::io::{Error, ErrorKind}; + pub fn io_error(s: &str) -> tokio::io::Error { + use tokio::io::{Error, ErrorKind}; Error::new(ErrorKind::Other, s) } } @@ -113,7 +113,7 @@ pub mod other { } } impl MCBoolean { - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let b = read_byte(t).await?; Ok(MCBoolean::try_from(vec![b]).unwrap()) } @@ -188,7 +188,7 @@ pub mod other { } } impl MCString { - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let str_len = MCVarInt::read(t).await?; let mut str_bytes = vec![]; for _ in 0..str_len.into() { @@ -243,7 +243,7 @@ pub mod other { } } impl MCChat { - pub async fn read(_t: &mut TcpStream) -> std::io::Result { + pub async fn read(_t: &mut TcpStream) -> tokio::io::Result { Err(io_error("Cannot read MCChat from stream")) } } @@ -261,7 +261,7 @@ pub mod numbers { pub value: i8, // -128 to 127 } impl MCByte { - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { Ok(MCByte::from_bytes(vec![read_byte(t).await?])) } pub fn from_bytes(v: Vec) -> MCByte { @@ -319,7 +319,7 @@ pub mod numbers { pub value: u8, // 0 to 255 } impl MCUnsignedByte { - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { Ok(MCUnsignedByte::from_bytes(vec![read_byte(t).await?])) } pub fn from_bytes(v: Vec) -> MCUnsignedByte { @@ -377,7 +377,7 @@ pub mod numbers { pub value: i16, // -32768 to 32767 } impl MCShort { - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut bytes = Vec::new(); bytes.push(read_byte(t).await?); // MSD bytes.push(read_byte(t).await?); // LSD @@ -440,7 +440,7 @@ pub mod numbers { pub value: u16, // 0 to 65535 } impl MCUnsignedShort { - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut bytes = Vec::new(); bytes.push(read_byte(t).await?); // MSD bytes.push(read_byte(t).await?); // LSD @@ -503,7 +503,7 @@ pub mod numbers { pub value: i32, // -2147483648 to 2147483647 } impl MCInt { - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut bytes = Vec::new(); for _ in 0..4 { bytes.push(read_byte(t).await?); @@ -567,7 +567,7 @@ pub mod numbers { pub value: u32, // 0 to 4294967295 } impl MCUnsignedInt { - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut bytes = Vec::new(); for _ in 0..4 { bytes.push(read_byte(t).await?); @@ -631,7 +631,7 @@ pub mod numbers { pub value: i64, // -9223372036854775808 to 9223372036854775807 } impl MCLong { - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut bytes = Vec::new(); for _ in 0..8 { bytes.push(read_byte(t).await?); @@ -695,7 +695,7 @@ pub mod numbers { pub value: u64, // 0 to 18446744073709551615 } impl MCUnsignedLong { - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut bytes = Vec::new(); for _ in 0..8 { bytes.push(read_byte(t).await?); @@ -759,7 +759,7 @@ pub mod numbers { pub value: f32, // 32-bit floating point number } impl MCFloat { - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut bytes = Vec::new(); for _ in 0..4 { bytes.push(read_byte(t).await?); @@ -823,7 +823,7 @@ pub mod numbers { pub value: f64, // 64-bit floating point number } impl MCDouble { - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut bytes = Vec::new(); for _ in 0..8 { bytes.push(read_byte(t).await?); @@ -887,7 +887,7 @@ pub mod numbers { pub value: i32, // Variable length 32-bit integer } impl MCVarInt { - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut num_read = 0; let mut result = 0i32; let mut read = 0u8; diff --git a/src/server/net/mod.rs b/src/server/net/mod.rs index 4550104..a792def 100644 --- a/src/server/net/mod.rs +++ b/src/server/net/mod.rs @@ -5,8 +5,8 @@ use crate::mctypes::*; use log::{debug, info}; use packets::*; use serde_json::json; -use std::net::{TcpListener, TcpStream, ToSocketAddrs}; use std::sync::mpsc::{self, Receiver, TryRecvError}; +use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; /// The part of the server that handles /// connecting clients and receiving/sending packets. @@ -20,18 +20,24 @@ impl NetworkServer { /// then hold that in a queue for processing on an update. pub fn new(addr: A) -> NetworkServer { let (tx, rx) = mpsc::channel(); - std::thread::spawn(move || { - let listener = TcpListener::bind(addr).expect("Could not bind to TCP socket"); - for (id, stream) in listener.incoming().enumerate() { - if let Ok(s) = stream { - tx.send(NetworkClient { - id: id as u128, - connected: true, - stream: s, - state: NetworkClientState::Handshake, - }) + tokio::task::spawn(async move { + let listener = TcpListener::bind(addr) + .await + .expect("Could not bind to TCP socket"); + let mut id = 0; + loop { + let (stream, _) = listener + .accept() + .await .expect("Network receiver disconnected"); - } + tx.send(NetworkClient { + id: id as u128, + connected: true, + stream, + state: NetworkClientState::Handshake, + }) + .expect("Network receiver disconnected"); + id += 1; } }); info!("Network server started!"); @@ -87,7 +93,8 @@ impl NetworkClient { pub async fn update(&mut self) { match self.state { NetworkClientState::Handshake => { - let (_packet_length, _packet_id) = read_packet_header(&mut self.stream).await.unwrap(); + let (_packet_length, _packet_id) = + read_packet_header(&mut self.stream).await.unwrap(); let handshake = Handshake::read(&mut self.stream).await.unwrap(); // Minecraft versions 1.8 - 1.8.9 use protocol version 47. let compatible_versions = handshake.protocol_version == 47; @@ -109,7 +116,8 @@ impl NetworkClient { debug!("Got handshake: {:?}", handshake); } NetworkClientState::Status => { - let (_packet_length, _packet_id) = read_packet_header(&mut self.stream).await.unwrap(); + let (_packet_length, _packet_id) = + read_packet_header(&mut self.stream).await.unwrap(); let statusrequest = StatusRequest::read(&mut self.stream).await.unwrap(); debug!("Got status request: {:?}", statusrequest); let mut statusresponse = StatusResponse::new(); @@ -138,7 +146,8 @@ impl NetworkClient { .into(); statusresponse.write(&mut self.stream).await.unwrap(); debug!("Sending status response: StatusResponse"); - let (_packet_length, _packet_id) = read_packet_header(&mut self.stream).await.unwrap(); + let (_packet_length, _packet_id) = + read_packet_header(&mut self.stream).await.unwrap(); let statusping = StatusPing::read(&mut self.stream).await.unwrap(); debug!("Got status ping: {:?}", statusping); let mut statuspong = StatusPong::new(); @@ -148,7 +157,8 @@ impl NetworkClient { self.state = NetworkClientState::Disconnected; } NetworkClientState::Login => { - let (_packet_length, _packet_id) = read_packet_header(&mut self.stream).await.unwrap(); + let (_packet_length, _packet_id) = + read_packet_header(&mut self.stream).await.unwrap(); let loginstart = LoginStart::read(&mut self.stream).await.unwrap(); debug!("{:?}", loginstart); // Offline mode skips encryption and compression. diff --git a/src/server/net/packets/clientbound.rs b/src/server/net/packets/clientbound.rs index dc86297..03deb2f 100644 --- a/src/server/net/packets/clientbound.rs +++ b/src/server/net/packets/clientbound.rs @@ -1,6 +1,6 @@ use crate::mctypes::*; use std::convert::{Into, TryFrom}; -use std::net::TcpStream; +use tokio::net::TcpStream; #[derive(Debug, Clone)] pub struct StatusResponse { @@ -28,14 +28,14 @@ impl StatusResponse { json_response: MCString::from(""), } } - pub async fn read(t: &'_ mut TcpStream) -> std::io::Result { + pub async fn read(t: &'_ mut TcpStream) -> tokio::io::Result { let mut statusresponse = StatusResponse::new(); statusresponse.json_response = MCString::read(t).await?; Ok(statusresponse) } - pub async fn write(&self, t: &'_ mut TcpStream) -> std::io::Result<()> { + pub async fn write(&self, t: &'_ mut TcpStream) -> tokio::io::Result<()> { for b in Into::>::into(self.clone()) { - write_byte(t, b)?; + write_byte(t, b).await?; } Ok(()) } @@ -65,14 +65,14 @@ impl StatusPong { pub fn new() -> Self { StatusPong { payload: 0.into() } } - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut statuspong = StatusPong::new(); statuspong.payload = MCLong::read(t).await?; Ok(statuspong) } - pub async fn write(&self, t: &mut TcpStream) -> std::io::Result<()> { + pub async fn write(&self, t: &mut TcpStream) -> tokio::io::Result<()> { for b in Into::>::into(self.clone()) { - write_byte(t, b)?; + write_byte(t, b).await?; } Ok(()) } @@ -107,15 +107,15 @@ impl LoginSuccess { username: MCString::from(""), } } - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut loginsuccess = LoginSuccess::new(); loginsuccess.uuid = MCString::read(t).await?; loginsuccess.username = MCString::read(t).await?; Ok(loginsuccess) } - pub async fn write(&self, t: &mut TcpStream) -> std::io::Result<()> { + pub async fn write(&self, t: &mut TcpStream) -> tokio::io::Result<()> { for b in Into::>::into(self.clone()) { - write_byte(t, b)?; + write_byte(t, b).await?; } Ok(()) } @@ -149,16 +149,16 @@ impl LoginDisconnect { }, } } - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut logindisconnect = LoginDisconnect::new(); logindisconnect.reason = MCChat { text: MCString::read(t).await?, }; Ok(logindisconnect) } - pub async fn write(&self, t: &mut TcpStream) -> std::io::Result<()> { + pub async fn write(&self, t: &mut TcpStream) -> tokio::io::Result<()> { for b in Into::>::into(self.clone()) { - write_byte(t, b)?; + write_byte(t, b).await?; } Ok(()) } diff --git a/src/server/net/packets/mod.rs b/src/server/net/packets/mod.rs index 934cc4f..86dfd1f 100644 --- a/src/server/net/packets/mod.rs +++ b/src/server/net/packets/mod.rs @@ -6,10 +6,10 @@ pub mod serverbound; use crate::mctypes::MCVarInt; pub use clientbound::*; pub use serverbound::*; -use std::net::TcpStream; +use tokio::net::TcpStream; /// A helper function to read the packet header. -pub async fn read_packet_header(t: &mut TcpStream) -> std::io::Result<(MCVarInt, MCVarInt)> { +pub async fn read_packet_header(t: &mut TcpStream) -> tokio::io::Result<(MCVarInt, MCVarInt)> { let length = MCVarInt::read(t).await?; let id = MCVarInt::read(t).await?; Ok((length, id)) diff --git a/src/server/net/packets/serverbound.rs b/src/server/net/packets/serverbound.rs index e3311c9..988e651 100644 --- a/src/server/net/packets/serverbound.rs +++ b/src/server/net/packets/serverbound.rs @@ -1,6 +1,6 @@ use crate::mctypes::*; use std::convert::{Into, TryFrom}; -use std::net::TcpStream; +use tokio::net::TcpStream; /// Needed for every interaction with the server. #[derive(Debug, Clone)] @@ -38,7 +38,7 @@ impl Handshake { next_state: 0.into(), } } - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut handshake = Handshake::new(); handshake.protocol_version = MCVarInt::read(t).await?; handshake.server_address = MCString::read(t).await?; @@ -46,9 +46,9 @@ impl Handshake { handshake.next_state = MCVarInt::read(t).await?; Ok(handshake) } - pub async fn write(&self, t: &mut TcpStream) -> std::io::Result<()> { + pub async fn write(&self, t: &mut TcpStream) -> tokio::io::Result<()> { for b in Into::>::into(self.clone()) { - write_byte(t, b)?; + write_byte(t, b).await?; } Ok(()) } @@ -75,13 +75,13 @@ impl StatusRequest { pub fn new() -> Self { StatusRequest {} } - pub async fn read(_t: &mut TcpStream) -> std::io::Result { + pub async fn read(_t: &mut TcpStream) -> tokio::io::Result { let statusrequest = StatusRequest::new(); Ok(statusrequest) } - pub async fn write(&self, t: &mut TcpStream) -> std::io::Result<()> { + pub async fn write(&self, t: &mut TcpStream) -> tokio::io::Result<()> { for b in Into::>::into(self.clone()) { - write_byte(t, b)?; + write_byte(t, b).await?; } Ok(()) } @@ -111,14 +111,14 @@ impl StatusPing { pub fn new() -> Self { StatusPing { payload: 0.into() } } - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut statusping = StatusPing::new(); statusping.payload = MCLong::read(t).await?; Ok(statusping) } - pub async fn write(&self, t: &mut TcpStream) -> std::io::Result<()> { + pub async fn write(&self, t: &mut TcpStream) -> tokio::io::Result<()> { for b in Into::>::into(self.clone()) { - write_byte(t, b)?; + write_byte(t, b).await?; } Ok(()) } @@ -150,14 +150,14 @@ impl LoginStart { player_name: "".into(), } } - pub async fn read(t: &mut TcpStream) -> std::io::Result { + pub async fn read(t: &mut TcpStream) -> tokio::io::Result { let mut loginstart = LoginStart::new(); loginstart.player_name = MCString::read(t).await?; Ok(loginstart) } - pub async fn write(&self, t: &mut TcpStream) -> std::io::Result<()> { + pub async fn write(&self, t: &mut TcpStream) -> tokio::io::Result<()> { for b in Into::>::into(self.clone()) { - write_byte(t, b)?; + write_byte(t, b).await?; } Ok(()) }