Fix clippy issues

This commit is contained in:
Garen Tyler 2024-12-09 01:25:24 -07:00
parent 0cbfe045e3
commit 89195ca71c
Signed by: garentyler
SSH Key Fingerprint: SHA256:G4ke7blZMdpWPbkescyZ7IQYE4JAtwpI85YoJdq+S7U
6 changed files with 58 additions and 35 deletions

View File

@ -18,7 +18,7 @@ use config::Subcommand;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use std::time::Instant; use std::time::Instant;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{info, error}; use tracing::{error, info};
pub const PROTOCOL_VERSION: i32 = 762; pub const PROTOCOL_VERSION: i32 = 762;
pub const GAME_VERSION: &str = "1.19.4"; pub const GAME_VERSION: &str = "1.19.4";

View File

@ -1,3 +1,4 @@
use super::error::Error;
use crate::protocol::{ use crate::protocol::{
packets::{Packet, PacketDirection}, packets::{Packet, PacketDirection},
parsing::Parsable, parsing::Parsable,
@ -8,7 +9,6 @@ use tokio_util::{
bytes::{Buf, BytesMut}, bytes::{Buf, BytesMut},
codec::{Decoder, Encoder}, codec::{Decoder, Encoder},
}; };
use super::error::Error;
use tracing::trace; use tracing::trace;
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
@ -66,9 +66,7 @@ impl Decoder for PacketCodec {
trace!("parsing error: {:02X?}", e.input); trace!("parsing error: {:02X?}", e.input);
Err(Error::Parsing) Err(Error::Parsing)
} }
Err(nom::Err::Failure(_)) => { Err(nom::Err::Failure(_)) => Err(Error::Parsing),
Err(Error::Parsing)
}
} }
} }
} }

View File

@ -42,10 +42,10 @@ impl ConnectionManager {
self.clients.get_mut(&id) self.clients.get_mut(&id)
} }
pub fn clients(&self) -> impl Iterator<Item = &Connection> { pub fn clients(&self) -> impl Iterator<Item = &Connection> {
self.clients.iter().map(|(_id, c)| c) self.clients.values()
} }
pub fn clients_mut(&mut self) -> impl Iterator<Item = &mut Connection> { pub fn clients_mut(&mut self) -> impl Iterator<Item = &mut Connection> {
self.clients.iter_mut().map(|(_id, c)| c) self.clients.values_mut()
} }
pub async fn spawn_listener<A>( pub async fn spawn_listener<A>(
&self, &self,
@ -107,20 +107,24 @@ impl ConnectionManager {
} }
None => { None => {
self.clients.insert(id, connection); self.clients.insert(id, connection);
}, }
} }
} }
Err(mpsc::error::TryRecvError::Disconnected) => return Err(Error::ConnectionChannelDisconnnection), Err(mpsc::error::TryRecvError::Disconnected) => {
return Err(Error::ConnectionChannelDisconnnection)
}
Err(mpsc::error::TryRecvError::Empty) => break, Err(mpsc::error::TryRecvError::Empty) => break,
}; };
} }
// Disconnect any clients that have timed out. // Disconnect any clients that have timed out.
// We don't actually care if the disconnections succeed, // We don't actually care if the disconnections succeed,
// the connection is going to be dropped anyway. // the connection is going to be dropped anyway.
let _ = futures::future::join_all({ let _ = futures::future::join_all({
// Workaround until issue #59618 hash_extract_if gets stabilized. // Workaround until issue #59618 hash_extract_if gets stabilized.
let ids = self.clients.iter() let ids = self
.clients
.iter()
.filter_map(|(id, c)| { .filter_map(|(id, c)| {
if c.received_elapsed() > Duration::from_secs(10) { if c.received_elapsed() > Duration::from_secs(10) {
Some(*id) Some(*id)
@ -132,7 +136,8 @@ impl ConnectionManager {
ids.into_iter() ids.into_iter()
.map(|id| self.clients.remove(&id).unwrap()) .map(|id| self.clients.remove(&id).unwrap())
.map(|client| client.disconnect(None)) .map(|client| client.disconnect(None))
}).await; })
.await;
// Remove disconnected clients. // Remove disconnected clients.
let before = self.clients.len(); let before = self.clients.len();

View File

@ -242,8 +242,8 @@ packets!(
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::protocol::{packets::handshake::serverbound::Handshake, types::VarInt, ClientState};
use super::{Packet, PacketDirection}; use super::{Packet, PacketDirection};
use crate::protocol::{packets::handshake::serverbound::Handshake, types::VarInt, ClientState};
fn get_handshake() -> (Handshake, &'static [u8]) { fn get_handshake() -> (Handshake, &'static [u8]) {
( (
@ -255,18 +255,13 @@ mod tests {
}, },
&[ &[
// Packet length // Packet length
0x10, 0x10, // Packet ID
// Packet ID 0x00, // protocol_version: VarInt
0x00, 0xff, 0x05, // host: String
// protocol_version: VarInt 0x09, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x68, 0x6f, 0x73, 0x74, // port: u16
0xff, 0x05, 0x63, 0xdd, // next_state: ClientState (VarInt)
// host: String
0x09, 0x6c, 0x6f, 0x63, 0x61, 0x6c, 0x68, 0x6f, 0x73, 0x74,
// port: u16
0x63, 0xdd,
// next_state: ClientState (VarInt)
0x01, 0x01,
] ],
) )
} }
@ -274,7 +269,12 @@ mod tests {
fn packet_parsing_works() { fn packet_parsing_works() {
let (handshake, handshake_bytes) = get_handshake(); let (handshake, handshake_bytes) = get_handshake();
let (rest, packet) = Packet::parse(ClientState::Handshake, PacketDirection::Serverbound, handshake_bytes).unwrap(); let (rest, packet) = Packet::parse(
ClientState::Handshake,
PacketDirection::Serverbound,
handshake_bytes,
)
.unwrap();
assert_eq!(packet, Packet::Handshake(handshake)); assert_eq!(packet, Packet::Handshake(handshake));
assert!(rest.is_empty()); assert!(rest.is_empty());
} }

View File

@ -1,6 +1,6 @@
pub use crate::net::error::Error as NetworkError;
pub use std::io::Error as IoError; pub use std::io::Error as IoError;
pub use tokio::task::JoinError as TaskError; pub use tokio::task::JoinError as TaskError;
pub use crate::net::error::Error as NetworkError;
/// This type represents all possible errors that can occur when running the proxy. /// This type represents all possible errors that can occur when running the proxy.
#[allow(dead_code)] #[allow(dead_code)]

View File

@ -7,11 +7,11 @@ use crate::protocol::ClientState;
use crate::App; use crate::App;
use crate::{config::Config, net::connection::ConnectionManager}; use crate::{config::Config, net::connection::ConnectionManager};
use config::ProxyConfig; use config::ProxyConfig;
use error::{Error, NetworkError};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{info, trace, error, debug}; use tracing::{debug, error, info, trace};
use error::{Error, NetworkError};
#[derive(Debug)] #[derive(Debug)]
pub struct Proxy { pub struct Proxy {
@ -23,14 +23,21 @@ pub struct Proxy {
} }
impl Proxy { impl Proxy {
pub async fn connect_upstream(upstream_address: &str) -> Result<Connection, Error> { pub async fn connect_upstream(upstream_address: &str) -> Result<Connection, Error> {
let upstream = TcpStream::connect(upstream_address).await.map_err(Error::Io)?; let upstream = TcpStream::connect(upstream_address)
.await
.map_err(Error::Io)?;
Ok(Connection::new_server(0, upstream)) Ok(Connection::new_server(0, upstream))
} }
pub fn rewrite_packet(packet: Packet) -> Packet { pub fn rewrite_packet(packet: Packet) -> Packet {
match packet { match packet {
Packet::StatusResponse(mut status) => { Packet::StatusResponse(mut status) => {
let new_description = ProxyConfig::default().version.clone(); let new_description = ProxyConfig::default().version.clone();
*status.response.as_object_mut().unwrap().get_mut("description").unwrap() = serde_json::Value::String(new_description); *status
.response
.as_object_mut()
.unwrap()
.get_mut("description")
.unwrap() = serde_json::Value::String(new_description);
Packet::StatusResponse(status) Packet::StatusResponse(status)
} }
p => p, p => p,
@ -52,7 +59,7 @@ impl App for Proxy {
async fn new(running: CancellationToken) -> Result<Self, Self::Error> { async fn new(running: CancellationToken) -> Result<Self, Self::Error> {
let config = Config::instance(); let config = Config::instance();
let bind_address = format!("0.0.0.0:{}", config.proxy.port); let bind_address = format!("0.0.0.0:{}", config.proxy.port);
// Only allow one client to join at a time. // Only allow one client to join at a time.
let connections = ConnectionManager::new(Some(1)); let connections = ConnectionManager::new(Some(1));
let listener = connections let listener = connections
@ -60,7 +67,10 @@ impl App for Proxy {
.await .await
.map_err(Error::Network)?; .map_err(Error::Network)?;
let upstream_address = format!("{}:{}", config.proxy.upstream_host, config.proxy.upstream_port); let upstream_address = format!(
"{}:{}",
config.proxy.upstream_host, config.proxy.upstream_port
);
info!("Upstream server: {}", upstream_address); info!("Upstream server: {}", upstream_address);
let upstream = Proxy::connect_upstream(&upstream_address).await?; let upstream = Proxy::connect_upstream(&upstream_address).await?;
@ -81,7 +91,7 @@ impl App for Proxy {
}; };
let mut client_parsing_error = false; let mut client_parsing_error = false;
// At the same time, try to read packets from the server and client. // At the same time, try to read packets from the server and client.
// Forward the packet onto the other. // Forward the packet onto the other.
tokio::select! { tokio::select! {
@ -129,7 +139,13 @@ impl App for Proxy {
let id = client.id(); let id = client.id();
// Drop the &mut Connection // Drop the &mut Connection
let _ = client; let _ = client;
let _ = self.connections.disconnect(id, Some(serde_json::json!({ "text": "Received malformed data." }))).await; let _ = self
.connections
.disconnect(
id,
Some(serde_json::json!({ "text": "Received malformed data." })),
)
.await;
} }
if self.upstream.client_state() == ClientState::Disconnected { if self.upstream.client_state() == ClientState::Disconnected {
// Start a new connection with the upstream server. // Start a new connection with the upstream server.
@ -144,7 +160,11 @@ impl App for Proxy {
self.running.cancel(); self.running.cancel();
let _ = self.listener.await.map_err(Error::Task)?; let _ = self.listener.await.map_err(Error::Task)?;
let _ = self.connections.shutdown(None).await.map_err(Error::Network)?; let _ = self
.connections
.shutdown(None)
.await
.map_err(Error::Network)?;
Ok(()) Ok(())
} }