From 2262fd9f756e7a7e55ce97b900e1dcaddd3de646 Mon Sep 17 00:00:00 2001 From: Patryk Hegenberg Date: Wed, 1 May 2024 17:46:04 +0200 Subject: [PATCH] feature(sender,receiver): enabled local and relay transfer actually a clean close of the sender application is still missing and has to be included --- src/cli/args.rs | 1 + src/receiver/http_client.rs | 8 +++---- src/receiver/mod.rs | 28 ++++++++++++++-------- src/relay/appstate.rs | 4 ++-- src/relay/server.rs | 46 ++++++++++++++++++++++------------- src/relay/transfer.rs | 26 ++++++++++++++------ src/sender/client.rs | 48 +++++++++++++++++++++++++++---------- src/sender/http_client.rs | 2 +- src/sender/mod.rs | 42 +++++++++++++++++++++++++------- 9 files changed, 144 insertions(+), 61 deletions(-) diff --git a/src/cli/args.rs b/src/cli/args.rs index c377826..894086c 100644 --- a/src/cli/args.rs +++ b/src/cli/args.rs @@ -3,6 +3,7 @@ use crate::relay; use crate::sender; use clap::{Parser, Subcommand}; use std::{env, sync::Arc}; +use tokio::sync::mpsc; use tracing::debug; /// This struct defines the CLI arguments and subcommands for the caesar command line application. diff --git a/src/receiver/http_client.rs b/src/receiver/http_client.rs index 9c6e155..0fd8008 100644 --- a/src/receiver/http_client.rs +++ b/src/receiver/http_client.rs @@ -3,18 +3,18 @@ use reqwest; use sha2::{Digest, Sha256}; use tracing::error; -use crate::relay::transfer::Transfer; +use crate::relay::transfer::TransferResponse; type Result = std::result::Result>; -pub async fn download_info(relay: &str, name: &str) -> Result { +pub async fn download_info(relay: &str, name: &str) -> Result { let url = String::from("http://") + relay; let hashed_name = Sha256::digest(name.as_bytes()); let hashed_string = hex::encode(hashed_name); match reqwest::get(format!("{}/download/{}", url, hashed_string)).await { - Ok(resp) => match resp.json::().await { - Ok(res) => Ok(res.room_id), + Ok(resp) => match resp.json::().await { + Ok(res) => Ok(res), Err(e) => Err(Box::new(e)), }, Err(err) => { diff --git a/src/receiver/mod.rs b/src/receiver/mod.rs index 7221412..5377490 100644 --- a/src/receiver/mod.rs +++ b/src/receiver/mod.rs @@ -47,17 +47,29 @@ use tokio_tungstenite::{ use tracing::{debug, error}; pub async fn start_receiver(relay: &str, name: &str) { - let room_id = http_client::download_info(relay, name).await.unwrap(); - debug!("Got Room_ID from Server: {room_id}"); + let res = http_client::download_info(relay, name).await.unwrap(); + debug!("Got room_id from Server: {:?}", res); + let res_ip = res.ip + ":9000"; - start_ws_com(relay, room_id.as_str()).await; + if let Err(e) = start_ws_com(res_ip.as_str(), res.room_id[0].as_str()).await { + debug!("Failed to connect local with first room_id: {e}"); + if let Err(e) = start_ws_com(res_ip.as_str(), res.room_id[1].as_str()).await { + debug!("Failed to connect local with second room_id: {e}"); + if let Err(e) = start_ws_com(relay, res.room_id[0].as_str()).await { + debug!("Failed to connect remote with first room_id: {e}"); + if let Err(e) = start_ws_com(relay, res.room_id[1].as_str()).await { + error!("Failed to accomplish transfer with error: {e}"); + } + } + } + } } -pub async fn start_ws_com(relay: &str, name: &str) { +pub async fn start_ws_com(relay: &str, name: &str) -> Result<(), Box> { let url = String::from("ws://") + relay + "/ws"; let Ok(mut request) = url.into_client_request() else { println!("Error: Failed to create request."); - return; + return Err("Failed to create request".into()); }; // Insert the origin into the request headers to prevent @@ -68,11 +80,6 @@ pub async fn start_ws_com(relay: &str, name: &str) { println!("Attempting to connect..."); - // let Ok((socket, _)) = connect_async(request).await else { - // error!("Error: Failed to connect."); - // return; - // }; - match connect_async(request).await { Ok((socket, _)) => receiver::start(socket, name).await, Err(e) => error!("Error: Failed to connect: {e:?}"), @@ -81,4 +88,5 @@ pub async fn start_ws_com(relay: &str, name: &str) { // receiver::client module and is the function that interacts with // the server to receive files. // receiver::start(socket, name).await + Ok(()) } diff --git a/src/relay/appstate.rs b/src/relay/appstate.rs index a45f6ac..b4d40b4 100644 --- a/src/relay/appstate.rs +++ b/src/relay/appstate.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; use crate::relay::room::Room; -use crate::relay::transfer::Transfer; +use crate::relay::transfer::TransferResponse; /// A struct that holds all of the rooms that the server knows about. /// @@ -12,7 +12,7 @@ use crate::relay::transfer::Transfer; #[derive(Debug, Clone)] pub struct AppState { pub rooms: HashMap, - pub transfers: Vec, + pub transfers: Vec, } impl AppState { diff --git a/src/relay/server.rs b/src/relay/server.rs index dd11b88..e4e4034 100644 --- a/src/relay/server.rs +++ b/src/relay/server.rs @@ -19,7 +19,7 @@ use axum::{ extract::{ws::WebSocket, Json, Path, State, WebSocketUpgrade}, http::StatusCode, response::IntoResponse, - routing::{get, post}, + routing::{get, post, put}, Router, }; @@ -34,9 +34,9 @@ use tokio::{ use tower_http::trace::{DefaultMakeSpan, TraceLayer}; use tracing::{debug, error, info, warn}; -use crate::relay::appstate::AppState; use crate::relay::client::Client; -use crate::relay::transfer::Transfer; +use crate::relay::transfer::TransferResponse; +use crate::relay::{appstate::AppState, transfer::TransferRequest}; /// This function starts the WebSocket server. /// @@ -98,7 +98,7 @@ pub async fn start_ws(port: Option<&i32>, listen_addr: Option<&String>) { // Set up the application routes. let app = Router::new() .route("/ws", get(ws_handler)) - .route("/upload", post(upload_info)) + .route("/upload", put(upload_info)) .route("/download/:name", get(download_info)) .route("/download_success/:name", post(download_success)) .with_state(server) @@ -282,21 +282,35 @@ async fn shutdown_signal() { pub async fn upload_info( State(shared_state): State>>, // ConnectInfo(addr): ConnectInfo, - Json(payload): Json, + Json(payload): Json, ) -> impl IntoResponse { // debug!("Got upload request from {}", addr.ip().to_string()); let mut data = shared_state.write().await; - let t_request = Transfer { - name: payload.name, - ip: payload.ip, - room_id: payload.room_id, - }; - data.transfers.push(t_request.clone()); + match data + .transfers + .iter_mut() + .find(|request| request.name == payload.name) + { + Some(request) => { + request.room_id.push(payload.room_id); + debug!("Found Transfer and updated"); + (StatusCode::OK, Json(request.clone())) + } + None => { + let mut t_request = TransferResponse { + name: payload.name, + ip: payload.ip, + room_id: Vec::new(), + }; + t_request.room_id.push(payload.room_id); + data.transfers.push(t_request.clone()); - debug!("New TransferRequest created"); - debug!("Actual AppState is {:#?}", *data); + debug!("New TransferRequest created"); + debug!("Actual AppState is {:#?}", *data); - (StatusCode::CREATED, Json(t_request)) + (StatusCode::CREATED, Json(t_request)) + } + } } pub async fn download_info( @@ -313,10 +327,10 @@ pub async fn download_info( warn!("couldn't find transfer-name: {}", name); ( StatusCode::NOT_FOUND, - Json(Transfer { + Json(TransferResponse { name: String::from(""), ip: String::from(""), - room_id: String::from(""), + room_id: vec![String::from("")], }), ) } diff --git a/src/relay/transfer.rs b/src/relay/transfer.rs index 82d7465..a69329d 100644 --- a/src/relay/transfer.rs +++ b/src/relay/transfer.rs @@ -1,33 +1,45 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct Transfer { +pub struct TransferRequest { pub name: String, pub ip: String, pub room_id: String, } - -impl Transfer { +impl TransferRequest { pub fn new(name: String, ip: String, room_id: String) -> Self { Self { name, ip, room_id } } } + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct TransferResponse { + pub name: String, + pub ip: String, + pub room_id: Vec, +} + +impl TransferResponse { + pub fn new(name: String, ip: String, room_id: Vec) -> Self { + Self { name, ip, room_id } + } +} #[cfg(test)] mod tests { use super::*; #[test] fn test_new() { - let transfer = Transfer { + let transfer = TransferResponse { name: "Test".to_string(), ip: "127.0.0.1".to_string(), - room_id: "This_is_a_test_room_id".to_string(), + room_id: vec!["This_is_a_test_room_id".to_string()], }; assert_eq!( - Transfer::new( + TransferResponse::new( "Test".to_string(), "127.0.0.1".to_string(), - "This_is_a_test_room_id".to_string(), + vec!["This_is_a_test_room_id".to_string()], ), transfer ) diff --git a/src/sender/client.rs b/src/sender/client.rs index d74ca0f..c00afaa 100644 --- a/src/sender/client.rs +++ b/src/sender/client.rs @@ -110,6 +110,13 @@ struct Context { task: Option>, } +impl Context { + async fn clean_up(&mut self) { + if let Some(task) = &self.task { + task.abort(); + } + } +} /// This function is called when the client receives a create room packet /// from the server. The function is responsible for printing a URL to the /// console that the user can use to join the room. @@ -119,12 +126,13 @@ struct Context { /// appended to the room id to create a URL. The URL is then printed to the /// console using the qr2term library. Finally, the function prints a /// message to the console with the URL. -fn on_create_room(context: &Context, id: String, relay: String) -> Status { +fn on_create_room(context: &Context, id: String, relay: String, transfer_name: String) -> Status { + debug!("Creating room on: {relay}"); let base64 = general_purpose::STANDARD.encode(&context.hmac); let url = format!("{}-{}", id, base64); - let rand_name = generate_random_name(); - let hash_name = hash_random_name(rand_name.clone()); + // let rand_name = generate_random_name(); + let hash_name = hash_random_name(transfer_name.clone()); let send_url = url.to_string(); let h_name = hash_name.to_string(); @@ -149,7 +157,7 @@ fn on_create_room(context: &Context, id: String, relay: String) -> Status { // error!("Failed to generate QR code: {}", error); // } - if let Err(error) = qr2term::print_qr(&rand_name) { + if let Err(error) = qr2term::print_qr(&transfer_name) { error!("Failed to generate QR code: {}", error); } // Print a newline to the console to separate the output from the command @@ -158,7 +166,7 @@ fn on_create_room(context: &Context, id: String, relay: String) -> Status { // Print a message to the console with the URL. println!("Created room: {}", url); - println!("Transfername is: {}", rand_name); + println!("Transfername is: {}", transfer_name); // Continue the event loop. Status::Continue() @@ -549,13 +557,18 @@ fn on_handshake(context: &mut Context, handshake_response: HandshakeResponsePack /// struct is then matched on to call the appropriate function. /// /// If the message is invalid, an error is returned. -fn on_message(context: &mut Context, message: WebSocketMessage, relay: String) -> Status { +fn on_message( + context: &mut Context, + message: WebSocketMessage, + relay: String, + transfer_name: String, +) -> Status { if message.is_text() { let text = message.into_text().unwrap(); let packet = serde_json::from_str(&text).unwrap(); return match packet { - JsonPacketResponse::Create { id } => on_create_room(context, id, relay), + JsonPacketResponse::Create { id } => on_create_room(context, id, relay, transfer_name), JsonPacketResponse::Join { size } => on_join_room(context, size), JsonPacketResponse::Leave { index } => on_leave_room(context, index), JsonPacketResponse::Error { message } => on_error(message), @@ -602,7 +615,13 @@ fn on_message(context: &mut Context, message: WebSocketMessage, relay: String) - /// When the function is finished, it will exit and the transfer will be complete. If there /// is an error during the transfer, the function will print an error message to stdout and /// exit. -pub async fn start(socket: Socket, paths: Vec, room_id: Option, relay: String) { +pub async fn start( + socket: Socket, + paths: Vec, + room_id: Option, + relay: String, + transfer_name: String, +) { // Create a vector to store metadata about each file that will be sent. let mut files = vec![]; @@ -696,7 +715,7 @@ pub async fn start(socket: Socket, paths: Vec, room_id: Option, // client. let incoming_handler = incoming.try_for_each(|message| { // Call the `on_message` function to handle the incoming message. - match on_message(&mut context, message, relay.clone()) { + match on_message(&mut context, message, relay.clone(), transfer_name.clone()) { // If the status is `Status::Exit`, the transfer is complete. Print a message to // stdout and exit the function. Status::Exit() => { @@ -819,7 +838,8 @@ mod tests { &context, "b531e87d-e51a-4507-94f4-335cbe2d32f3-Nc5skZReq7qJN7INwckyAZLWEEbxsrFfH/692tUNgkM=" .to_string(), - String::from("0.0.0.0:8000") + String::from("0.0.0.0:8000"), + String::from("Test") ), Status::Continue() ); @@ -905,18 +925,20 @@ mod tests { on_message( &mut context, WebSocketMessage::Text(r#"{"type":"leave","index":5}"#.to_string()), - String::from("0.0.0.0:8000") + String::from("0.0.0.0:8000"), + String::from("Test") ), Status::Continue() ); - assert_eq!(on_message(&mut context, WebSocketMessage::Text(r#"{"type":"create","id":"b531e87d-e51a-4507-94f4-335cbe2d32f3-Nc5skZReq7qJN7INwckyAZLWEEbxsrFfH/692tUNgkM="}"#.to_string()), String::from("0.0.0.0:8000")), Status::Continue()); + assert_eq!(on_message(&mut context, WebSocketMessage::Text(r#"{"type":"create","id":"b531e87d-e51a-4507-94f4-335cbe2d32f3-Nc5skZReq7qJN7INwckyAZLWEEbxsrFfH/692tUNgkM="}"#.to_string()), String::from("0.0.0.0:8000"), String::from("Test")), Status::Continue()); assert_eq!( on_message( &mut context, WebSocketMessage::Text( r#"{"type":"error","message":"Error Message: Test"}"#.to_string() ), - String::from("0.0.0.0:8000") + String::from("0.0.0.0:8000"), + String::from("Test") ), Status::Err("Error Message: Test".to_string()) ); diff --git a/src/sender/http_client.rs b/src/sender/http_client.rs index 2a0eeb0..053dc6f 100644 --- a/src/sender/http_client.rs +++ b/src/sender/http_client.rs @@ -32,7 +32,7 @@ pub async fn send_info(relay: &str, name: &str, room_id: &str) -> Result let result: Result = task::spawn_blocking(move || { let client = Client::new(); client - .post(format!("{}/upload", url)) + .put(format!("{}/upload", url)) .json(&map) .send()? .text()? diff --git a/src/sender/mod.rs b/src/sender/mod.rs index a862c4a..8db8772 100644 --- a/src/sender/mod.rs +++ b/src/sender/mod.rs @@ -46,7 +46,7 @@ use std::{net::SocketAddr, sync::Arc}; use crate::{ relay::{appstate::AppState, server::ws_handler}, - sender::client as sender, + sender::{client as sender, util::generate_random_name}, }; use axum::{routing::get, Router}; use tokio::{net::TcpListener, task}; @@ -61,20 +61,31 @@ use uuid::Uuid; pub async fn start_sender(relay: Arc, files: Arc>) { debug!("Got relay: {relay}"); let room_id = Uuid::new_v4().to_string(); + let rand_name = generate_random_name(); let local_room_id = room_id.clone(); let local_files = files.clone(); + let local_relay = relay.clone(); + let local_rand_name = rand_name.clone(); let local_ws_thread = task::spawn(async move { start_local_ws().await; }); - let relay_thread = - task::spawn( - async move { connect_to_relay(relay.clone(), files.clone(), Some(room_id)).await }, - ); + let relay_thread = task::spawn(async move { + connect_to_server( + relay.clone(), + files.clone(), + Some(room_id), + relay.clone(), + Arc::new(rand_name.clone()), + ) + .await + }); let local_thread = task::spawn(async move { - connect_to_relay( + connect_to_server( Arc::new(String::from("0.0.0.0:9000")), local_files.clone(), Some(local_room_id), + local_relay.clone(), + Arc::new(local_rand_name.clone()), ) .await }); @@ -119,8 +130,16 @@ pub async fn start_local_ws() { } } -async fn connect_to_relay(relay: Arc, files: Arc>, room_id: Option) { +async fn connect_to_server( + relay: Arc, + files: Arc>, + room_id: Option, + message_server: Arc, + transfer_name: Arc, +) { let url = format!("ws://{}/ws", relay); + let message_relay = format!("{}", message_server); + let transfer_name = format!("{}", transfer_name); match url.clone().into_client_request() { Ok(mut request) => { request @@ -136,7 +155,14 @@ async fn connect_to_relay(relay: Arc, files: Arc>, room_id: match connect_async(request).await { Ok((socket, _)) => { let paths = files.to_vec(); - sender::start(socket, paths, Some(room_id), relay.to_string()).await; + sender::start( + socket, + paths, + Some(room_id), + message_relay.to_string(), + transfer_name.clone(), + ) + .await; } Err(e) => { error!("Error: Failed to connect with error: {e}");