Actually use the async networking

This commit is contained in:
Garen Tyler 2021-03-01 21:13:02 -07:00
parent a4b461b6c0
commit 03405dfb79
5 changed files with 77 additions and 67 deletions

View File

@ -3,12 +3,12 @@
pub use functions::*;
pub use numbers::*;
pub use other::*;
use std::io::prelude::*;
use std::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
// /// Make sure all types can serialize and deserialize to/from `Vec<u8>`.
// pub trait MCType: Into<Vec<u8>> + TryFrom<Vec<u8>> + Display {
// pub async fn read(_stream: &mut TcpStream) -> std::io::Result<Self>;
// pub async fn read(_stream: &mut TcpStream) -> tokio::io::Result<Self>;
// }
/// Helper functions.
@ -16,14 +16,14 @@ pub mod functions {
use super::*;
/// Read a single byte from the given `TcpStream`.
pub async fn read_byte(t: &mut TcpStream) -> std::io::Result<u8> {
pub async fn read_byte(t: &mut TcpStream) -> tokio::io::Result<u8> {
let mut buffer = [0u8; 1];
t.read_exact(&mut buffer)?;
t.read_exact(&mut buffer).await?;
Ok(buffer[0])
}
/// Write a single byte to the given `TcpStream`.
pub fn write_byte(t: &mut TcpStream, value: u8) -> std::io::Result<u8> {
t.write(&[value])?;
pub async fn write_byte(t: &mut TcpStream, value: u8) -> tokio::io::Result<u8> {
t.write(&[value]).await?;
Ok(value)
}
/// Take `l` bytes from the given `Vec<u8>`.
@ -43,8 +43,8 @@ pub mod functions {
a.into_boxed_slice()
}
/// Makes returning errors shorter.
pub fn io_error(s: &str) -> std::io::Error {
use std::io::{Error, ErrorKind};
pub fn io_error(s: &str) -> tokio::io::Error {
use tokio::io::{Error, ErrorKind};
Error::new(ErrorKind::Other, s)
}
}
@ -113,7 +113,7 @@ pub mod other {
}
}
impl MCBoolean {
pub async fn read(t: &mut TcpStream) -> std::io::Result<MCBoolean> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<MCBoolean> {
let b = read_byte(t).await?;
Ok(MCBoolean::try_from(vec![b]).unwrap())
}
@ -188,7 +188,7 @@ pub mod other {
}
}
impl MCString {
pub async fn read(t: &mut TcpStream) -> std::io::Result<Self> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<Self> {
let str_len = MCVarInt::read(t).await?;
let mut str_bytes = vec![];
for _ in 0..str_len.into() {
@ -243,7 +243,7 @@ pub mod other {
}
}
impl MCChat {
pub async fn read(_t: &mut TcpStream) -> std::io::Result<Self> {
pub async fn read(_t: &mut TcpStream) -> tokio::io::Result<Self> {
Err(io_error("Cannot read MCChat from stream"))
}
}
@ -261,7 +261,7 @@ pub mod numbers {
pub value: i8, // -128 to 127
}
impl MCByte {
pub async fn read(t: &mut TcpStream) -> std::io::Result<MCByte> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<MCByte> {
Ok(MCByte::from_bytes(vec![read_byte(t).await?]))
}
pub fn from_bytes(v: Vec<u8>) -> MCByte {
@ -319,7 +319,7 @@ pub mod numbers {
pub value: u8, // 0 to 255
}
impl MCUnsignedByte {
pub async fn read(t: &mut TcpStream) -> std::io::Result<MCUnsignedByte> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<MCUnsignedByte> {
Ok(MCUnsignedByte::from_bytes(vec![read_byte(t).await?]))
}
pub fn from_bytes(v: Vec<u8>) -> MCUnsignedByte {
@ -377,7 +377,7 @@ pub mod numbers {
pub value: i16, // -32768 to 32767
}
impl MCShort {
pub async fn read(t: &mut TcpStream) -> std::io::Result<MCShort> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<MCShort> {
let mut bytes = Vec::new();
bytes.push(read_byte(t).await?); // MSD
bytes.push(read_byte(t).await?); // LSD
@ -440,7 +440,7 @@ pub mod numbers {
pub value: u16, // 0 to 65535
}
impl MCUnsignedShort {
pub async fn read(t: &mut TcpStream) -> std::io::Result<MCUnsignedShort> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<MCUnsignedShort> {
let mut bytes = Vec::new();
bytes.push(read_byte(t).await?); // MSD
bytes.push(read_byte(t).await?); // LSD
@ -503,7 +503,7 @@ pub mod numbers {
pub value: i32, // -2147483648 to 2147483647
}
impl MCInt {
pub async fn read(t: &mut TcpStream) -> std::io::Result<MCInt> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<MCInt> {
let mut bytes = Vec::new();
for _ in 0..4 {
bytes.push(read_byte(t).await?);
@ -567,7 +567,7 @@ pub mod numbers {
pub value: u32, // 0 to 4294967295
}
impl MCUnsignedInt {
pub async fn read(t: &mut TcpStream) -> std::io::Result<MCUnsignedInt> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<MCUnsignedInt> {
let mut bytes = Vec::new();
for _ in 0..4 {
bytes.push(read_byte(t).await?);
@ -631,7 +631,7 @@ pub mod numbers {
pub value: i64, // -9223372036854775808 to 9223372036854775807
}
impl MCLong {
pub async fn read(t: &mut TcpStream) -> std::io::Result<MCLong> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<MCLong> {
let mut bytes = Vec::new();
for _ in 0..8 {
bytes.push(read_byte(t).await?);
@ -695,7 +695,7 @@ pub mod numbers {
pub value: u64, // 0 to 18446744073709551615
}
impl MCUnsignedLong {
pub async fn read(t: &mut TcpStream) -> std::io::Result<MCUnsignedLong> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<MCUnsignedLong> {
let mut bytes = Vec::new();
for _ in 0..8 {
bytes.push(read_byte(t).await?);
@ -759,7 +759,7 @@ pub mod numbers {
pub value: f32, // 32-bit floating point number
}
impl MCFloat {
pub async fn read(t: &mut TcpStream) -> std::io::Result<MCFloat> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<MCFloat> {
let mut bytes = Vec::new();
for _ in 0..4 {
bytes.push(read_byte(t).await?);
@ -823,7 +823,7 @@ pub mod numbers {
pub value: f64, // 64-bit floating point number
}
impl MCDouble {
pub async fn read(t: &mut TcpStream) -> std::io::Result<MCDouble> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<MCDouble> {
let mut bytes = Vec::new();
for _ in 0..8 {
bytes.push(read_byte(t).await?);
@ -887,7 +887,7 @@ pub mod numbers {
pub value: i32, // Variable length 32-bit integer
}
impl MCVarInt {
pub async fn read(t: &mut TcpStream) -> std::io::Result<MCVarInt> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<MCVarInt> {
let mut num_read = 0;
let mut result = 0i32;
let mut read = 0u8;

View File

@ -5,8 +5,8 @@ use crate::mctypes::*;
use log::{debug, info};
use packets::*;
use serde_json::json;
use std::net::{TcpListener, TcpStream, ToSocketAddrs};
use std::sync::mpsc::{self, Receiver, TryRecvError};
use tokio::net::{TcpListener, TcpStream, ToSocketAddrs};
/// The part of the server that handles
/// connecting clients and receiving/sending packets.
@ -20,18 +20,24 @@ impl NetworkServer {
/// then hold that in a queue for processing on an update.
pub fn new<A: 'static + ToSocketAddrs + Send>(addr: A) -> NetworkServer {
let (tx, rx) = mpsc::channel();
std::thread::spawn(move || {
let listener = TcpListener::bind(addr).expect("Could not bind to TCP socket");
for (id, stream) in listener.incoming().enumerate() {
if let Ok(s) = stream {
tx.send(NetworkClient {
id: id as u128,
connected: true,
stream: s,
state: NetworkClientState::Handshake,
})
tokio::task::spawn(async move {
let listener = TcpListener::bind(addr)
.await
.expect("Could not bind to TCP socket");
let mut id = 0;
loop {
let (stream, _) = listener
.accept()
.await
.expect("Network receiver disconnected");
}
tx.send(NetworkClient {
id: id as u128,
connected: true,
stream,
state: NetworkClientState::Handshake,
})
.expect("Network receiver disconnected");
id += 1;
}
});
info!("Network server started!");
@ -87,7 +93,8 @@ impl NetworkClient {
pub async fn update(&mut self) {
match self.state {
NetworkClientState::Handshake => {
let (_packet_length, _packet_id) = read_packet_header(&mut self.stream).await.unwrap();
let (_packet_length, _packet_id) =
read_packet_header(&mut self.stream).await.unwrap();
let handshake = Handshake::read(&mut self.stream).await.unwrap();
// Minecraft versions 1.8 - 1.8.9 use protocol version 47.
let compatible_versions = handshake.protocol_version == 47;
@ -109,7 +116,8 @@ impl NetworkClient {
debug!("Got handshake: {:?}", handshake);
}
NetworkClientState::Status => {
let (_packet_length, _packet_id) = read_packet_header(&mut self.stream).await.unwrap();
let (_packet_length, _packet_id) =
read_packet_header(&mut self.stream).await.unwrap();
let statusrequest = StatusRequest::read(&mut self.stream).await.unwrap();
debug!("Got status request: {:?}", statusrequest);
let mut statusresponse = StatusResponse::new();
@ -138,7 +146,8 @@ impl NetworkClient {
.into();
statusresponse.write(&mut self.stream).await.unwrap();
debug!("Sending status response: StatusResponse");
let (_packet_length, _packet_id) = read_packet_header(&mut self.stream).await.unwrap();
let (_packet_length, _packet_id) =
read_packet_header(&mut self.stream).await.unwrap();
let statusping = StatusPing::read(&mut self.stream).await.unwrap();
debug!("Got status ping: {:?}", statusping);
let mut statuspong = StatusPong::new();
@ -148,7 +157,8 @@ impl NetworkClient {
self.state = NetworkClientState::Disconnected;
}
NetworkClientState::Login => {
let (_packet_length, _packet_id) = read_packet_header(&mut self.stream).await.unwrap();
let (_packet_length, _packet_id) =
read_packet_header(&mut self.stream).await.unwrap();
let loginstart = LoginStart::read(&mut self.stream).await.unwrap();
debug!("{:?}", loginstart);
// Offline mode skips encryption and compression.

View File

@ -1,6 +1,6 @@
use crate::mctypes::*;
use std::convert::{Into, TryFrom};
use std::net::TcpStream;
use tokio::net::TcpStream;
#[derive(Debug, Clone)]
pub struct StatusResponse {
@ -28,14 +28,14 @@ impl StatusResponse {
json_response: MCString::from(""),
}
}
pub async fn read(t: &'_ mut TcpStream) -> std::io::Result<Self> {
pub async fn read(t: &'_ mut TcpStream) -> tokio::io::Result<Self> {
let mut statusresponse = StatusResponse::new();
statusresponse.json_response = MCString::read(t).await?;
Ok(statusresponse)
}
pub async fn write(&self, t: &'_ mut TcpStream) -> std::io::Result<()> {
pub async fn write(&self, t: &'_ mut TcpStream) -> tokio::io::Result<()> {
for b in Into::<Vec<u8>>::into(self.clone()) {
write_byte(t, b)?;
write_byte(t, b).await?;
}
Ok(())
}
@ -65,14 +65,14 @@ impl StatusPong {
pub fn new() -> Self {
StatusPong { payload: 0.into() }
}
pub async fn read(t: &mut TcpStream) -> std::io::Result<Self> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<Self> {
let mut statuspong = StatusPong::new();
statuspong.payload = MCLong::read(t).await?;
Ok(statuspong)
}
pub async fn write(&self, t: &mut TcpStream) -> std::io::Result<()> {
pub async fn write(&self, t: &mut TcpStream) -> tokio::io::Result<()> {
for b in Into::<Vec<u8>>::into(self.clone()) {
write_byte(t, b)?;
write_byte(t, b).await?;
}
Ok(())
}
@ -107,15 +107,15 @@ impl LoginSuccess {
username: MCString::from(""),
}
}
pub async fn read(t: &mut TcpStream) -> std::io::Result<Self> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<Self> {
let mut loginsuccess = LoginSuccess::new();
loginsuccess.uuid = MCString::read(t).await?;
loginsuccess.username = MCString::read(t).await?;
Ok(loginsuccess)
}
pub async fn write(&self, t: &mut TcpStream) -> std::io::Result<()> {
pub async fn write(&self, t: &mut TcpStream) -> tokio::io::Result<()> {
for b in Into::<Vec<u8>>::into(self.clone()) {
write_byte(t, b)?;
write_byte(t, b).await?;
}
Ok(())
}
@ -149,16 +149,16 @@ impl LoginDisconnect {
},
}
}
pub async fn read(t: &mut TcpStream) -> std::io::Result<Self> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<Self> {
let mut logindisconnect = LoginDisconnect::new();
logindisconnect.reason = MCChat {
text: MCString::read(t).await?,
};
Ok(logindisconnect)
}
pub async fn write(&self, t: &mut TcpStream) -> std::io::Result<()> {
pub async fn write(&self, t: &mut TcpStream) -> tokio::io::Result<()> {
for b in Into::<Vec<u8>>::into(self.clone()) {
write_byte(t, b)?;
write_byte(t, b).await?;
}
Ok(())
}

View File

@ -6,10 +6,10 @@ pub mod serverbound;
use crate::mctypes::MCVarInt;
pub use clientbound::*;
pub use serverbound::*;
use std::net::TcpStream;
use tokio::net::TcpStream;
/// A helper function to read the packet header.
pub async fn read_packet_header(t: &mut TcpStream) -> std::io::Result<(MCVarInt, MCVarInt)> {
pub async fn read_packet_header(t: &mut TcpStream) -> tokio::io::Result<(MCVarInt, MCVarInt)> {
let length = MCVarInt::read(t).await?;
let id = MCVarInt::read(t).await?;
Ok((length, id))

View File

@ -1,6 +1,6 @@
use crate::mctypes::*;
use std::convert::{Into, TryFrom};
use std::net::TcpStream;
use tokio::net::TcpStream;
/// Needed for every interaction with the server.
#[derive(Debug, Clone)]
@ -38,7 +38,7 @@ impl Handshake {
next_state: 0.into(),
}
}
pub async fn read(t: &mut TcpStream) -> std::io::Result<Self> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<Self> {
let mut handshake = Handshake::new();
handshake.protocol_version = MCVarInt::read(t).await?;
handshake.server_address = MCString::read(t).await?;
@ -46,9 +46,9 @@ impl Handshake {
handshake.next_state = MCVarInt::read(t).await?;
Ok(handshake)
}
pub async fn write(&self, t: &mut TcpStream) -> std::io::Result<()> {
pub async fn write(&self, t: &mut TcpStream) -> tokio::io::Result<()> {
for b in Into::<Vec<u8>>::into(self.clone()) {
write_byte(t, b)?;
write_byte(t, b).await?;
}
Ok(())
}
@ -75,13 +75,13 @@ impl StatusRequest {
pub fn new() -> Self {
StatusRequest {}
}
pub async fn read(_t: &mut TcpStream) -> std::io::Result<Self> {
pub async fn read(_t: &mut TcpStream) -> tokio::io::Result<Self> {
let statusrequest = StatusRequest::new();
Ok(statusrequest)
}
pub async fn write(&self, t: &mut TcpStream) -> std::io::Result<()> {
pub async fn write(&self, t: &mut TcpStream) -> tokio::io::Result<()> {
for b in Into::<Vec<u8>>::into(self.clone()) {
write_byte(t, b)?;
write_byte(t, b).await?;
}
Ok(())
}
@ -111,14 +111,14 @@ impl StatusPing {
pub fn new() -> Self {
StatusPing { payload: 0.into() }
}
pub async fn read(t: &mut TcpStream) -> std::io::Result<Self> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<Self> {
let mut statusping = StatusPing::new();
statusping.payload = MCLong::read(t).await?;
Ok(statusping)
}
pub async fn write(&self, t: &mut TcpStream) -> std::io::Result<()> {
pub async fn write(&self, t: &mut TcpStream) -> tokio::io::Result<()> {
for b in Into::<Vec<u8>>::into(self.clone()) {
write_byte(t, b)?;
write_byte(t, b).await?;
}
Ok(())
}
@ -150,14 +150,14 @@ impl LoginStart {
player_name: "".into(),
}
}
pub async fn read(t: &mut TcpStream) -> std::io::Result<Self> {
pub async fn read(t: &mut TcpStream) -> tokio::io::Result<Self> {
let mut loginstart = LoginStart::new();
loginstart.player_name = MCString::read(t).await?;
Ok(loginstart)
}
pub async fn write(&self, t: &mut TcpStream) -> std::io::Result<()> {
pub async fn write(&self, t: &mut TcpStream) -> tokio::io::Result<()> {
for b in Into::<Vec<u8>>::into(self.clone()) {
write_byte(t, b)?;
write_byte(t, b).await?;
}
Ok(())
}