feature(sender,receiver): enabled local and relay transfer

actually a clean close of the sender application is still missing and
has to be included
This commit is contained in:
Patryk Hegenberg 2024-05-01 17:46:04 +02:00
parent 300f688111
commit 2262fd9f75
9 changed files with 144 additions and 61 deletions

View file

@ -3,6 +3,7 @@ use crate::relay;
use crate::sender;
use clap::{Parser, Subcommand};
use std::{env, sync::Arc};
use tokio::sync::mpsc;
use tracing::debug;
/// This struct defines the CLI arguments and subcommands for the caesar command line application.

View file

@ -3,18 +3,18 @@ use reqwest;
use sha2::{Digest, Sha256};
use tracing::error;
use crate::relay::transfer::Transfer;
use crate::relay::transfer::TransferResponse;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
pub async fn download_info(relay: &str, name: &str) -> Result<String> {
pub async fn download_info(relay: &str, name: &str) -> Result<TransferResponse> {
let url = String::from("http://") + relay;
let hashed_name = Sha256::digest(name.as_bytes());
let hashed_string = hex::encode(hashed_name);
match reqwest::get(format!("{}/download/{}", url, hashed_string)).await {
Ok(resp) => match resp.json::<Transfer>().await {
Ok(res) => Ok(res.room_id),
Ok(resp) => match resp.json::<TransferResponse>().await {
Ok(res) => Ok(res),
Err(e) => Err(Box::new(e)),
},
Err(err) => {

View file

@ -47,17 +47,29 @@ use tokio_tungstenite::{
use tracing::{debug, error};
pub async fn start_receiver(relay: &str, name: &str) {
let room_id = http_client::download_info(relay, name).await.unwrap();
debug!("Got Room_ID from Server: {room_id}");
let res = http_client::download_info(relay, name).await.unwrap();
debug!("Got room_id from Server: {:?}", res);
let res_ip = res.ip + ":9000";
start_ws_com(relay, room_id.as_str()).await;
if let Err(e) = start_ws_com(res_ip.as_str(), res.room_id[0].as_str()).await {
debug!("Failed to connect local with first room_id: {e}");
if let Err(e) = start_ws_com(res_ip.as_str(), res.room_id[1].as_str()).await {
debug!("Failed to connect local with second room_id: {e}");
if let Err(e) = start_ws_com(relay, res.room_id[0].as_str()).await {
debug!("Failed to connect remote with first room_id: {e}");
if let Err(e) = start_ws_com(relay, res.room_id[1].as_str()).await {
error!("Failed to accomplish transfer with error: {e}");
}
}
}
}
}
pub async fn start_ws_com(relay: &str, name: &str) {
pub async fn start_ws_com(relay: &str, name: &str) -> Result<(), Box<dyn std::error::Error>> {
let url = String::from("ws://") + relay + "/ws";
let Ok(mut request) = url.into_client_request() else {
println!("Error: Failed to create request.");
return;
return Err("Failed to create request".into());
};
// Insert the origin into the request headers to prevent
@ -68,11 +80,6 @@ pub async fn start_ws_com(relay: &str, name: &str) {
println!("Attempting to connect...");
// let Ok((socket, _)) = connect_async(request).await else {
// error!("Error: Failed to connect.");
// return;
// };
match connect_async(request).await {
Ok((socket, _)) => receiver::start(socket, name).await,
Err(e) => error!("Error: Failed to connect: {e:?}"),
@ -81,4 +88,5 @@ pub async fn start_ws_com(relay: &str, name: &str) {
// receiver::client module and is the function that interacts with
// the server to receive files.
// receiver::start(socket, name).await
Ok(())
}

View file

@ -2,7 +2,7 @@ use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use crate::relay::room::Room;
use crate::relay::transfer::Transfer;
use crate::relay::transfer::TransferResponse;
/// A struct that holds all of the rooms that the server knows about.
///
@ -12,7 +12,7 @@ use crate::relay::transfer::Transfer;
#[derive(Debug, Clone)]
pub struct AppState {
pub rooms: HashMap<String, Room>,
pub transfers: Vec<Transfer>,
pub transfers: Vec<TransferResponse>,
}
impl AppState {

View file

@ -19,7 +19,7 @@ use axum::{
extract::{ws::WebSocket, Json, Path, State, WebSocketUpgrade},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
routing::{get, post, put},
Router,
};
@ -34,9 +34,9 @@ use tokio::{
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use tracing::{debug, error, info, warn};
use crate::relay::appstate::AppState;
use crate::relay::client::Client;
use crate::relay::transfer::Transfer;
use crate::relay::transfer::TransferResponse;
use crate::relay::{appstate::AppState, transfer::TransferRequest};
/// This function starts the WebSocket server.
///
@ -98,7 +98,7 @@ pub async fn start_ws(port: Option<&i32>, listen_addr: Option<&String>) {
// Set up the application routes.
let app = Router::new()
.route("/ws", get(ws_handler))
.route("/upload", post(upload_info))
.route("/upload", put(upload_info))
.route("/download/:name", get(download_info))
.route("/download_success/:name", post(download_success))
.with_state(server)
@ -282,21 +282,35 @@ async fn shutdown_signal() {
pub async fn upload_info(
State(shared_state): State<Arc<RwLock<AppState>>>,
// ConnectInfo(addr): ConnectInfo<SocketAddr>,
Json(payload): Json<Transfer>,
Json(payload): Json<TransferRequest>,
) -> impl IntoResponse {
// debug!("Got upload request from {}", addr.ip().to_string());
let mut data = shared_state.write().await;
let t_request = Transfer {
name: payload.name,
ip: payload.ip,
room_id: payload.room_id,
};
data.transfers.push(t_request.clone());
match data
.transfers
.iter_mut()
.find(|request| request.name == payload.name)
{
Some(request) => {
request.room_id.push(payload.room_id);
debug!("Found Transfer and updated");
(StatusCode::OK, Json(request.clone()))
}
None => {
let mut t_request = TransferResponse {
name: payload.name,
ip: payload.ip,
room_id: Vec::new(),
};
t_request.room_id.push(payload.room_id);
data.transfers.push(t_request.clone());
debug!("New TransferRequest created");
debug!("Actual AppState is {:#?}", *data);
debug!("New TransferRequest created");
debug!("Actual AppState is {:#?}", *data);
(StatusCode::CREATED, Json(t_request))
(StatusCode::CREATED, Json(t_request))
}
}
}
pub async fn download_info(
@ -313,10 +327,10 @@ pub async fn download_info(
warn!("couldn't find transfer-name: {}", name);
(
StatusCode::NOT_FOUND,
Json(Transfer {
Json(TransferResponse {
name: String::from(""),
ip: String::from(""),
room_id: String::from(""),
room_id: vec![String::from("")],
}),
)
}

View file

@ -1,33 +1,45 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Transfer {
pub struct TransferRequest {
pub name: String,
pub ip: String,
pub room_id: String,
}
impl Transfer {
impl TransferRequest {
pub fn new(name: String, ip: String, room_id: String) -> Self {
Self { name, ip, room_id }
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct TransferResponse {
pub name: String,
pub ip: String,
pub room_id: Vec<String>,
}
impl TransferResponse {
pub fn new(name: String, ip: String, room_id: Vec<String>) -> Self {
Self { name, ip, room_id }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new() {
let transfer = Transfer {
let transfer = TransferResponse {
name: "Test".to_string(),
ip: "127.0.0.1".to_string(),
room_id: "This_is_a_test_room_id".to_string(),
room_id: vec!["This_is_a_test_room_id".to_string()],
};
assert_eq!(
Transfer::new(
TransferResponse::new(
"Test".to_string(),
"127.0.0.1".to_string(),
"This_is_a_test_room_id".to_string(),
vec!["This_is_a_test_room_id".to_string()],
),
transfer
)

View file

@ -110,6 +110,13 @@ struct Context {
task: Option<JoinHandle<()>>,
}
impl Context {
async fn clean_up(&mut self) {
if let Some(task) = &self.task {
task.abort();
}
}
}
/// This function is called when the client receives a create room packet
/// from the server. The function is responsible for printing a URL to the
/// console that the user can use to join the room.
@ -119,12 +126,13 @@ struct Context {
/// appended to the room id to create a URL. The URL is then printed to the
/// console using the qr2term library. Finally, the function prints a
/// message to the console with the URL.
fn on_create_room(context: &Context, id: String, relay: String) -> Status {
fn on_create_room(context: &Context, id: String, relay: String, transfer_name: String) -> Status {
debug!("Creating room on: {relay}");
let base64 = general_purpose::STANDARD.encode(&context.hmac);
let url = format!("{}-{}", id, base64);
let rand_name = generate_random_name();
let hash_name = hash_random_name(rand_name.clone());
// let rand_name = generate_random_name();
let hash_name = hash_random_name(transfer_name.clone());
let send_url = url.to_string();
let h_name = hash_name.to_string();
@ -149,7 +157,7 @@ fn on_create_room(context: &Context, id: String, relay: String) -> Status {
// error!("Failed to generate QR code: {}", error);
// }
if let Err(error) = qr2term::print_qr(&rand_name) {
if let Err(error) = qr2term::print_qr(&transfer_name) {
error!("Failed to generate QR code: {}", error);
}
// Print a newline to the console to separate the output from the command
@ -158,7 +166,7 @@ fn on_create_room(context: &Context, id: String, relay: String) -> Status {
// Print a message to the console with the URL.
println!("Created room: {}", url);
println!("Transfername is: {}", rand_name);
println!("Transfername is: {}", transfer_name);
// Continue the event loop.
Status::Continue()
@ -549,13 +557,18 @@ fn on_handshake(context: &mut Context, handshake_response: HandshakeResponsePack
/// struct is then matched on to call the appropriate function.
///
/// If the message is invalid, an error is returned.
fn on_message(context: &mut Context, message: WebSocketMessage, relay: String) -> Status {
fn on_message(
context: &mut Context,
message: WebSocketMessage,
relay: String,
transfer_name: String,
) -> Status {
if message.is_text() {
let text = message.into_text().unwrap();
let packet = serde_json::from_str(&text).unwrap();
return match packet {
JsonPacketResponse::Create { id } => on_create_room(context, id, relay),
JsonPacketResponse::Create { id } => on_create_room(context, id, relay, transfer_name),
JsonPacketResponse::Join { size } => on_join_room(context, size),
JsonPacketResponse::Leave { index } => on_leave_room(context, index),
JsonPacketResponse::Error { message } => on_error(message),
@ -602,7 +615,13 @@ fn on_message(context: &mut Context, message: WebSocketMessage, relay: String) -
/// When the function is finished, it will exit and the transfer will be complete. If there
/// is an error during the transfer, the function will print an error message to stdout and
/// exit.
pub async fn start(socket: Socket, paths: Vec<String>, room_id: Option<String>, relay: String) {
pub async fn start(
socket: Socket,
paths: Vec<String>,
room_id: Option<String>,
relay: String,
transfer_name: String,
) {
// Create a vector to store metadata about each file that will be sent.
let mut files = vec![];
@ -696,7 +715,7 @@ pub async fn start(socket: Socket, paths: Vec<String>, room_id: Option<String>,
// client.
let incoming_handler = incoming.try_for_each(|message| {
// Call the `on_message` function to handle the incoming message.
match on_message(&mut context, message, relay.clone()) {
match on_message(&mut context, message, relay.clone(), transfer_name.clone()) {
// If the status is `Status::Exit`, the transfer is complete. Print a message to
// stdout and exit the function.
Status::Exit() => {
@ -819,7 +838,8 @@ mod tests {
&context,
"b531e87d-e51a-4507-94f4-335cbe2d32f3-Nc5skZReq7qJN7INwckyAZLWEEbxsrFfH/692tUNgkM="
.to_string(),
String::from("0.0.0.0:8000")
String::from("0.0.0.0:8000"),
String::from("Test")
),
Status::Continue()
);
@ -905,18 +925,20 @@ mod tests {
on_message(
&mut context,
WebSocketMessage::Text(r#"{"type":"leave","index":5}"#.to_string()),
String::from("0.0.0.0:8000")
String::from("0.0.0.0:8000"),
String::from("Test")
),
Status::Continue()
);
assert_eq!(on_message(&mut context, WebSocketMessage::Text(r#"{"type":"create","id":"b531e87d-e51a-4507-94f4-335cbe2d32f3-Nc5skZReq7qJN7INwckyAZLWEEbxsrFfH/692tUNgkM="}"#.to_string()), String::from("0.0.0.0:8000")), Status::Continue());
assert_eq!(on_message(&mut context, WebSocketMessage::Text(r#"{"type":"create","id":"b531e87d-e51a-4507-94f4-335cbe2d32f3-Nc5skZReq7qJN7INwckyAZLWEEbxsrFfH/692tUNgkM="}"#.to_string()), String::from("0.0.0.0:8000"), String::from("Test")), Status::Continue());
assert_eq!(
on_message(
&mut context,
WebSocketMessage::Text(
r#"{"type":"error","message":"Error Message: Test"}"#.to_string()
),
String::from("0.0.0.0:8000")
String::from("0.0.0.0:8000"),
String::from("Test")
),
Status::Err("Error Message: Test".to_string())
);

View file

@ -32,7 +32,7 @@ pub async fn send_info(relay: &str, name: &str, room_id: &str) -> Result<String>
let result: Result<String> = task::spawn_blocking(move || {
let client = Client::new();
client
.post(format!("{}/upload", url))
.put(format!("{}/upload", url))
.json(&map)
.send()?
.text()?

View file

@ -46,7 +46,7 @@ use std::{net::SocketAddr, sync::Arc};
use crate::{
relay::{appstate::AppState, server::ws_handler},
sender::client as sender,
sender::{client as sender, util::generate_random_name},
};
use axum::{routing::get, Router};
use tokio::{net::TcpListener, task};
@ -61,20 +61,31 @@ use uuid::Uuid;
pub async fn start_sender(relay: Arc<String>, files: Arc<Vec<String>>) {
debug!("Got relay: {relay}");
let room_id = Uuid::new_v4().to_string();
let rand_name = generate_random_name();
let local_room_id = room_id.clone();
let local_files = files.clone();
let local_relay = relay.clone();
let local_rand_name = rand_name.clone();
let local_ws_thread = task::spawn(async move {
start_local_ws().await;
});
let relay_thread =
task::spawn(
async move { connect_to_relay(relay.clone(), files.clone(), Some(room_id)).await },
);
let relay_thread = task::spawn(async move {
connect_to_server(
relay.clone(),
files.clone(),
Some(room_id),
relay.clone(),
Arc::new(rand_name.clone()),
)
.await
});
let local_thread = task::spawn(async move {
connect_to_relay(
connect_to_server(
Arc::new(String::from("0.0.0.0:9000")),
local_files.clone(),
Some(local_room_id),
local_relay.clone(),
Arc::new(local_rand_name.clone()),
)
.await
});
@ -119,8 +130,16 @@ pub async fn start_local_ws() {
}
}
async fn connect_to_relay(relay: Arc<String>, files: Arc<Vec<String>>, room_id: Option<String>) {
async fn connect_to_server(
relay: Arc<String>,
files: Arc<Vec<String>>,
room_id: Option<String>,
message_server: Arc<String>,
transfer_name: Arc<String>,
) {
let url = format!("ws://{}/ws", relay);
let message_relay = format!("{}", message_server);
let transfer_name = format!("{}", transfer_name);
match url.clone().into_client_request() {
Ok(mut request) => {
request
@ -136,7 +155,14 @@ async fn connect_to_relay(relay: Arc<String>, files: Arc<Vec<String>>, room_id:
match connect_async(request).await {
Ok((socket, _)) => {
let paths = files.to_vec();
sender::start(socket, paths, Some(room_id), relay.to_string()).await;
sender::start(
socket,
paths,
Some(room_id),
message_relay.to_string(),
transfer_name.clone(),
)
.await;
}
Err(e) => {
error!("Error: Failed to connect with error: {e}");