diff --git a/src/cli/args.rs b/src/cli/args.rs index 9d0104f..c8aff93 100644 --- a/src/cli/args.rs +++ b/src/cli/args.rs @@ -184,7 +184,7 @@ impl Args { listen_address, }) => { println!("Serve with address '{listen_address:?}' and '{port:?}'"); - relay::start_ws(port.as_ref(), listen_address.as_ref()).await; + relay::server::start_ws(port.as_ref(), listen_address.as_ref()).await; } // If the user does not specify a command, return early with no error. // This is because `command` is an `Option`. If the user does diff --git a/src/relay/appstate.rs b/src/relay/appstate.rs new file mode 100644 index 0000000..088b50c --- /dev/null +++ b/src/relay/appstate.rs @@ -0,0 +1,49 @@ +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; + +use crate::relay::room::Room; + +/// A struct that holds all of the rooms that the server knows about. +/// +/// The rooms are stored in a `HashMap` with the room ID as the key and the +/// room as the value. This means that looking up a room by its ID is an O(1) +/// operation, which is very fast. +#[derive(Debug)] +pub struct AppState { + pub rooms: HashMap, +} + +impl AppState { + /// Creates a new `Server` with an empty list of rooms. + /// + /// The `rooms` field of the returned `Server` is an empty `HashMap`. + /// This means that the server will not have any rooms when it is first + /// created. + /// + /// This function returns an `Arc>` because the server + /// needs to be shared between different parts of the program. The + /// `Arc` makes it so that the server can be shared by multiple threads, + /// and the `RwLock` makes it so that the server can be read from and + /// written to from multiple threads at the same time. + /// + /// The `Arc` and `RwLock` are both parts of the `tokio` library, which + /// provides asynchronous programming tools for Rust. + /// + /// The `Arc` and `RwLock` are used together to create a Mutex-like + /// object that can be shared between threads. The main difference + /// between a Mutex and an `Arc>` is that a Mutex can only be + /// locked by one thread at a time, while an `Arc>` can be + /// locked by multiple threads at the same time. + /// + /// This function is used to create a new `Server` and share it between + /// different parts of the program. The `Server` is shared because it + /// needs to be able to handle connections from multiple clients at the + /// same time. + pub fn new() -> Arc> { + // Create a new `Server` instance. + Arc::new(RwLock::new(AppState { + // Initialize the list of rooms to be empty. + rooms: HashMap::new(), + })) + } +} diff --git a/src/relay/client.rs b/src/relay/client.rs new file mode 100644 index 0000000..58a6064 --- /dev/null +++ b/src/relay/client.rs @@ -0,0 +1,490 @@ +use axum::extract::ws::Message; +use futures_util::{future::join_all, stream::SplitSink, SinkExt}; +use std::{sync::Arc, vec}; +use tokio::{sync::Mutex, sync::RwLock}; +use tracing::error; + +use crate::relay::appstate::AppState; +use crate::relay::room::Room; +use crate::relay::RequestPacket; +use crate::relay::ResponsePacket; +use uuid::Uuid; + +type Sender = Arc>>; +/// This struct represents a single client connection to the server. +/// +/// A `Client` instance holds a `Sender` and a `room_id`. +/// +/// The `Sender` is a type alias for a `tokio::sync::mpsc::Sender`. +/// It is used to send messages to the client. +/// +/// The `room_id` is an `Option`. It is used to keep track of which +/// room the client is currently in. If the `room_id` is `None`, then the +/// client is not in any room. If the `room_id` is `Some(id)`, where `id` is a +/// `String`, then the client is in the room with the ID `id`. +/// +/// The `room_id` is used to keep track of which room the client is in so +/// that the server knows which room to send messages to. When a client +/// joins a room, their `room_id` is set to the ID of the room that they +/// joined. When a client leaves a room, their `room_id` is set to `None`. +/// +/// The `Client` struct is used to keep track of which room each client is +/// in. It is used by the `Server` to determine which room to send messages +/// to. +/// +#[derive(Debug)] +pub struct Client { + sender: Sender, + room_id: Option, +} + +impl Client { + /// Creates a new `Client` instance. + /// + /// The `sender` argument is a `Sender` for sending messages to the client. + /// It is used by the `Server` to send messages to the client. + /// + /// The `room_id` field of the `Client` instance is set to `None` initially. + /// This is because the client is not in any room when they first connect + /// to the server. + /// + /// The `sender` field of the `Client` instance is used to send messages to + /// the client. When the server wants to send a message to the client, it + /// uses the `sender` to send the message. + /// + /// The `Client` instance is used by the `Server` to keep track of which + /// room each client is in. It is used by the `Server` to determine which + /// room to send messages to. + pub fn new(sender: Sender) -> Client { + Client { + sender, + room_id: None, + } + } + + /// Sends a message to a client. + /// + /// This function takes a `sender` argument, which is a `Mutex` guard + /// for a WebSocket connection. The `sender` is used to send a message + /// to the client. + /// + /// The `message` argument is the message that is sent to the client. It + /// is a WebSocket message. + /// + /// This function locks the `sender` Mutex to ensure that only one thread + /// can send a message at a time. This is because the SplitSink that the + /// `sender` mutex guards is not thread-safe, and sending a message from + /// multiple threads could result in the messages being sent out of order. + /// + /// If sending the message fails, this function logs an error message. + async fn send(&self, sender: Sender, message: Message) { + let mut sender = sender.lock().await; + if let Err(error) = sender.send(message).await { + error!("Failed to send message to the client: {}", error); + } + } + + /// Sends a packet to a client. + /// + /// This function takes a `sender` argument, which is a `Mutex` guard + /// for a WebSocket connection. The `sender` is used to send a message + /// to the client. + /// + /// The `packet` argument is the packet that is sent to the client. It + /// is a struct that contains the data that is being sent. + /// + /// This function serializes the `packet` using serde_json and sends it + /// to the client as a WebSocket Text message. + /// + /// This function locks the `sender` Mutex to ensure that only one thread + /// can send a message at a time. This is because the SplitSink that the + /// `sender` mutex guards is not thread-safe, and sending a message from + /// multiple threads could result in the messages being sent out of order. + async fn send_packet(&self, sender: Sender, packet: ResponsePacket) { + let serialized_packet = serde_json::to_string(&packet).unwrap(); + + self.send(sender, Message::Text(serialized_packet)).await; + } + + /// Sends an error packet to a client. + /// + /// This function takes a `sender` argument, which is a `Mutex` guard + /// for a WebSocket connection. The `sender` is used to send a message + /// to the client. + /// + /// The `message` argument is the message that is sent to the client. It + /// is a string that describes the error. + /// + /// This function creates an error packet with the `message` and sends it + /// to the client using the `send_packet` function. + /// + /// This function locks the `sender` Mutex to ensure that only one thread + /// can send a message at a time. This is because the SplitSink that the + /// `sender` mutex guards is not thread-safe, and sending a message from + /// multiple threads could result in the messages being sent out of order. + async fn send_error_packet(&self, sender: Sender, message: String) { + let error_packet = ResponsePacket::Error { message }; + + self.send_packet(sender, error_packet).await + } + + /// Handles a CreateRoom request from a client. + /// + /// This function is called when a client sends a CreateRoom request to + /// the server. The server will create a new room with the specified + /// size and return the room's identifier to the client. + /// + /// This function takes a `server` argument, which is a `RwLock` + /// guard for the server's state. The `server` is used to check if the + /// current client is already in a room, and to insert the new room into + /// the server's state. + /// + /// If the current client is already in a room, this function returns + /// without doing anything. This is to prevent a client from being in + /// multiple rooms at the same time. + /// + /// If there is already a room with the same identifier as the one that + /// is being created, this function sends an error packet to the client + /// and returns. + /// + /// If there is no existing room with the same identifier, this function + /// creates a new room with the specified size and inserts it into the + /// server's state. It then sends a CreateRoom response packet to the + /// client with the room's identifier. + /// + /// This function locks the `server` RwLock to ensure that only one + /// thread can access the server's state at a time. This is because the + /// server's state is not thread-safe, and accessing it from multiple + /// threads could result in undefined behavior. + async fn handle_create_room(&mut self, server: &RwLock) { + let mut server = server.write().await; + + // If the current client is already in a room, do nothing. + if server.rooms.iter().any(|(_, room)| { + room.senders + .iter() + .any(|sender| Arc::ptr_eq(sender, &self.sender)) + }) { + return; + } + + // Generate a new room identifier. + let size = Room::DEFAULT_ROOM_SIZE; + let room_id = Uuid::new_v4().to_string(); + + // If there is already a room with the same identifier, send an error + // packet to the client and return. + if server.rooms.contains_key(&room_id) { + drop(server); + + return self + .send_error_packet( + self.sender.clone(), + "A room with that identifier already exists.".to_string(), + ) + .await; + } + + // Create a new room with the specified size and insert it into the + // server's state. + let mut room = Room::new(size); + room.senders.push(self.sender.clone()); + + server.rooms.insert(room_id.clone(), room); + + // Set the client's room ID to the new room's identifier. + self.room_id = Some(room_id.clone()); + + drop(server); + + // Send a CreateRoom response packet to the client with the room's + // identifier. + self.send_packet(self.sender.clone(), ResponsePacket::Create { id: room_id }) + .await + } + + /// This function is called when the client sends a JoinRoom packet. + /// + /// If the client is already in a room, then this function does nothing. + /// + /// If the client is not in a room, then the function checks if the room + /// specified in the packet exists. If the room does not exist, an error + /// packet is sent to the client with a message indicating that the room + /// does not exist. + /// + /// If the room does exist, then the function checks if the room is full. + /// If the room is full, an error packet is sent to the client with a + /// message indicating that the room is full. + /// + /// If the room is not full, then the client is added to the room and the + /// function sends a JoinRoom response packet to the client with the size + /// of the room (excluding the client itself) and a `size` field set to + /// `None`. The response packet is sent to all other clients in the room. + async fn handle_join_room(&mut self, server: &RwLock, room_id: String) { + let mut server = server.write().await; + + // If the client is already in a room, do nothing. + if server.rooms.iter().any(|(_, room)| { + room.senders + .iter() + .any(|sender| Arc::ptr_eq(sender, &self.sender)) + }) { + return; + } + + // Get a mutable reference to the room specified in the packet. + // If the room does not exist, return an error to the client. + let Some(room) = server.rooms.get_mut(&room_id) else { + drop(server); + + return self + .send_error_packet(self.sender.clone(), "The room does not exist.".to_string()) + .await; + }; + + // If the room is full, return an error to the client. + if room.senders.len() >= room.size { + drop(server); + + return self + .send_error_packet(self.sender.clone(), "The room is full.".to_string()) + .await; + } + + // Add the client to the room and set the client's room ID to the new + // room's identifier. + room.senders.push(self.sender.clone()); + self.room_id = Some(room_id); + + // Create a list of futures to send JoinRoom response packets to all + // other clients in the room. The `size` field of the response packet is + // set to `None` if the client sending the packet is the one joining the + // room. Otherwise, the `size` field is set to the number of clients in + // the room minus one (to exclude the client joining the room). + let mut futures = vec![]; + for sender in &room.senders { + if Arc::ptr_eq(sender, &self.sender) { + futures.push(self.send_packet( + sender.clone(), + ResponsePacket::Join { + size: Some(room.senders.len() - 1), + }, + )); + } else { + futures.push(self.send_packet(sender.clone(), ResponsePacket::Join { size: None })); + } + } + + drop(server); + join_all(futures).await; + } + + /// Handles a request to leave a room. + /// + /// This function is called when a client sends a `LeaveRoom` request + /// packet. The function obtains a write lock on the server's state and + /// does the following: + /// + /// 1. Gets the room ID of the client who sent the request. If the client is + /// not in a room, the function returns early. + /// 2. Tries to get a mutable reference to the room with the obtained room + /// ID. If the room does not exist, the function returns early. + /// 3. Finds the index of the client's sender in the room's list of senders. + /// If the client is not in the room, the function returns early. + /// 4. Removes the client's sender from the room's list of senders. + /// 5. Sets the client's room ID to `None`. + /// 6. Creates a list of futures to send `LeaveRoom` response packets to + /// all other clients in the room. The `index` field of the response + /// packet is set to the index of the client's sender in the room's list + /// of senders. + /// 7. If the room is now empty, removes the room from the server's list + /// of rooms. + /// 8. Drops the write lock on the server's state. + /// 9. Waits for all futures to complete. + async fn handle_leave_room(&mut self, server: &RwLock) { + // Obtain a write lock on the server's state. + let mut server = server.write().await; + + // Get the room ID of the client who sent the request. + let Some(room_id) = self.room_id.clone() else { + // If the client is not in a room, return early. + return; + }; + + // Try to get a mutable reference to the room with the obtained room ID. + let Some(room) = server.rooms.get_mut(&room_id) else { + // If the room does not exist, return early. + return; + }; + + // Find the index of the client's sender in the room's list of senders. + let Some(index) = room + .senders + .iter() + .position(|sender| Arc::ptr_eq(sender, &self.sender)) + else { + // If the client is not in the room, return early. + return; + }; + + // Remove the client's sender from the room's list of senders. + room.senders.remove(index); + + // Set the client's room ID to `None`. + self.room_id = None; + + // Create a list of futures to send `LeaveRoom` response packets to + // all other clients in the room. The `index` field of the response + // packet is set to the index of the client's sender in the room's list + // of senders. + let mut futures = vec![]; + for sender in &room.senders { + futures.push(self.send_packet(sender.clone(), ResponsePacket::Leave { index })); + } + + // If the room is now empty, removes the room from the server's list + // of rooms. + if room.senders.is_empty() { + server.rooms.remove(&room_id); + } + + // Drop the write lock on the server's state. + drop(server); + + // Wait for all futures to complete. + join_all(futures).await; + } + + /// This function handles an incoming message from a client. + /// + /// The message can be one of four types: `Text`, `Binary`, `Ping`, or `Close`. + /// + /// If the message is `Text`, the function parses the message as a `RequestPacket` and + /// calls the appropriate function to handle the request. If the message cannot be + /// parsed as a `RequestPacket`, the function does nothing and returns early. + /// + /// If the message is `Binary`, the function first acquires a read lock on the server's + /// state. If the client is not currently in a room, the function drops the read lock and + /// returns early. If the client is not in a room, or if the room does not exist, the + /// function drops the read lock and returns early. + /// + /// The function then finds the index of the client's sender in the room's list of + /// senders. If the client's sender is not in the room's list of senders, the function + /// drops the read lock and returns early. + /// + /// The function then gets the binary data from the message and sets the first byte to + /// the index of the client's sender in the room's list of senders. If there is no + /// binary data in the message, the function drops the read lock and returns early. + /// + /// The function then determines where to send the message. If the first byte of the + /// message is less than the number of clients in the room, the function sends the message + /// to the client at that index in the room's list of senders. If the first byte of the + /// message is equal to the number of clients in the room plus one, the function sends the + /// message to all clients in the room, excluding the client that sent the message. + /// + /// If the first byte of the message is any other value, the function drops the read + /// lock and returns early. + /// + /// Finally, the function drops the read lock and waits for all futures to complete. + /// + /// If the message is `Ping`, the function prints a message to stdout. + /// + /// If the message is `Pong`, the function prints a message to stdout. + /// + /// If the message is `Close`, the function prints a message to stdout and calls the + /// `handle_close` function. + pub async fn handle_message(&mut self, server: &RwLock, message: Message) { + match message { + Message::Text(text) => { + let packet = match serde_json::from_str(&text) { + Ok(packet) => packet, + Err(_) => return, + }; + match packet { + RequestPacket::Create => self.handle_create_room(server).await, + RequestPacket::Join { id } => self.handle_join_room(server, id).await, + RequestPacket::Leave => self.handle_leave_room(server).await, + } + } + Message::Binary(_) => { + // Acquire a read lock on the server's state. + let server = server.read().await; + + // If the client is not currently in a room, return early. + let Some(room_id) = &self.room_id else { + drop(server); + return; + }; + + // If the room does not exist, return early. + let Some(room) = server.rooms.get(room_id) else { + drop(server); + return; + }; + + // Find the index of the client's sender in the room's list of senders. + let Some(index) = room + .senders + .iter() + .position(|sender| Arc::ptr_eq(sender, &self.sender)) + else { + drop(server); + return; + }; + + // Get the binary data from the message and set the first byte to + // the index of the client's sender in the room's list of senders. + let mut data = message.into_data(); + if data.is_empty() { + drop(server); + return; + } + + let source = u8::try_from(index).unwrap(); + + // Determine where to send the message. + let destination = usize::from(data[0]); + data[0] = source; + + // Send the message to the client at the destination index in the + // room's list of senders. + if destination < room.senders.len() { + let sender = room.senders[destination].clone(); + + drop(server); + return self.send(sender, Message::Binary(data)).await; + } + + // Send the message to all clients in the room, excluding the + // client that sent the message. + if destination == usize::from(u8::MAX) { + let mut futures = vec![]; + for sender in &room.senders { + if Arc::ptr_eq(sender, &self.sender) { + continue; + } + + futures.push(self.send(sender.clone(), Message::Binary(data.clone()))); + } + + drop(server); + join_all(futures).await; + } + } + Message::Ping(_) => { + println!("Got Message Type Ping"); + } + Message::Pong(_) => { + println!("Got Message Type Pong"); + } + Message::Close(_) => { + println!("Got Message Type Close"); + self.handle_close(server).await; + } + } + } + + pub async fn handle_close(&mut self, server: &RwLock) { + self.handle_leave_room(server).await + } +} diff --git a/src/relay/mod.rs b/src/relay/mod.rs index 90f096a..fa42e40 100644 --- a/src/relay/mod.rs +++ b/src/relay/mod.rs @@ -1,274 +1,69 @@ -/// This function starts the WebSocket server. -/// -/// It configures the server to listen on the specified host and port. If -/// these values are not specified in the environment, it falls back to using -/// the defaults of "0.0.0.0" for the host and "8000" for the port. -/// -/// It then sets up the application routes for the server. In this case, the -/// only route is for the WebSocket connection. -/// -/// The WebSocket route requires a `ConnectInfo` extractor to get the client's -/// IP address, which is then used to store the client in a data structure -/// keyed by their IP address. This allows for efficient lookup of clients by -/// their IP address. -/// -/// Finally, it starts the server by binding to the specified host and port, -/// and running the application. If the server fails to bind to the specified -/// host and port, it logs an error and exits. +pub mod appstate; +pub mod client; +pub mod room; pub mod server; -use axum::{ - extract::{ws::WebSocket, State, WebSocketUpgrade}, - response::IntoResponse, - routing::get, - Router, -}; -use futures_util::StreamExt; -use std::{env, net::SocketAddr, sync::Arc}; -use tokio::{ - net::TcpListener, - signal, - sync::{Mutex, RwLock}, -}; -use tower_http::trace::{DefaultMakeSpan, TraceLayer}; -use tracing::{debug, error, info, warn}; +use serde::{Deserialize, Serialize}; -use self::server::Client; - -/// This function starts the WebSocket server. -/// -/// It retrieves the environment variables that define how the server should -/// be configured. If any of these variables are not defined, it sets a -/// reasonable default value. -/// -/// The environment variables are: -/// -/// * `APP_ENVIRONMENT`: the environment the server is running in (defaults -/// to "development"). -/// * `APP_HOST`: the host the server should listen on (defaults to "0.0.0.0"). -/// * `APP_PORT`: the port the server should listen on (defaults to "8000"). -/// * `APP_DOMAIN`: the domain the server is accessible at (defaults to ""). -/// -/// It then sets up the application routes for the server. In this case, the -/// only route is for the WebSocket connection. -/// -/// The WebSocket route requires a `ConnectInfo` extractor to get the client's -/// IP address, which is then used to store the client in a data structure -/// keyed by their IP address. This allows for efficient lookup of clients by -/// their IP address. -/// -/// Finally, it starts the server by binding to the specified host and port, -/// and running the application. If the server fails to bind to the specified -/// host and port, it logs an error and exits. -pub async fn start_ws(port: Option<&i32>, listen_addr: Option<&String>) { - // Retrieve environment variables and set defaults if necessary. - let app_environemt = env::var("APP_ENVIRONMENT").unwrap_or("development".to_string()); - let app_host = match listen_addr { - Some(address) => address.to_string(), - None => env::var("APP_HOST").unwrap_or("0.0.0.0".to_string()), - }; - let app_port = match port { - Some(port) => port.to_string(), - None => env::var("APP_PORT").unwrap_or("8000".to_string()), - }; - - // Log information about the server's configuration. - debug!("Server configured to accept connections on host {app_host}...",); - debug!("Server configured to listen connections on port {app_port}...",); - - // Based on the environment variable, set the logging level. - match app_environemt.as_str() { - "development" => { - debug!("Running in development mode"); - } - "production" => { - debug!("Running in production mode"); - } - _ => { - debug!("Running in development mode"); - } - } - - // Create a new server data structure. - let server = server::AppState::new(); - - // Set up the application routes. - let app = Router::new() - .route("/ws", get(ws_handler)) - .with_state(server) - .layer( - TraceLayer::new_for_http() - .make_span_with(DefaultMakeSpan::default().include_headers(true)), - ); - - // Attempt to bind to the specified host and port. - if let Ok(listener) = TcpListener::bind(&format!("{}:{}", app_host, app_port)).await { - // Log successful binding. - info!("Listening on: {}", listener.local_addr().unwrap()); - - // Run the server. - axum::serve( - listener, - app.into_make_service_with_connect_info::(), - ) - .with_graceful_shutdown(shutdown_signal()) - .await - .unwrap(); - } else { - // Log binding failure and exit. - error!("Failed to listen on: {}:{}", app_host, app_port); - } +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +// This enum is used to represent the different types of requests that a client +// can send to the server. +// +// The requests that a client can send are: +// +// * Join: A request to join a room. The request contains the ID of the room +// that the client wants to join. +// * Create: A request to create a new room. +// * Leave: A request to leave the current room. +pub enum RequestPacket { + Join { + // The ID of the room that the client wants to join. + id: String, + }, + Create, + Leave, } -/// This function is an endpoint for the WebSocket route. +/// This enum is used to represent the different types of responses that the +/// server can send to the client. /// -/// This function is called whenever a client makes a WebSocket request to -/// the `/ws` endpoint. +/// The responses that the server can send are: /// -/// The function takes two arguments: -/// -/// - `ws`: This is the WebSocketUpgrade object, which is used to upgrade the -/// HTTP connection to a WebSocket connection. -/// - `State(shared_state)`: This is the state of the server, which is stored -/// in a read-write lock. The state is shared between all WebSocket -/// connections. -/// - `ConnectInfo(addr)`: This is the information about the client that -/// connected to the server. The function uses this information to log the -/// address of the client that connected to the server. -/// -/// The function upgrades the HTTP connection to a WebSocket connection using -/// the `ws` argument. It then passes the upgraded WebSocket connection, along -/// with the state of the server, to the `handle_socket` function. -/// -/// The `handle_socket` function is defined in the `src/relay/mod.rs` file. It -/// is the function that handles the WebSocket connection. -/// -/// The `handle_socket` function takes three arguments: -/// -/// - `socket`: This is the WebSocket connection that it should handle. -/// - `who`: This is the address of the client that connected to the server. -/// - `rooms`: This is the state of the server, which is stored in a read-write -/// lock. The state is shared between all WebSocket connections. -/// -/// The `handle_socket` function handles the WebSocket connection by calling -/// the `handle_message` function on a `Client` object that it creates. The -/// `handle_message` function is defined in the `src/relay/client.rs` file. The -/// `handle_message` function handles incoming messages from the client and -/// takes care of sending the appropriate response back to the client. -pub async fn ws_handler( - ws: WebSocketUpgrade, - State(shared_state): State>>, - // ConnectInfo(addr): ConnectInfo, -) -> impl IntoResponse { - debug!("Got Request on Websocket route"); - // debug!("WebSocket connection established from:{}", addr.to_string()); - debug!("Upgrading Connection"); - ws.on_upgrade(move |socket| handle_socket(socket, shared_state)) -} - -/// This function is called when a new WebSocket connection is established. -/// The function takes three arguments: -/// -/// - `socket`: This is the WebSocket connection that it should handle. -/// - `who`: This is the address of the client that connected to the server. -/// - `rooms`: This is the state of the server, which is stored in a read-write -/// lock. The state is shared between all WebSocket connections. -/// -/// The function creates a `Client` object, which will handle the WebSocket -/// connection. The `Client` object is created with an Arc-wrapped Mutex -/// containing the `sender` of the WebSocket connection. The `sender` is used to -/// send messages to the client. -/// -/// The function then creates a new `split` of the WebSocket connection, which -/// is a pair of a `sender` and a `receiver`. The `sender` is used to send -/// messages to the client, and the `receiver` is used to receive messages from -/// the client. The `receiver` is wrapped in a `Stream` (which is an async -/// iterator) so that the function can use the `next` method to receive messages -/// from the client. -/// -/// The function then enters a loop that receives incoming messages from the -/// client and handles them. For each received message, the function calls the -/// `handle_message` method on the `Client` object that it created. The -/// `handle_message` method is defined in the `src/relay/client.rs` file. The -/// `handle_message` method handles incoming messages from the client and -/// takes care of sending the appropriate response back to the client. -/// -/// If the function encounters an error while reading a message from the -/// client, it logs the error and breaks out of the loop. -/// -/// After the loop finishes (either because an error occurred or because the -/// client disconnected), the function calls the `handle_close` method on the -/// `Client` object that it created. The `handle_close` method is defined in the -/// `src/relay/client.rs` file. The `handle_close` method handles the close event -/// from the client. -async fn handle_socket(socket: WebSocket, rooms: Arc>) { - let (sender, mut receiver) = socket.split(); - - let sender = Arc::new(Mutex::new(sender)); - let mut client = Client::new(sender.clone()); - while let Some(message) = receiver.next().await { - match message { - Ok(message) => { - client.handle_message(&rooms, message).await; - } - Err(error) => { - warn!("Failed to read message from client: {}", error); - break; - } - } - } - // Handle the close event from the client. - client.handle_close(&rooms).await -} - -/// This function sets up a signal handler for SIGINT (Ctrl+C) and SIGTERM -/// (terminate) on Unix platforms. It does nothing on non-Unix platforms. -/// -/// The function installs two signal handlers: one for SIGINT and one for -/// SIGTERM. When either of these signals is received, the signal handler -/// simply resolves the future with `()`. This allows the main function to -/// wait for the signal handler to trigger a shutdown. -/// -/// The function uses the `tokio::select!` macro to wait for either of the -/// signal handlers to resolve. When the future returned by `tokio::select!` -/// resolves, the function simply drops the value and does nothing else. -/// -/// The function does not actually do anything itself. It simply waits for -/// one of the signal handlers to trigger a shutdown. -async fn shutdown_signal() { - // Install a signal handler for SIGINT (Ctrl+C). This future resolves - // when the user presses Ctrl+C. - let ctrl_c = async { - signal::ctrl_c() - .await - .expect("failed to install Ctrl+C handler"); - }; - - // Install a signal handler for SIGTERM (terminate). This future - // resolves when the operating system sends a SIGTERM signal to the - // program. - #[cfg(unix)] - let terminate = async { - signal::unix::signal(signal::unix::SignalKind::terminate()) - .expect("failed to install signal handler") - .recv() - .await; - }; - - // If we are not on a Unix platform, we don't need to install a signal - // handler for SIGTERM. Instead, we create a future that never resolves. - #[cfg(not(unix))] - let terminate = std::future::pending::<()>(); - - // Wait for either of the two signal handlers to resolve. When one of them - // resolves, the other one may still be waiting, but it doesn't matter - // because we don't need to do anything else. - tokio::select! { - // If the Ctrl+C signal handler resolves, drop the value and do - // nothing else. - _ = ctrl_c => {}, - // If the terminate signal handler resolves, drop the value and do - // nothing else. - _ = terminate => {}, - } +/// * Join: A response to a `Join` request from the client. If the client +/// successfully joined a room, the `size` field will be `Some` and contain +/// the size of the room. If the client could not join a room, the `size` field +/// will be `None`. +/// * Create: A response to a `Create` request from the client. If the server +/// successfully created a room, the `id` field will contain the ID of the +/// room. If the server could not create a room, the `id` field will be empty. +/// * Leave: A response to a `Leave` request from the client. If the client +/// successfully left a room, the `index` field will contain the index of the +/// client that left the room. If the client could not leave a room, the +/// `index` field will be 0. +/// * Error: A response to indicate that an error occurred. The `message` +/// field will contain a description of the error. +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "camelCase")] +pub enum ResponsePacket { + Join { + /// The size of the room that the client joined. If the client could + /// not join a room, this field will be `None`. + #[serde(skip_serializing_if = "Option::is_none")] + size: Option, + }, + Create { + /// The ID of the room that the server created. If the server could + /// not create a room, this field will be empty. + id: String, + }, + Leave { + /// The index of the client that left the room. If the client could not + /// leave a room, this field will be 0. + index: usize, + }, + Error { + /// A description of the error that occurred. + message: String, + }, } diff --git a/src/relay/room.rs b/src/relay/room.rs new file mode 100644 index 0000000..cea60f3 --- /dev/null +++ b/src/relay/room.rs @@ -0,0 +1,73 @@ +use axum::extract::ws::Message; +use futures_util::stream::SplitSink; +use std::sync::Arc; +use tokio::sync::Mutex; + +// A type alias for a sender to a WebSocket connection. +// +// The sender is a mutex-guarded, split sink of a WebSocket stream and Message +// values. It is used to send messages to a client. +// +// The Mutex is used to ensure that only one thread can send a message at a +// time. This is because the SplitSink is not thread-safe, and sending a +// message from multiple threads could result in the messages being sent +// out of order. +// +// The SplitSink is used to send messages to a client. It is the part of the +// WebSocket stream that handles the sending of messages. +// +// The WebSocket stream is the underlying connection to the client. It is used +// to send and receive messages. +// +// The Message value is the type of data that is sent over the WebSocket +// connection. It is a struct that contains the data that is being sent. +// +// The type alias is used so that the type is not mentioned every time it is +// used. This makes the code easier to read and understand. +type Sender = Arc>>; + +/// A `Room` is a collection of clients that are connected to each other. +/// +/// Each room has a set of clients, represented by a `Vec` of `Sender` +/// instances. The `Sender` instances are used to send messages to the +/// clients in the room. +/// +/// The `senders` field is the list of senders that are connected to each +/// other. Each sender is a mutex-guarded, split sink of a WebSocket +/// stream and Message values. This is explained in more detail in the +/// documentation for the `Sender` type alias in the `packets` module. +/// +/// The `size` field is the maximum number of clients that a room can have. +/// When a room reaches its maximum size, no more clients can join the room. +/// This is used to prevent rooms from getting too full and causing the +/// server to run out of memory. +#[derive(Debug)] +pub struct Room { + pub senders: Vec, + pub size: usize, +} + +impl Room { + /// The default size of a room. + /// + /// This is the size that a room will have when it is created. + pub const DEFAULT_ROOM_SIZE: usize = 2; + + /// Creates a new `Room` with the given size. + /// + /// The `size` parameter is the maximum number of clients that can join the + /// room. If `size` is 0, then the room will not be able to hold any + /// clients. + /// + /// The `senders` field of the returned `Room` is an empty vector. + /// + /// The `size` field of the returned `Room` is `size`. + pub fn new(size: usize) -> Room { + Room { + // Initialize the list of senders to be empty. + senders: Vec::new(), + // Set the size of the room. + size, + } + } +} diff --git a/src/relay/server.rs b/src/relay/server.rs index 055c7bd..cc63c9b 100644 --- a/src/relay/server.rs +++ b/src/relay/server.rs @@ -1,663 +1,274 @@ -use axum::extract::ws::Message; -use futures_util::{future::join_all, stream::SplitSink, SinkExt}; -use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, sync::Arc, vec}; -use tokio::{sync::Mutex, sync::RwLock}; -use tracing::error; - -use uuid::Uuid; - -// A type alias for a sender to a WebSocket connection. -// -// The sender is a mutex-guarded, split sink of a WebSocket stream and Message -// values. It is used to send messages to a client. -// -// The Mutex is used to ensure that only one thread can send a message at a -// time. This is because the SplitSink is not thread-safe, and sending a -// message from multiple threads could result in the messages being sent -// out of order. -// -// The SplitSink is used to send messages to a client. It is the part of the -// WebSocket stream that handles the sending of messages. -// -// The WebSocket stream is the underlying connection to the client. It is used -// to send and receive messages. -// -// The Message value is the type of data that is sent over the WebSocket -// connection. It is a struct that contains the data that is being sent. -// -// The type alias is used so that the type is not mentioned every time it is -// used. This makes the code easier to read and understand. -type Sender = Arc>>; - -#[derive(Debug, Serialize, Deserialize)] -#[serde(tag = "type", rename_all = "camelCase")] -// This enum is used to represent the different types of requests that a client -// can send to the server. -// -// The requests that a client can send are: -// -// * Join: A request to join a room. The request contains the ID of the room -// that the client wants to join. -// * Create: A request to create a new room. -// * Leave: A request to leave the current room. -pub enum RequestPacket { - Join { - // The ID of the room that the client wants to join. - id: String, - }, - Create, - Leave, -} - -/// This enum is used to represent the different types of responses that the -/// server can send to the client. +/// This function starts the WebSocket server. /// -/// The responses that the server can send are: +/// It configures the server to listen on the specified host and port. If +/// these values are not specified in the environment, it falls back to using +/// the defaults of "0.0.0.0" for the host and "8000" for the port. /// -/// * Join: A response to a `Join` request from the client. If the client -/// successfully joined a room, the `size` field will be `Some` and contain -/// the size of the room. If the client could not join a room, the `size` field -/// will be `None`. -/// * Create: A response to a `Create` request from the client. If the server -/// successfully created a room, the `id` field will contain the ID of the -/// room. If the server could not create a room, the `id` field will be empty. -/// * Leave: A response to a `Leave` request from the client. If the client -/// successfully left a room, the `index` field will contain the index of the -/// client that left the room. If the client could not leave a room, the -/// `index` field will be 0. -/// * Error: A response to indicate that an error occurred. The `message` -/// field will contain a description of the error. -#[derive(Debug, Serialize, Deserialize)] -#[serde(tag = "type", rename_all = "camelCase")] -pub enum ResponsePacket { - Join { - /// The size of the room that the client joined. If the client could - /// not join a room, this field will be `None`. - #[serde(skip_serializing_if = "Option::is_none")] - size: Option, - }, - Create { - /// The ID of the room that the server created. If the server could - /// not create a room, this field will be empty. - id: String, - }, - Leave { - /// The index of the client that left the room. If the client could not - /// leave a room, this field will be 0. - index: usize, - }, - Error { - /// A description of the error that occurred. - message: String, - }, -} +/// It then sets up the application routes for the server. In this case, the +/// only route is for the WebSocket connection. +/// +/// The WebSocket route requires a `ConnectInfo` extractor to get the client's +/// IP address, which is then used to store the client in a data structure +/// keyed by their IP address. This allows for efficient lookup of clients by +/// their IP address. +/// +/// Finally, it starts the server by binding to the specified host and port, +/// and running the application. If the server fails to bind to the specified +/// host and port, it logs an error and exits. +use axum::{ + extract::{ws::WebSocket, State, WebSocketUpgrade}, + response::IntoResponse, + routing::get, + Router, +}; -/// A `Room` is a collection of clients that are connected to each other. -/// -/// Each room has a set of clients, represented by a `Vec` of `Sender` -/// instances. The `Sender` instances are used to send messages to the -/// clients in the room. -/// -/// The `senders` field is the list of senders that are connected to each -/// other. Each sender is a mutex-guarded, split sink of a WebSocket -/// stream and Message values. This is explained in more detail in the -/// documentation for the `Sender` type alias in the `packets` module. -/// -/// The `size` field is the maximum number of clients that a room can have. -/// When a room reaches its maximum size, no more clients can join the room. -/// This is used to prevent rooms from getting too full and causing the -/// server to run out of memory. -#[derive(Debug)] -pub struct Room { - senders: Vec, - size: usize, -} +use futures_util::StreamExt; +use std::{env, net::SocketAddr, sync::Arc}; +use tokio::{ + net::TcpListener, + signal, + sync::{Mutex, RwLock}, +}; +use tower_http::trace::{DefaultMakeSpan, TraceLayer}; +use tracing::{debug, error, info, warn}; -impl Room { - /// The default size of a room. - /// - /// This is the size that a room will have when it is created. - const DEFAULT_ROOM_SIZE: usize = 2; +use crate::relay::appstate::AppState; +use crate::relay::client::Client; - /// Creates a new `Room` with the given size. - /// - /// The `size` parameter is the maximum number of clients that can join the - /// room. If `size` is 0, then the room will not be able to hold any - /// clients. - /// - /// The `senders` field of the returned `Room` is an empty vector. - /// - /// The `size` field of the returned `Room` is `size`. - fn new(size: usize) -> Room { - Room { - // Initialize the list of senders to be empty. - senders: Vec::new(), - // Set the size of the room. - size, +/// This function starts the WebSocket server. +/// +/// It retrieves the environment variables that define how the server should +/// be configured. If any of these variables are not defined, it sets a +/// reasonable default value. +/// +/// The environment variables are: +/// +/// * `APP_ENVIRONMENT`: the environment the server is running in (defaults +/// to "development"). +/// * `APP_HOST`: the host the server should listen on (defaults to "0.0.0.0"). +/// * `APP_PORT`: the port the server should listen on (defaults to "8000"). +/// * `APP_DOMAIN`: the domain the server is accessible at (defaults to ""). +/// +/// It then sets up the application routes for the server. In this case, the +/// only route is for the WebSocket connection. +/// +/// The WebSocket route requires a `ConnectInfo` extractor to get the client's +/// IP address, which is then used to store the client in a data structure +/// keyed by their IP address. This allows for efficient lookup of clients by +/// their IP address. +/// +/// Finally, it starts the server by binding to the specified host and port, +/// and running the application. If the server fails to bind to the specified +/// host and port, it logs an error and exits. +pub async fn start_ws(port: Option<&i32>, listen_addr: Option<&String>) { + // Retrieve environment variables and set defaults if necessary. + let app_environemt = env::var("APP_ENVIRONMENT").unwrap_or("development".to_string()); + let app_host = match listen_addr { + Some(address) => address.to_string(), + None => env::var("APP_HOST").unwrap_or("0.0.0.0".to_string()), + }; + let app_port = match port { + Some(port) => port.to_string(), + None => env::var("APP_PORT").unwrap_or("8000".to_string()), + }; + + // Log information about the server's configuration. + debug!("Server configured to accept connections on host {app_host}...",); + debug!("Server configured to listen connections on port {app_port}...",); + + // Based on the environment variable, set the logging level. + match app_environemt.as_str() { + "development" => { + debug!("Running in development mode"); } + "production" => { + debug!("Running in production mode"); + } + _ => { + debug!("Running in development mode"); + } + } + + // Create a new server data structure. + let server = AppState::new(); + + // Set up the application routes. + let app = Router::new() + .route("/ws", get(ws_handler)) + .with_state(server) + .layer( + TraceLayer::new_for_http() + .make_span_with(DefaultMakeSpan::default().include_headers(true)), + ); + + // Attempt to bind to the specified host and port. + if let Ok(listener) = TcpListener::bind(&format!("{}:{}", app_host, app_port)).await { + // Log successful binding. + info!("Listening on: {}", listener.local_addr().unwrap()); + + // Run the server. + axum::serve( + listener, + app.into_make_service_with_connect_info::(), + ) + .with_graceful_shutdown(shutdown_signal()) + .await + .unwrap(); + } else { + // Log binding failure and exit. + error!("Failed to listen on: {}:{}", app_host, app_port); } } -/// A struct that holds all of the rooms that the server knows about. +/// This function is an endpoint for the WebSocket route. /// -/// The rooms are stored in a `HashMap` with the room ID as the key and the -/// room as the value. This means that looking up a room by its ID is an O(1) -/// operation, which is very fast. -#[derive(Debug)] -pub struct AppState { - pub rooms: HashMap, +/// This function is called whenever a client makes a WebSocket request to +/// the `/ws` endpoint. +/// +/// The function takes two arguments: +/// +/// - `ws`: This is the WebSocketUpgrade object, which is used to upgrade the +/// HTTP connection to a WebSocket connection. +/// - `State(shared_state)`: This is the state of the server, which is stored +/// in a read-write lock. The state is shared between all WebSocket +/// connections. +/// - `ConnectInfo(addr)`: This is the information about the client that +/// connected to the server. The function uses this information to log the +/// address of the client that connected to the server. +/// +/// The function upgrades the HTTP connection to a WebSocket connection using +/// the `ws` argument. It then passes the upgraded WebSocket connection, along +/// with the state of the server, to the `handle_socket` function. +/// +/// The `handle_socket` function is defined in the `src/relay/mod.rs` file. It +/// is the function that handles the WebSocket connection. +/// +/// The `handle_socket` function takes three arguments: +/// +/// - `socket`: This is the WebSocket connection that it should handle. +/// - `who`: This is the address of the client that connected to the server. +/// - `rooms`: This is the state of the server, which is stored in a read-write +/// lock. The state is shared between all WebSocket connections. +/// +/// The `handle_socket` function handles the WebSocket connection by calling +/// the `handle_message` function on a `Client` object that it creates. The +/// `handle_message` function is defined in the `src/relay/client.rs` file. The +/// `handle_message` function handles incoming messages from the client and +/// takes care of sending the appropriate response back to the client. +pub async fn ws_handler( + ws: WebSocketUpgrade, + State(shared_state): State>>, + // ConnectInfo(addr): ConnectInfo, +) -> impl IntoResponse { + debug!("Got Request on Websocket route"); + // debug!("WebSocket connection established from:{}", addr.to_string()); + debug!("Upgrading Connection"); + ws.on_upgrade(move |socket| handle_socket(socket, shared_state)) } -impl AppState { - /// Creates a new `Server` with an empty list of rooms. - /// - /// The `rooms` field of the returned `Server` is an empty `HashMap`. - /// This means that the server will not have any rooms when it is first - /// created. - /// - /// This function returns an `Arc>` because the server - /// needs to be shared between different parts of the program. The - /// `Arc` makes it so that the server can be shared by multiple threads, - /// and the `RwLock` makes it so that the server can be read from and - /// written to from multiple threads at the same time. - /// - /// The `Arc` and `RwLock` are both parts of the `tokio` library, which - /// provides asynchronous programming tools for Rust. - /// - /// The `Arc` and `RwLock` are used together to create a Mutex-like - /// object that can be shared between threads. The main difference - /// between a Mutex and an `Arc>` is that a Mutex can only be - /// locked by one thread at a time, while an `Arc>` can be - /// locked by multiple threads at the same time. - /// - /// This function is used to create a new `Server` and share it between - /// different parts of the program. The `Server` is shared because it - /// needs to be able to handle connections from multiple clients at the - /// same time. - pub fn new() -> Arc> { - // Create a new `Server` instance. - Arc::new(RwLock::new(AppState { - // Initialize the list of rooms to be empty. - rooms: HashMap::new(), - })) - } -} - -/// This struct represents a single client connection to the server. +/// This function is called when a new WebSocket connection is established. +/// The function takes three arguments: /// -/// A `Client` instance holds a `Sender` and a `room_id`. +/// - `socket`: This is the WebSocket connection that it should handle. +/// - `who`: This is the address of the client that connected to the server. +/// - `rooms`: This is the state of the server, which is stored in a read-write +/// lock. The state is shared between all WebSocket connections. /// -/// The `Sender` is a type alias for a `tokio::sync::mpsc::Sender`. -/// It is used to send messages to the client. +/// The function creates a `Client` object, which will handle the WebSocket +/// connection. The `Client` object is created with an Arc-wrapped Mutex +/// containing the `sender` of the WebSocket connection. The `sender` is used to +/// send messages to the client. /// -/// The `room_id` is an `Option`. It is used to keep track of which -/// room the client is currently in. If the `room_id` is `None`, then the -/// client is not in any room. If the `room_id` is `Some(id)`, where `id` is a -/// `String`, then the client is in the room with the ID `id`. +/// The function then creates a new `split` of the WebSocket connection, which +/// is a pair of a `sender` and a `receiver`. The `sender` is used to send +/// messages to the client, and the `receiver` is used to receive messages from +/// the client. The `receiver` is wrapped in a `Stream` (which is an async +/// iterator) so that the function can use the `next` method to receive messages +/// from the client. /// -/// The `room_id` is used to keep track of which room the client is in so -/// that the server knows which room to send messages to. When a client -/// joins a room, their `room_id` is set to the ID of the room that they -/// joined. When a client leaves a room, their `room_id` is set to `None`. +/// The function then enters a loop that receives incoming messages from the +/// client and handles them. For each received message, the function calls the +/// `handle_message` method on the `Client` object that it created. The +/// `handle_message` method is defined in the `src/relay/client.rs` file. The +/// `handle_message` method handles incoming messages from the client and +/// takes care of sending the appropriate response back to the client. /// -/// The `Client` struct is used to keep track of which room each client is -/// in. It is used by the `Server` to determine which room to send messages -/// to. +/// If the function encounters an error while reading a message from the +/// client, it logs the error and breaks out of the loop. /// -#[derive(Debug)] -pub struct Client { - sender: Sender, - room_id: Option, -} +/// After the loop finishes (either because an error occurred or because the +/// client disconnected), the function calls the `handle_close` method on the +/// `Client` object that it created. The `handle_close` method is defined in the +/// `src/relay/client.rs` file. The `handle_close` method handles the close event +/// from the client. +async fn handle_socket(socket: WebSocket, rooms: Arc>) { + let (sender, mut receiver) = socket.split(); -impl Client { - /// Creates a new `Client` instance. - /// - /// The `sender` argument is a `Sender` for sending messages to the client. - /// It is used by the `Server` to send messages to the client. - /// - /// The `room_id` field of the `Client` instance is set to `None` initially. - /// This is because the client is not in any room when they first connect - /// to the server. - /// - /// The `sender` field of the `Client` instance is used to send messages to - /// the client. When the server wants to send a message to the client, it - /// uses the `sender` to send the message. - /// - /// The `Client` instance is used by the `Server` to keep track of which - /// room each client is in. It is used by the `Server` to determine which - /// room to send messages to. - pub fn new(sender: Sender) -> Client { - Client { - sender, - room_id: None, - } - } - - /// Sends a message to a client. - /// - /// This function takes a `sender` argument, which is a `Mutex` guard - /// for a WebSocket connection. The `sender` is used to send a message - /// to the client. - /// - /// The `message` argument is the message that is sent to the client. It - /// is a WebSocket message. - /// - /// This function locks the `sender` Mutex to ensure that only one thread - /// can send a message at a time. This is because the SplitSink that the - /// `sender` mutex guards is not thread-safe, and sending a message from - /// multiple threads could result in the messages being sent out of order. - /// - /// If sending the message fails, this function logs an error message. - async fn send(&self, sender: Sender, message: Message) { - let mut sender = sender.lock().await; - if let Err(error) = sender.send(message).await { - error!("Failed to send message to the client: {}", error); - } - } - - /// Sends a packet to a client. - /// - /// This function takes a `sender` argument, which is a `Mutex` guard - /// for a WebSocket connection. The `sender` is used to send a message - /// to the client. - /// - /// The `packet` argument is the packet that is sent to the client. It - /// is a struct that contains the data that is being sent. - /// - /// This function serializes the `packet` using serde_json and sends it - /// to the client as a WebSocket Text message. - /// - /// This function locks the `sender` Mutex to ensure that only one thread - /// can send a message at a time. This is because the SplitSink that the - /// `sender` mutex guards is not thread-safe, and sending a message from - /// multiple threads could result in the messages being sent out of order. - async fn send_packet(&self, sender: Sender, packet: ResponsePacket) { - let serialized_packet = serde_json::to_string(&packet).unwrap(); - - self.send(sender, Message::Text(serialized_packet)).await; - } - - /// Sends an error packet to a client. - /// - /// This function takes a `sender` argument, which is a `Mutex` guard - /// for a WebSocket connection. The `sender` is used to send a message - /// to the client. - /// - /// The `message` argument is the message that is sent to the client. It - /// is a string that describes the error. - /// - /// This function creates an error packet with the `message` and sends it - /// to the client using the `send_packet` function. - /// - /// This function locks the `sender` Mutex to ensure that only one thread - /// can send a message at a time. This is because the SplitSink that the - /// `sender` mutex guards is not thread-safe, and sending a message from - /// multiple threads could result in the messages being sent out of order. - async fn send_error_packet(&self, sender: Sender, message: String) { - let error_packet = ResponsePacket::Error { message }; - - self.send_packet(sender, error_packet).await - } - - /// Handles a CreateRoom request from a client. - /// - /// This function is called when a client sends a CreateRoom request to - /// the server. The server will create a new room with the specified - /// size and return the room's identifier to the client. - /// - /// This function takes a `server` argument, which is a `RwLock` - /// guard for the server's state. The `server` is used to check if the - /// current client is already in a room, and to insert the new room into - /// the server's state. - /// - /// If the current client is already in a room, this function returns - /// without doing anything. This is to prevent a client from being in - /// multiple rooms at the same time. - /// - /// If there is already a room with the same identifier as the one that - /// is being created, this function sends an error packet to the client - /// and returns. - /// - /// If there is no existing room with the same identifier, this function - /// creates a new room with the specified size and inserts it into the - /// server's state. It then sends a CreateRoom response packet to the - /// client with the room's identifier. - /// - /// This function locks the `server` RwLock to ensure that only one - /// thread can access the server's state at a time. This is because the - /// server's state is not thread-safe, and accessing it from multiple - /// threads could result in undefined behavior. - async fn handle_create_room(&mut self, server: &RwLock) { - let mut server = server.write().await; - - // If the current client is already in a room, do nothing. - if server.rooms.iter().any(|(_, room)| { - room.senders - .iter() - .any(|sender| Arc::ptr_eq(sender, &self.sender)) - }) { - return; - } - - // Generate a new room identifier. - let size = Room::DEFAULT_ROOM_SIZE; - let room_id = Uuid::new_v4().to_string(); - - // If there is already a room with the same identifier, send an error - // packet to the client and return. - if server.rooms.contains_key(&room_id) { - drop(server); - - return self - .send_error_packet( - self.sender.clone(), - "A room with that identifier already exists.".to_string(), - ) - .await; - } - - // Create a new room with the specified size and insert it into the - // server's state. - let mut room = Room::new(size); - room.senders.push(self.sender.clone()); - - server.rooms.insert(room_id.clone(), room); - - // Set the client's room ID to the new room's identifier. - self.room_id = Some(room_id.clone()); - - drop(server); - - // Send a CreateRoom response packet to the client with the room's - // identifier. - self.send_packet(self.sender.clone(), ResponsePacket::Create { id: room_id }) - .await - } - - /// This function is called when the client sends a JoinRoom packet. - /// - /// If the client is already in a room, then this function does nothing. - /// - /// If the client is not in a room, then the function checks if the room - /// specified in the packet exists. If the room does not exist, an error - /// packet is sent to the client with a message indicating that the room - /// does not exist. - /// - /// If the room does exist, then the function checks if the room is full. - /// If the room is full, an error packet is sent to the client with a - /// message indicating that the room is full. - /// - /// If the room is not full, then the client is added to the room and the - /// function sends a JoinRoom response packet to the client with the size - /// of the room (excluding the client itself) and a `size` field set to - /// `None`. The response packet is sent to all other clients in the room. - async fn handle_join_room(&mut self, server: &RwLock, room_id: String) { - let mut server = server.write().await; - - // If the client is already in a room, do nothing. - if server.rooms.iter().any(|(_, room)| { - room.senders - .iter() - .any(|sender| Arc::ptr_eq(sender, &self.sender)) - }) { - return; - } - - // Get a mutable reference to the room specified in the packet. - // If the room does not exist, return an error to the client. - let Some(room) = server.rooms.get_mut(&room_id) else { - drop(server); - - return self - .send_error_packet(self.sender.clone(), "The room does not exist.".to_string()) - .await; - }; - - // If the room is full, return an error to the client. - if room.senders.len() >= room.size { - drop(server); - - return self - .send_error_packet(self.sender.clone(), "The room is full.".to_string()) - .await; - } - - // Add the client to the room and set the client's room ID to the new - // room's identifier. - room.senders.push(self.sender.clone()); - self.room_id = Some(room_id); - - // Create a list of futures to send JoinRoom response packets to all - // other clients in the room. The `size` field of the response packet is - // set to `None` if the client sending the packet is the one joining the - // room. Otherwise, the `size` field is set to the number of clients in - // the room minus one (to exclude the client joining the room). - let mut futures = vec![]; - for sender in &room.senders { - if Arc::ptr_eq(sender, &self.sender) { - futures.push(self.send_packet( - sender.clone(), - ResponsePacket::Join { - size: Some(room.senders.len() - 1), - }, - )); - } else { - futures.push(self.send_packet(sender.clone(), ResponsePacket::Join { size: None })); - } - } - - drop(server); - join_all(futures).await; - } - - /// Handles a request to leave a room. - /// - /// This function is called when a client sends a `LeaveRoom` request - /// packet. The function obtains a write lock on the server's state and - /// does the following: - /// - /// 1. Gets the room ID of the client who sent the request. If the client is - /// not in a room, the function returns early. - /// 2. Tries to get a mutable reference to the room with the obtained room - /// ID. If the room does not exist, the function returns early. - /// 3. Finds the index of the client's sender in the room's list of senders. - /// If the client is not in the room, the function returns early. - /// 4. Removes the client's sender from the room's list of senders. - /// 5. Sets the client's room ID to `None`. - /// 6. Creates a list of futures to send `LeaveRoom` response packets to - /// all other clients in the room. The `index` field of the response - /// packet is set to the index of the client's sender in the room's list - /// of senders. - /// 7. If the room is now empty, removes the room from the server's list - /// of rooms. - /// 8. Drops the write lock on the server's state. - /// 9. Waits for all futures to complete. - async fn handle_leave_room(&mut self, server: &RwLock) { - // Obtain a write lock on the server's state. - let mut server = server.write().await; - - // Get the room ID of the client who sent the request. - let Some(room_id) = self.room_id.clone() else { - // If the client is not in a room, return early. - return; - }; - - // Try to get a mutable reference to the room with the obtained room ID. - let Some(room) = server.rooms.get_mut(&room_id) else { - // If the room does not exist, return early. - return; - }; - - // Find the index of the client's sender in the room's list of senders. - let Some(index) = room - .senders - .iter() - .position(|sender| Arc::ptr_eq(sender, &self.sender)) - else { - // If the client is not in the room, return early. - return; - }; - - // Remove the client's sender from the room's list of senders. - room.senders.remove(index); - - // Set the client's room ID to `None`. - self.room_id = None; - - // Create a list of futures to send `LeaveRoom` response packets to - // all other clients in the room. The `index` field of the response - // packet is set to the index of the client's sender in the room's list - // of senders. - let mut futures = vec![]; - for sender in &room.senders { - futures.push(self.send_packet(sender.clone(), ResponsePacket::Leave { index })); - } - - // If the room is now empty, removes the room from the server's list - // of rooms. - if room.senders.is_empty() { - server.rooms.remove(&room_id); - } - - // Drop the write lock on the server's state. - drop(server); - - // Wait for all futures to complete. - join_all(futures).await; - } - - /// This function handles an incoming message from a client. - /// - /// The message can be one of four types: `Text`, `Binary`, `Ping`, or `Close`. - /// - /// If the message is `Text`, the function parses the message as a `RequestPacket` and - /// calls the appropriate function to handle the request. If the message cannot be - /// parsed as a `RequestPacket`, the function does nothing and returns early. - /// - /// If the message is `Binary`, the function first acquires a read lock on the server's - /// state. If the client is not currently in a room, the function drops the read lock and - /// returns early. If the client is not in a room, or if the room does not exist, the - /// function drops the read lock and returns early. - /// - /// The function then finds the index of the client's sender in the room's list of - /// senders. If the client's sender is not in the room's list of senders, the function - /// drops the read lock and returns early. - /// - /// The function then gets the binary data from the message and sets the first byte to - /// the index of the client's sender in the room's list of senders. If there is no - /// binary data in the message, the function drops the read lock and returns early. - /// - /// The function then determines where to send the message. If the first byte of the - /// message is less than the number of clients in the room, the function sends the message - /// to the client at that index in the room's list of senders. If the first byte of the - /// message is equal to the number of clients in the room plus one, the function sends the - /// message to all clients in the room, excluding the client that sent the message. - /// - /// If the first byte of the message is any other value, the function drops the read - /// lock and returns early. - /// - /// Finally, the function drops the read lock and waits for all futures to complete. - /// - /// If the message is `Ping`, the function prints a message to stdout. - /// - /// If the message is `Pong`, the function prints a message to stdout. - /// - /// If the message is `Close`, the function prints a message to stdout and calls the - /// `handle_close` function. - pub async fn handle_message(&mut self, server: &RwLock, message: Message) { + let sender = Arc::new(Mutex::new(sender)); + let mut client = Client::new(sender.clone()); + while let Some(message) = receiver.next().await { match message { - Message::Text(text) => { - let packet = match serde_json::from_str(&text) { - Ok(packet) => packet, - Err(_) => return, - }; - match packet { - RequestPacket::Create => self.handle_create_room(server).await, - RequestPacket::Join { id } => self.handle_join_room(server, id).await, - RequestPacket::Leave => self.handle_leave_room(server).await, - } + Ok(message) => { + client.handle_message(&rooms, message).await; } - Message::Binary(_) => { - // Acquire a read lock on the server's state. - let server = server.read().await; - - // If the client is not currently in a room, return early. - let Some(room_id) = &self.room_id else { - drop(server); - return; - }; - - // If the room does not exist, return early. - let Some(room) = server.rooms.get(room_id) else { - drop(server); - return; - }; - - // Find the index of the client's sender in the room's list of senders. - let Some(index) = room - .senders - .iter() - .position(|sender| Arc::ptr_eq(sender, &self.sender)) - else { - drop(server); - return; - }; - - // Get the binary data from the message and set the first byte to - // the index of the client's sender in the room's list of senders. - let mut data = message.into_data(); - if data.is_empty() { - drop(server); - return; - } - - let source = u8::try_from(index).unwrap(); - - // Determine where to send the message. - let destination = usize::from(data[0]); - data[0] = source; - - // Send the message to the client at the destination index in the - // room's list of senders. - if destination < room.senders.len() { - let sender = room.senders[destination].clone(); - - drop(server); - return self.send(sender, Message::Binary(data)).await; - } - - // Send the message to all clients in the room, excluding the - // client that sent the message. - if destination == usize::from(u8::MAX) { - let mut futures = vec![]; - for sender in &room.senders { - if Arc::ptr_eq(sender, &self.sender) { - continue; - } - - futures.push(self.send(sender.clone(), Message::Binary(data.clone()))); - } - - drop(server); - join_all(futures).await; - } - } - Message::Ping(_) => { - println!("Got Message Type Ping"); - } - Message::Pong(_) => { - println!("Got Message Type Pong"); - } - Message::Close(_) => { - println!("Got Message Type Close"); - self.handle_close(server).await; + Err(error) => { + warn!("Failed to read message from client: {}", error); + break; } } } + // Handle the close event from the client. + client.handle_close(&rooms).await +} - pub async fn handle_close(&mut self, server: &RwLock) { - self.handle_leave_room(server).await +/// This function sets up a signal handler for SIGINT (Ctrl+C) and SIGTERM +/// (terminate) on Unix platforms. It does nothing on non-Unix platforms. +/// +/// The function installs two signal handlers: one for SIGINT and one for +/// SIGTERM. When either of these signals is received, the signal handler +/// simply resolves the future with `()`. This allows the main function to +/// wait for the signal handler to trigger a shutdown. +/// +/// The function uses the `tokio::select!` macro to wait for either of the +/// signal handlers to resolve. When the future returned by `tokio::select!` +/// resolves, the function simply drops the value and does nothing else. +/// +/// The function does not actually do anything itself. It simply waits for +/// one of the signal handlers to trigger a shutdown. +async fn shutdown_signal() { + // Install a signal handler for SIGINT (Ctrl+C). This future resolves + // when the user presses Ctrl+C. + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + // Install a signal handler for SIGTERM (terminate). This future + // resolves when the operating system sends a SIGTERM signal to the + // program. + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + // If we are not on a Unix platform, we don't need to install a signal + // handler for SIGTERM. Instead, we create a future that never resolves. + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + // Wait for either of the two signal handlers to resolve. When one of them + // resolves, the other one may still be waiting, but it doesn't matter + // because we don't need to do anything else. + tokio::select! { + // If the Ctrl+C signal handler resolves, drop the value and do + // nothing else. + _ = ctrl_c => {}, + // If the terminate signal handler resolves, drop the value and do + // nothing else. + _ = terminate => {}, } }