sender: receiver: server: add basic direct file transfer

This commit is contained in:
Patryk Hegenberg 2024-04-21 15:13:55 +02:00
parent acec23a5d3
commit 08c4610043
9 changed files with 219 additions and 49 deletions

View file

@ -1,10 +1,10 @@
use crate::relay::server;
use crate::{
receiver::receiver,
sender::{sender, server::serf_file},
receiver::client as receiver,
sender::{client as sender, server::serf_file},
};
use clap::{Parser, Subcommand};
use log::debug;
use tracing::{debug, error, info};
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
@ -81,23 +81,50 @@ impl Args {
debug!("args: {:#?}", self);
match &self.command {
Some(Commands::Send { relay, file }) => {
sender::send_info(
let _ = match sender::send_info(
relay.as_deref().unwrap_or("http://0.0.0.0:1323"),
file.as_deref().unwrap_or("test.txt"),
)
.await?;
.await
{
Ok(name) => {
println!("Transfer name: {}", name);
serf_file(file.as_ref().unwrap()).await;
Ok(())
}
Err(err) => Err(err),
};
}
Some(Commands::Receive {
relay,
overwrite: _,
overwrite,
name,
}) => {
receiver::download_info(
let response = receiver::download_info(
relay.as_deref().unwrap_or("http://0.0.0.0:1323"),
name.as_deref().unwrap_or("None"),
)
.await?
.await;
match response {
Ok(res) => {
debug!("The response is: {:#?}", res);
let reachable = receiver::ping_sender(&res.ip).await;
match reachable {
Ok(_) => match receiver::download_file(&res, overwrite).await {
Ok(_) => {
info!("Download complete");
let _ = match receiver::signal_success(&res.ip).await {
Ok(_) => Ok(()),
Err(err) => Err(err),
};
}
Err(err) => error!(err),
},
Err(err) => error!("Error: {:#?}", err),
}
}
Err(err) => error!("Error: {:#?}", err),
}
}
Some(Commands::Serve {
port,

102
src/receiver/client.rs Normal file
View file

@ -0,0 +1,102 @@
use crate::transfer_info::transfer_info::TransferInfoRequest;
use reqwest::Client;
use std::error::Error;
use std::fmt;
use std::fs::File;
use std::io::copy;
use tracing::{debug, error, info};
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#[derive(Debug)]
struct TransferNotFoundError {
message: String,
}
impl TransferNotFoundError {
fn new(msg: &str) -> Self {
Self {
message: msg.to_string(),
}
}
}
impl fmt::Display for TransferNotFoundError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.message)
}
}
impl Error for TransferNotFoundError {
fn description(&self) -> &str {
&self.message
}
}
pub async fn download_info(relay: &str, filename: &str) -> Result<TransferInfoRequest> {
match reqwest::get(format!("{}/download/{}", relay.to_string(), filename)).await {
Ok(resp) => {
let json = resp.json::<TransferInfoRequest>().await?;
debug!("Json Response: {:#?}", json);
if json.message == "error".to_string() {
Err(Box::new(TransferNotFoundError::new(
"no transfer with given name found",
)))
} else {
debug!("Got Positive response");
Ok(json)
}
}
Err(err) => {
error!("Error: {err}");
Err(Box::new(err))
}
}
}
pub async fn download_file(transfer_info: &TransferInfoRequest, overwrite: &bool) -> Result<()> {
if !*overwrite && File::open(&transfer_info.body.files).is_ok() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::AlreadyExists,
format!("File '{} already exists", &transfer_info.body.files),
)));
}
let resp = reqwest::get(format!("http://{}:1300/download_file", &transfer_info.ip)).await?;
if !resp.status().is_success() {
return Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to download file from {}", &transfer_info.ip),
)));
}
let mut dest = File::create(&transfer_info.body.files)?;
let content = resp.text().await?;
copy(&mut content.as_bytes(), &mut dest)?;
info!("Download complete");
Ok(())
}
pub async fn ping_sender(sender: &String) -> Result<bool> {
debug!("Pinging Sender on {:#?}", sender);
match reqwest::get(format!("http://{}:1300/ping", sender)).await {
Ok(resp) => {
debug!("Sender directly reachable");
debug!("Response is: {:#?}", resp);
Ok(true)
}
Err(err) => {
error!("Error: {err}");
Err(Box::new(err))
}
}
}
pub async fn signal_success(sender: &String) -> Result<()> {
debug!("Signaling shutdown to {:#?}", sender);
let client = Client::new();
let _ = client
.post(format!("http://{}:1300/shutdown", sender))
.send()
.await?;
Ok(())
}

View file

@ -1 +1 @@
pub mod receiver;
pub mod client;

View file

@ -1,16 +0,0 @@
use crate::transfer_info::transfer_info::TransferInfoRequest;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
pub async fn download_info(relay: &str, filename: &str) -> Result<()> {
match reqwest::get(format!("{}/download/{}", relay.to_string(), filename)).await {
Ok(resp) => {
let json = resp.json::<TransferInfoRequest>().await?;
println!("Json Response: {:#?}", json);
}
Err(err) => {
println!("Error: {err}");
}
}
Ok(())
}

View file

@ -84,7 +84,18 @@ async fn download_info(
}
None => {
warn!("couldn't find transfer-name: {}", name);
(StatusCode::NOT_FOUND, Json(TransferInfoRequest::new()))
(
StatusCode::NOT_FOUND,
Json(TransferInfoRequest {
ip: "".to_string(),
name: "".to_string(),
message: "error".to_string(),
body: TransferInfoBody {
keyword: "".to_string(),
files: "".to_string(),
},
}),
)
}
}
}
@ -99,6 +110,7 @@ async fn upload_info(
let t_request = TransferInfoRequest {
ip: addr.ip().to_string(),
name: generate_random_name(),
message: "created".to_string(),
body: TransferInfoBody {
keyword: payload.keyword,
files: payload.files,

View file

@ -2,10 +2,37 @@ use crate::transfer_info::transfer_info::TransferInfoRequest;
use log::{debug, error};
use reqwest::{Client, StatusCode};
use std::collections::HashMap;
use std::error::Error;
use std::fmt;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
pub async fn send_info(relay: &str, file: &str) -> Result<()> {
#[derive(Debug)]
struct TransferNotCreatedError {
message: String,
}
impl TransferNotCreatedError {
fn new(msg: &str) -> Self {
Self {
message: msg.to_string(),
}
}
}
impl fmt::Display for TransferNotCreatedError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.message)
}
}
impl Error for TransferNotCreatedError {
fn description(&self) -> &str {
&self.message
}
}
pub async fn send_info(relay: &str, file: &str) -> Result<String> {
debug!("Send Request to: {:?}", relay.to_string());
let mut map = HashMap::new();
map.insert("keyword", "test");
@ -20,10 +47,10 @@ pub async fn send_info(relay: &str, file: &str) -> Result<()> {
if res.status() == StatusCode::CREATED {
let transfer_info: TransferInfoRequest = res.json().await?;
debug!("Json Response: {:#?}", transfer_info);
println!("Transfer name: {}", transfer_info.name);
Ok(transfer_info.name)
} else {
error!("Error reading response");
Err(Box::new(TransferNotCreatedError::new(
"Transfer could not be created.",
)))
}
Ok(())
}

View file

@ -1,2 +1,2 @@
pub mod sender;
pub mod client;
pub mod server;

View file

@ -1,35 +1,32 @@
use crate::transfer_info::transfer_info::{TransferInfoBody, TransferInfoRequest};
use axum::{
extract::{connect_info::ConnectInfo, Json, Path, State},
http::{self, StatusCode},
extract::Json,
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Router,
};
use std::{env, net::SocketAddr};
use lazy_static::lazy_static;
use serde_json::json;
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tower_http::services::ServeFile;
use tracing::debug;
lazy_static! {
static ref SHUTDOWN_SIGNAL: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
}
pub async fn serf_file(path: &String) {
debug!("Sender starting...");
let app_environemt = env::var("APP_ENVIRONMENT").unwrap_or("development".to_string());
let app_host = "0.0.0.0".to_string();
let app_port = "1300".to_string();
debug!("Server configured to accept connections on host {app_host}...");
debug!("Server configured to listen connections on port {app_port}...");
match app_environemt.as_str() {
"development" => {
debug!("Running in development mode");
}
"production" => {
debug!("Running in production mode");
}
_ => {
debug!("Running in development mode");
}
}
let app = Router::new().route_service("/download_file", ServeFile::new(path));
let app = Router::new()
.route_service("/download_file", ServeFile::new(path))
.route("/ping", get(ping))
.route("/shutdown", post(shutdown));
let listener = tokio::net::TcpListener::bind(format!("{}:{}", app_host, app_port).to_string())
.await
.unwrap();
@ -37,6 +34,25 @@ pub async fn serf_file(path: &String) {
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(async {
while !*SHUTDOWN_SIGNAL.lock().await {
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
.await
.unwrap();
}
async fn ping() -> impl IntoResponse {
let response = json!({
"message": "pong"
});
(StatusCode::OK, Json(response))
}
async fn shutdown() -> impl IntoResponse {
debug!("Initiating server shutdown...");
*SHUTDOWN_SIGNAL.lock().await = true;
debug!("Server is shutting down...");
(StatusCode::OK, "Server is shutting down...")
}

View file

@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
pub struct TransferInfoRequest {
pub ip: String,
pub name: String,
pub message: String,
pub body: TransferInfoBody,
}
@ -18,6 +19,7 @@ impl TransferInfoRequest {
Self {
ip: "".to_string(),
name: "".to_string(),
message: "".to_string(),
body: TransferInfoBody {
keyword: "".to_string(),
files: "".to_string(),