From 08c4610043f2bd02fe8d65ac1d730f2083e18d68 Mon Sep 17 00:00:00 2001 From: Patryk Hegenberg Date: Sun, 21 Apr 2024 15:13:55 +0200 Subject: [PATCH] sender: receiver: server: add basic direct file transfer --- src/cli/args.rs | 45 +++++++++--- src/receiver/client.rs | 102 ++++++++++++++++++++++++++++ src/receiver/mod.rs | 2 +- src/receiver/receiver.rs | 16 ----- src/relay/server.rs | 14 +++- src/sender/{sender.rs => client.rs} | 37 ++++++++-- src/sender/mod.rs | 2 +- src/sender/server.rs | 48 ++++++++----- src/transfer_info/transfer_info.rs | 2 + 9 files changed, 219 insertions(+), 49 deletions(-) create mode 100644 src/receiver/client.rs delete mode 100644 src/receiver/receiver.rs rename src/sender/{sender.rs => client.rs} (50%) diff --git a/src/cli/args.rs b/src/cli/args.rs index 45c5984..3f9cf95 100644 --- a/src/cli/args.rs +++ b/src/cli/args.rs @@ -1,10 +1,10 @@ use crate::relay::server; use crate::{ - receiver::receiver, - sender::{sender, server::serf_file}, + receiver::client as receiver, + sender::{client as sender, server::serf_file}, }; use clap::{Parser, Subcommand}; -use log::debug; +use tracing::{debug, error, info}; #[derive(Parser, Debug)] #[command(version, about, long_about = None)] @@ -81,23 +81,50 @@ impl Args { debug!("args: {:#?}", self); match &self.command { Some(Commands::Send { relay, file }) => { - sender::send_info( + let _ = match sender::send_info( relay.as_deref().unwrap_or("http://0.0.0.0:1323"), file.as_deref().unwrap_or("test.txt"), ) - .await?; - serf_file(file.as_ref().unwrap()).await; + .await + { + Ok(name) => { + println!("Transfer name: {}", name); + serf_file(file.as_ref().unwrap()).await; + Ok(()) + } + Err(err) => Err(err), + }; } Some(Commands::Receive { relay, - overwrite: _, + overwrite, name, }) => { - receiver::download_info( + let response = receiver::download_info( relay.as_deref().unwrap_or("http://0.0.0.0:1323"), name.as_deref().unwrap_or("None"), ) - .await? + .await; + match response { + Ok(res) => { + debug!("The response is: {:#?}", res); + let reachable = receiver::ping_sender(&res.ip).await; + match reachable { + Ok(_) => match receiver::download_file(&res, overwrite).await { + Ok(_) => { + info!("Download complete"); + let _ = match receiver::signal_success(&res.ip).await { + Ok(_) => Ok(()), + Err(err) => Err(err), + }; + } + Err(err) => error!(err), + }, + Err(err) => error!("Error: {:#?}", err), + } + } + Err(err) => error!("Error: {:#?}", err), + } } Some(Commands::Serve { port, diff --git a/src/receiver/client.rs b/src/receiver/client.rs new file mode 100644 index 0000000..0f9315f --- /dev/null +++ b/src/receiver/client.rs @@ -0,0 +1,102 @@ +use crate::transfer_info::transfer_info::TransferInfoRequest; +use reqwest::Client; +use std::error::Error; +use std::fmt; +use std::fs::File; +use std::io::copy; +use tracing::{debug, error, info}; + +type Result = std::result::Result>; + +#[derive(Debug)] +struct TransferNotFoundError { + message: String, +} + +impl TransferNotFoundError { + fn new(msg: &str) -> Self { + Self { + message: msg.to_string(), + } + } +} + +impl fmt::Display for TransferNotFoundError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.message) + } +} + +impl Error for TransferNotFoundError { + fn description(&self) -> &str { + &self.message + } +} + +pub async fn download_info(relay: &str, filename: &str) -> Result { + match reqwest::get(format!("{}/download/{}", relay.to_string(), filename)).await { + Ok(resp) => { + let json = resp.json::().await?; + debug!("Json Response: {:#?}", json); + if json.message == "error".to_string() { + Err(Box::new(TransferNotFoundError::new( + "no transfer with given name found", + ))) + } else { + debug!("Got Positive response"); + Ok(json) + } + } + Err(err) => { + error!("Error: {err}"); + Err(Box::new(err)) + } + } +} + +pub async fn download_file(transfer_info: &TransferInfoRequest, overwrite: &bool) -> Result<()> { + if !*overwrite && File::open(&transfer_info.body.files).is_ok() { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::AlreadyExists, + format!("File '{} already exists", &transfer_info.body.files), + ))); + } + + let resp = reqwest::get(format!("http://{}:1300/download_file", &transfer_info.ip)).await?; + if !resp.status().is_success() { + return Err(Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to download file from {}", &transfer_info.ip), + ))); + } + let mut dest = File::create(&transfer_info.body.files)?; + let content = resp.text().await?; + copy(&mut content.as_bytes(), &mut dest)?; + info!("Download complete"); + Ok(()) +} + +pub async fn ping_sender(sender: &String) -> Result { + debug!("Pinging Sender on {:#?}", sender); + match reqwest::get(format!("http://{}:1300/ping", sender)).await { + Ok(resp) => { + debug!("Sender directly reachable"); + debug!("Response is: {:#?}", resp); + Ok(true) + } + Err(err) => { + error!("Error: {err}"); + Err(Box::new(err)) + } + } +} + +pub async fn signal_success(sender: &String) -> Result<()> { + debug!("Signaling shutdown to {:#?}", sender); + let client = Client::new(); + let _ = client + .post(format!("http://{}:1300/shutdown", sender)) + .send() + .await?; + Ok(()) +} diff --git a/src/receiver/mod.rs b/src/receiver/mod.rs index 4c0db7e..b9babe5 100644 --- a/src/receiver/mod.rs +++ b/src/receiver/mod.rs @@ -1 +1 @@ -pub mod receiver; +pub mod client; diff --git a/src/receiver/receiver.rs b/src/receiver/receiver.rs deleted file mode 100644 index 1551e79..0000000 --- a/src/receiver/receiver.rs +++ /dev/null @@ -1,16 +0,0 @@ -use crate::transfer_info::transfer_info::TransferInfoRequest; - -type Result = std::result::Result>; - -pub async fn download_info(relay: &str, filename: &str) -> Result<()> { - match reqwest::get(format!("{}/download/{}", relay.to_string(), filename)).await { - Ok(resp) => { - let json = resp.json::().await?; - println!("Json Response: {:#?}", json); - } - Err(err) => { - println!("Error: {err}"); - } - } - Ok(()) -} diff --git a/src/relay/server.rs b/src/relay/server.rs index cea4cb7..57260ea 100644 --- a/src/relay/server.rs +++ b/src/relay/server.rs @@ -84,7 +84,18 @@ async fn download_info( } None => { warn!("couldn't find transfer-name: {}", name); - (StatusCode::NOT_FOUND, Json(TransferInfoRequest::new())) + ( + StatusCode::NOT_FOUND, + Json(TransferInfoRequest { + ip: "".to_string(), + name: "".to_string(), + message: "error".to_string(), + body: TransferInfoBody { + keyword: "".to_string(), + files: "".to_string(), + }, + }), + ) } } } @@ -99,6 +110,7 @@ async fn upload_info( let t_request = TransferInfoRequest { ip: addr.ip().to_string(), name: generate_random_name(), + message: "created".to_string(), body: TransferInfoBody { keyword: payload.keyword, files: payload.files, diff --git a/src/sender/sender.rs b/src/sender/client.rs similarity index 50% rename from src/sender/sender.rs rename to src/sender/client.rs index 729bb13..a94f980 100644 --- a/src/sender/sender.rs +++ b/src/sender/client.rs @@ -2,10 +2,37 @@ use crate::transfer_info::transfer_info::TransferInfoRequest; use log::{debug, error}; use reqwest::{Client, StatusCode}; use std::collections::HashMap; +use std::error::Error; +use std::fmt; type Result = std::result::Result>; -pub async fn send_info(relay: &str, file: &str) -> Result<()> { +#[derive(Debug)] +struct TransferNotCreatedError { + message: String, +} + +impl TransferNotCreatedError { + fn new(msg: &str) -> Self { + Self { + message: msg.to_string(), + } + } +} + +impl fmt::Display for TransferNotCreatedError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.message) + } +} + +impl Error for TransferNotCreatedError { + fn description(&self) -> &str { + &self.message + } +} + +pub async fn send_info(relay: &str, file: &str) -> Result { debug!("Send Request to: {:?}", relay.to_string()); let mut map = HashMap::new(); map.insert("keyword", "test"); @@ -20,10 +47,10 @@ pub async fn send_info(relay: &str, file: &str) -> Result<()> { if res.status() == StatusCode::CREATED { let transfer_info: TransferInfoRequest = res.json().await?; debug!("Json Response: {:#?}", transfer_info); - println!("Transfer name: {}", transfer_info.name); + Ok(transfer_info.name) } else { - error!("Error reading response"); + Err(Box::new(TransferNotCreatedError::new( + "Transfer could not be created.", + ))) } - - Ok(()) } diff --git a/src/sender/mod.rs b/src/sender/mod.rs index 587a0dc..c07f47e 100644 --- a/src/sender/mod.rs +++ b/src/sender/mod.rs @@ -1,2 +1,2 @@ -pub mod sender; +pub mod client; pub mod server; diff --git a/src/sender/server.rs b/src/sender/server.rs index b97f3cf..acde90b 100644 --- a/src/sender/server.rs +++ b/src/sender/server.rs @@ -1,35 +1,32 @@ use crate::transfer_info::transfer_info::{TransferInfoBody, TransferInfoRequest}; use axum::{ - extract::{connect_info::ConnectInfo, Json, Path, State}, - http::{self, StatusCode}, + extract::Json, + http::StatusCode, response::IntoResponse, routing::{get, post}, Router, }; -use std::{env, net::SocketAddr}; +use lazy_static::lazy_static; +use serde_json::json; +use std::{net::SocketAddr, sync::Arc, time::Duration}; +use tokio::sync::Mutex; use tower_http::services::ServeFile; use tracing::debug; +lazy_static! { + static ref SHUTDOWN_SIGNAL: Arc> = Arc::new(Mutex::new(false)); +} pub async fn serf_file(path: &String) { debug!("Sender starting..."); - let app_environemt = env::var("APP_ENVIRONMENT").unwrap_or("development".to_string()); let app_host = "0.0.0.0".to_string(); let app_port = "1300".to_string(); debug!("Server configured to accept connections on host {app_host}..."); debug!("Server configured to listen connections on port {app_port}..."); - match app_environemt.as_str() { - "development" => { - debug!("Running in development mode"); - } - "production" => { - debug!("Running in production mode"); - } - _ => { - debug!("Running in development mode"); - } - } - let app = Router::new().route_service("/download_file", ServeFile::new(path)); + let app = Router::new() + .route_service("/download_file", ServeFile::new(path)) + .route("/ping", get(ping)) + .route("/shutdown", post(shutdown)); let listener = tokio::net::TcpListener::bind(format!("{}:{}", app_host, app_port).to_string()) .await .unwrap(); @@ -37,6 +34,25 @@ pub async fn serf_file(path: &String) { listener, app.into_make_service_with_connect_info::(), ) + .with_graceful_shutdown(async { + while !*SHUTDOWN_SIGNAL.lock().await { + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) .await .unwrap(); } + +async fn ping() -> impl IntoResponse { + let response = json!({ + "message": "pong" + }); + (StatusCode::OK, Json(response)) +} + +async fn shutdown() -> impl IntoResponse { + debug!("Initiating server shutdown..."); + *SHUTDOWN_SIGNAL.lock().await = true; + debug!("Server is shutting down..."); + (StatusCode::OK, "Server is shutting down...") +} diff --git a/src/transfer_info/transfer_info.rs b/src/transfer_info/transfer_info.rs index 304603c..c6e7aa5 100644 --- a/src/transfer_info/transfer_info.rs +++ b/src/transfer_info/transfer_info.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize}; pub struct TransferInfoRequest { pub ip: String, pub name: String, + pub message: String, pub body: TransferInfoBody, } @@ -18,6 +19,7 @@ impl TransferInfoRequest { Self { ip: "".to_string(), name: "".to_string(), + message: "".to_string(), body: TransferInfoBody { keyword: "".to_string(), files: "".to_string(),