feature(sender): adapted relay and sender component to work with a simple transfer name

In order to be able to use a simple transfer name, the relay server and
the sender were adapted. On the sender, the on_create_room method was
adapted so that a call is made to the http_client to register the
transfer with the relay server. The handle_create_room method on the
server has been adapted so that when a room is created, the name is
transmitted by the receiver and is no longer generated on the
relayserver.This was also done in preparation for integrating a local
relayserver for the direct connection between sender and receiver.
This commit is contained in:
Patryk Hegenberg 2024-04-30 23:31:47 +02:00
parent 9f7b95ca98
commit 336ea18ad9
11 changed files with 88 additions and 41 deletions

2
Cargo.lock generated
View file

@ -406,7 +406,7 @@ checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
[[package]]
name = "caesar-transfer-iu"
version = "0.0.1"
version = "0.2.0"
dependencies = [
"aes-gcm",
"axum 0.7.5",

View file

@ -1,6 +1,6 @@
[package]
name = "caesar-transfer-iu"
version = "0.0.1"
version = "0.2.0"
edition = "2021"
build = "src/build.rs"
authors = ["Manuel Keidel", "Patryk Hegenberg", "Krzysztof Stankiewicz"]

View file

@ -9,7 +9,7 @@ use crate::relay::transfer::Transfer;
/// The rooms are stored in a `HashMap` with the room ID as the key and the
/// room as the value. This means that looking up a room by its ID is an O(1)
/// operation, which is very fast.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct AppState {
pub rooms: HashMap<String, Room>,
pub transfers: Vec<Transfer>,

View file

@ -2,7 +2,7 @@ use axum::extract::ws::Message;
use futures_util::{future::join_all, stream::SplitSink, SinkExt};
use std::{sync::Arc, vec};
use tokio::{sync::Mutex, sync::RwLock};
use tracing::error;
use tracing::{debug, error};
use crate::relay::appstate::AppState;
use crate::relay::room::Room;
@ -156,7 +156,7 @@ impl Client {
/// thread can access the server's state at a time. This is because the
/// server's state is not thread-safe, and accessing it from multiple
/// threads could result in undefined behavior.
async fn handle_create_room(&mut self, server: &RwLock<AppState>) {
async fn handle_create_room(&mut self, server: &RwLock<AppState>, id: Option<String>) {
let mut server = server.write().await;
// If the current client is already in a room, do nothing.
@ -170,7 +170,10 @@ impl Client {
// Generate a new room identifier.
let size = Room::DEFAULT_ROOM_SIZE;
let room_id = Uuid::new_v4().to_string();
let room_id = match id {
Some(id) => id,
None => Uuid::new_v4().to_string(),
};
// If there is already a room with the same identifier, send an error
// packet to the client and return.
@ -199,6 +202,7 @@ impl Client {
// Send a CreateRoom response packet to the client with the room's
// identifier.
debug!("Room created");
self.send_packet(self.sender.clone(), ResponsePacket::Create { id: room_id })
.await
}
@ -401,7 +405,7 @@ impl Client {
Err(_) => return,
};
match packet {
RequestPacket::Create => self.handle_create_room(server).await,
RequestPacket::Create { id } => self.handle_create_room(server, id).await,
RequestPacket::Join { id } => self.handle_join_room(server, id).await,
RequestPacket::Leave => self.handle_leave_room(server).await,
}

View file

@ -22,7 +22,9 @@ pub enum RequestPacket {
// The ID of the room that the client wants to join.
id: String,
},
Create,
Create {
id: Option<String>,
},
Leave,
}

View file

@ -1,4 +1,4 @@
use axum::extract::ws::Message;
use axum::extract::ws::{Message, WebSocket};
use futures_util::stream::SplitSink;
use std::sync::Arc;
use tokio::sync::Mutex;
@ -24,7 +24,7 @@ use tokio::sync::Mutex;
//
// The type alias is used so that the type is not mentioned every time it is
// used. This makes the code easier to read and understand.
type Sender = Arc<Mutex<SplitSink<axum::extract::ws::WebSocket, Message>>>;
type Sender = Arc<Mutex<SplitSink<WebSocket, Message>>>;
/// A `Room` is a collection of clients that are connected to each other.
///
@ -41,7 +41,7 @@ type Sender = Arc<Mutex<SplitSink<axum::extract::ws::WebSocket, Message>>>;
/// When a room reaches its maximum size, no more clients can join the room.
/// This is used to prevent rooms from getting too full and causing the
/// server to run out of memory.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Room {
pub senders: Vec<Sender>,
pub size: usize,

View file

@ -27,7 +27,7 @@ mod tests {
Transfer::new(
"Test".to_string(),
"127.0.0.1".to_string(),
"This_is_a_test_room_id".to_string()
"This_is_a_test_room_id".to_string(),
),
transfer
)

View file

@ -119,25 +119,39 @@ struct Context {
/// appended to the room id to create a URL. The URL is then printed to the
/// console using the qr2term library. Finally, the function prints a
/// message to the console with the URL.
fn on_create_room(context: &Context, id: String) -> Status {
fn on_create_room(context: &Context, id: String, relay: String) -> Status {
let base64 = general_purpose::STANDARD.encode(&context.hmac);
let url = format!("{}-{}", id, base64);
let rand_name = generate_random_name();
let hash_name = hash_random_name(rand_name.clone());
let res = send_info("http://localhost:8000", &hash_name, url.as_str());
debug!("Got Result: {:#?}", res);
let send_url = url.to_string();
let h_name = hash_name.to_string();
let server_url = String::from("http://") + relay.as_str();
let res = std::thread::spawn(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(send_info(&server_url, &h_name, send_url.as_str()))
})
.join()
.unwrap();
debug!("Got Result: {:?}", res);
// Print a newline to the console to separate the output from the command
// line.
println!();
// Try to generate a QR code from the URL. If the function fails for some
// reason, print an error message to the console.
if let Err(error) = qr2term::print_qr(&url) {
// if let Err(error) = qr2term::print_qr(&url) {
// error!("Failed to generate QR code: {}", error);
// }
if let Err(error) = qr2term::print_qr(&rand_name) {
error!("Failed to generate QR code: {}", error);
}
// Print a newline to the console to separate the output from the command
// line.
println!();
@ -535,13 +549,13 @@ fn on_handshake(context: &mut Context, handshake_response: HandshakeResponsePack
/// struct is then matched on to call the appropriate function.
///
/// If the message is invalid, an error is returned.
fn on_message(context: &mut Context, message: WebSocketMessage) -> Status {
fn on_message(context: &mut Context, message: WebSocketMessage, relay: String) -> Status {
if message.is_text() {
let text = message.into_text().unwrap();
let packet = serde_json::from_str(&text).unwrap();
return match packet {
JsonPacketResponse::Create { id } => on_create_room(context, id),
JsonPacketResponse::Create { id } => on_create_room(context, id, relay),
JsonPacketResponse::Join { size } => on_join_room(context, size),
JsonPacketResponse::Leave { index } => on_leave_room(context, index),
JsonPacketResponse::Error { message } => on_error(message),
@ -588,7 +602,7 @@ fn on_message(context: &mut Context, message: WebSocketMessage) -> Status {
/// When the function is finished, it will exit and the transfer will be complete. If there
/// is an error during the transfer, the function will print an error message to stdout and
/// exit.
pub async fn start(socket: Socket, paths: Vec<String>) {
pub async fn start(socket: Socket, paths: Vec<String>, room_id: Option<String>, relay: String) {
// Create a vector to store metadata about each file that will be sent.
let mut files = vec![];
@ -668,7 +682,11 @@ pub async fn start(socket: Socket, paths: Vec<String>) {
debug!("Attempting to create room...");
// Send a JSON packet to the server to create a room with a size of 2.
context.sender.send_json_packet(JsonPacket::Create);
debug!("With Room-ID: {:?}", room_id);
context.sender.send_json_packet(JsonPacket::Create {
id: room_id.clone(),
});
// context.sender.send_json_packet(JsonPacket::Create);
// Create a future that handles the outgoing stream of messages from the client to the
// server.
@ -678,7 +696,7 @@ pub async fn start(socket: Socket, paths: Vec<String>) {
// client.
let incoming_handler = incoming.try_for_each(|message| {
// Call the `on_message` function to handle the incoming message.
match on_message(&mut context, message) {
match on_message(&mut context, message, relay.clone()) {
// If the status is `Status::Exit`, the transfer is complete. Print a message to
// stdout and exit the function.
Status::Exit() => {
@ -800,7 +818,8 @@ mod tests {
on_create_room(
&context,
"b531e87d-e51a-4507-94f4-335cbe2d32f3-Nc5skZReq7qJN7INwckyAZLWEEbxsrFfH/692tUNgkM="
.to_string()
.to_string(),
String::from("0.0.0.0:8000")
),
Status::Continue()
);
@ -885,17 +904,19 @@ mod tests {
assert_eq!(
on_message(
&mut context,
WebSocketMessage::Text(r#"{"type":"leave","index":5}"#.to_string())
WebSocketMessage::Text(r#"{"type":"leave","index":5}"#.to_string()),
String::from("0.0.0.0:8000")
),
Status::Continue()
);
assert_eq!(on_message(&mut context, WebSocketMessage::Text(r#"{"type":"create","id":"b531e87d-e51a-4507-94f4-335cbe2d32f3-Nc5skZReq7qJN7INwckyAZLWEEbxsrFfH/692tUNgkM="}"#.to_string())), Status::Continue());
assert_eq!(on_message(&mut context, WebSocketMessage::Text(r#"{"type":"create","id":"b531e87d-e51a-4507-94f4-335cbe2d32f3-Nc5skZReq7qJN7INwckyAZLWEEbxsrFfH/692tUNgkM="}"#.to_string()), String::from("0.0.0.0:8000")), Status::Continue());
assert_eq!(
on_message(
&mut context,
WebSocketMessage::Text(
r#"{"type":"error","message":"Error Message: Test"}"#.to_string()
)
),
String::from("0.0.0.0:8000")
),
Status::Err("Error Message: Test".to_string())
);

View file

@ -3,10 +3,12 @@ use tracing::error;
use local_ip_address::{local_ip, local_ipv6};
use reqwest::blocking::Client;
use tokio::task;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
pub fn send_info(relay: &str, name: &str, room_id: &str) -> Result<String> {
pub async fn send_info(relay: &str, name: &str, room_id: &str) -> Result<String> {
let url = relay.to_string();
let sender_ip = match local_ipv6() {
Ok(ip) => ip,
Err(_) => match local_ip() {
@ -18,14 +20,26 @@ pub fn send_info(relay: &str, name: &str, room_id: &str) -> Result<String> {
},
};
let ip_str = sender_ip.to_owned().to_string();
let mut map = HashMap::new();
map.insert("name", String::from(name));
map.insert("ip", ip_str);
map.insert("room_id", String::from(room_id));
let client = Client::new();
let _ = match client.post(format!("{}/upload", relay)).json(&map).send() {
Ok(_) => Ok(room_id.to_string()),
Err(e) => Err(Box::new(e)),
let map = {
let mut map = HashMap::new();
map.insert("name", String::from(name));
map.insert("ip", ip_str);
map.insert("room_id", String::from(room_id));
map
};
Ok("".to_string())
let room_id = room_id.to_string();
let result: Result<String> = task::spawn_blocking(move || {
let client = Client::new();
client
.post(format!("{}/upload", url))
.json(&map)
.send()?
.text()?
.to_string();
Ok(room_id)
})
.await?;
Ok(result?)
}

View file

@ -48,20 +48,25 @@ use tokio_tungstenite::{
tungstenite::{client::IntoClientRequest, http::HeaderValue},
};
use tracing::{debug, error};
use uuid::Uuid;
pub async fn start_sender(relay: &str, files: &[String]) {
match relay.into_client_request() {
debug!("Got relay: {relay}");
let url = String::from("ws://") + relay + "/ws";
match url.clone().into_client_request() {
Ok(mut request) => {
request
.headers_mut()
.insert("Origin", HeaderValue::from_str(relay).unwrap());
debug!("Attempting to connect to {relay}...");
debug!("Attempting to connect to {url}...");
let room_id = Uuid::new_v4().to_string();
match connect_async(request).await {
Ok((socket, _)) => {
let paths = files.to_vec();
sender::start(socket, paths).await;
sender::start(socket, paths, Some(room_id), relay.to_string()).await;
// sender::start(socket, paths).await;
}
Err(e) => {
error!("Error: Failed to connect with error: {e}");

View file

@ -31,7 +31,8 @@ pub enum JsonPacket {
id: String,
},
/// Sent from the client to ask to create a new room.
Create,
Create { id: Option<String> },
// Create,
/// Sent from the client to ask to leave the current room.
Leave,
}