feature(sender): enabled start of multiple threads for local ws server and local and relay sender connection

actually the local connection isnt working properly at the moment.
This commit is contained in:
Patryk Hegenberg 2024-05-01 14:42:10 +02:00
parent 336ea18ad9
commit 300f688111
2 changed files with 88 additions and 17 deletions

View file

@ -2,7 +2,7 @@ use crate::receiver;
use crate::relay; use crate::relay;
use crate::sender; use crate::sender;
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use std::env; use std::{env, sync::Arc};
use tracing::debug; use tracing::debug;
/// This struct defines the CLI arguments and subcommands for the caesar command line application. /// This struct defines the CLI arguments and subcommands for the caesar command line application.
@ -147,15 +147,16 @@ impl Args {
// `sender` module with the list of files that the user wants to // `sender` module with the list of files that the user wants to
// send. // send.
Some(Commands::Send { relay, files }) => { Some(Commands::Send { relay, files }) => {
sender::start_sender( let relay_string: String = relay
relay.as_deref().unwrap_or( .as_deref()
env::var("APP_ORIGIN") .unwrap_or(
.unwrap_or("wss://caesar-transfer-iu.shuttleapp.rs/ws".to_string()) &env::var("APP_ORIGIN")
.as_str(), .unwrap_or("wss://caesar-transfer-iu.shuttleapp.rs/ws".to_string()),
), )
files, .to_string();
) let relay_arc = Arc::new(relay_string);
.await; let files_arc = Arc::new(files.to_vec());
sender::start_sender(relay_arc, files_arc).await;
} }
// If the user wants to receive files, call `start_receiver()` in the // If the user wants to receive files, call `start_receiver()` in the
// `receiver` module with the name of the transfer that the user // `receiver` module with the name of the transfer that the user

View file

@ -42,31 +42,101 @@ pub mod client;
pub mod http_client; pub mod http_client;
pub mod util; pub mod util;
use crate::sender::client as sender; use std::{net::SocketAddr, sync::Arc};
use crate::{
relay::{appstate::AppState, server::ws_handler},
sender::client as sender,
};
use axum::{routing::get, Router};
use tokio::{net::TcpListener, task};
use tokio_tungstenite::{ use tokio_tungstenite::{
connect_async, connect_async,
tungstenite::{client::IntoClientRequest, http::HeaderValue}, tungstenite::{client::IntoClientRequest, http::HeaderValue},
}; };
use tracing::{debug, error}; use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use tracing::{debug, error, info};
use uuid::Uuid; use uuid::Uuid;
pub async fn start_sender(relay: &str, files: &[String]) { pub async fn start_sender(relay: Arc<String>, files: Arc<Vec<String>>) {
debug!("Got relay: {relay}"); debug!("Got relay: {relay}");
let url = String::from("ws://") + relay + "/ws"; let room_id = Uuid::new_v4().to_string();
let local_room_id = room_id.clone();
let local_files = files.clone();
let local_ws_thread = task::spawn(async move {
start_local_ws().await;
});
let relay_thread =
task::spawn(
async move { connect_to_relay(relay.clone(), files.clone(), Some(room_id)).await },
);
let local_thread = task::spawn(async move {
connect_to_relay(
Arc::new(String::from("0.0.0.0:9000")),
local_files.clone(),
Some(local_room_id),
)
.await
});
local_ws_thread.await.unwrap();
relay_thread.await.unwrap();
local_thread.await.unwrap();
}
pub async fn start_local_ws() {
let app_host = "0.0.0.0";
let app_port = "9000";
// Create a new server data structure.
let server = AppState::new();
// Set up the application routes.
let app = Router::new()
.route("/ws", get(ws_handler))
.with_state(server)
.layer(
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::default().include_headers(true)),
);
if let Ok(listener) = TcpListener::bind(&format!("{}:{}", app_host, app_port)).await {
info!(
"Local Websocket listening on: {}",
listener.local_addr().unwrap()
);
// Run the server.
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.await
.unwrap();
} else {
// Log binding failure and exit.
error!("Failed to listen on: {}:{}", app_host, app_port);
}
}
async fn connect_to_relay(relay: Arc<String>, files: Arc<Vec<String>>, room_id: Option<String>) {
let url = format!("ws://{}/ws", relay);
match url.clone().into_client_request() { match url.clone().into_client_request() {
Ok(mut request) => { Ok(mut request) => {
request request
.headers_mut() .headers_mut()
.insert("Origin", HeaderValue::from_str(relay).unwrap()); .insert("Origin", HeaderValue::from_str(relay.as_ref()).unwrap());
debug!("Attempting to connect to {url}..."); debug!("Attempting to connect to {url}...");
let room_id = Uuid::new_v4().to_string(); let room_id = match room_id {
Some(id) => id,
None => Uuid::new_v4().to_string(),
};
match connect_async(request).await { match connect_async(request).await {
Ok((socket, _)) => { Ok((socket, _)) => {
let paths = files.to_vec(); let paths = files.to_vec();
sender::start(socket, paths, Some(room_id), relay.to_string()).await; sender::start(socket, paths, Some(room_id), relay.to_string()).await;
// sender::start(socket, paths).await;
} }
Err(e) => { Err(e) => {
error!("Error: Failed to connect with error: {e}"); error!("Error: Failed to connect with error: {e}");