1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
pub mod client;
pub mod http_client;
pub mod util;
use std::{net::SocketAddr, sync::Arc};
use crate::{
relay::{appstate::AppState, server::ws_handler},
sender::client as sender,
};
use axum::{routing::get, Router};
use tokio::{net::TcpListener, sync::mpsc, task};
use tokio_tungstenite::{
connect_async,
tungstenite::{client::IntoClientRequest, http::HeaderValue},
};
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use tracing::{debug, error, info};
use uuid::Uuid;
/// Start the sender process.
///
/// This function starts the sender process which connects to a relay and
/// initiates file transfers. The sender also starts a local WebSocket server.
///
/// # Arguments
///
/// * `name` - The name of the sender.
/// * `relay` - The relay to connect to.
/// * `files` - The files to transfer.
pub async fn start_sender(name: String, relay: Arc<String>, files: Arc<Vec<String>>) {
// Log the name of the sender
debug!("Got name: {:?}", name);
// Create a channel for communication between threads
let (tx, mut rx) = mpsc::channel(1);
// Generate a unique room ID
let room_id = Uuid::new_v4().to_string();
let local_room_id = room_id.clone();
let local_files = files.clone();
let local_relay = relay.clone();
let local_rand_name = name.clone();
let local_tx = tx.clone();
// Start a local WebSocket server
let local_ws_thread = task::spawn(async move {
start_local_ws().await;
});
// Connect to the relay
let relay_thread = task::spawn(async move {
connect_to_server(
relay.clone(),
files.clone(),
Some(room_id),
relay.clone(),
Arc::new(name.clone()),
tx.clone(),
false,
)
.await
});
// Connect to the local WebSocket server
let local_thread = task::spawn(async move {
connect_to_server(
Arc::new(String::from("ws://0.0.0.0:9000")),
local_files.clone(),
Some(local_room_id),
local_relay.clone(),
Arc::new(local_rand_name.clone()),
local_tx.clone(),
true,
)
.await
});
// Wait for the sender threads to finish
rx.recv().await.unwrap();
// Abort the local WebSocket server thread
local_ws_thread.abort();
// Abort the relay thread
relay_thread.abort();
// Abort the local thread
local_thread.abort();
}
/// Start a local WebSocket server.
///
/// This function initializes and runs a WebSocket server on the specified host and port.
/// It creates an instance of the `AppState` struct and uses it as the state for the router.
/// The `ws_handler` function is registered as the handler for the "/ws" route.
///
/// # Arguments
///
/// None
///
/// # Returns
///
/// This function does not return anything.
pub async fn start_local_ws() {
// The host and port the server will listen on.
let app_host = "0.0.0.0";
let app_port = "9000";
// Create an instance of the application state.
let server = AppState::new();
// Create the axum application.
// The `ws_handler` function is registered as the handler for the "/ws" route.
// The `AppState` instance is used as the state for the router.
let app = Router::new()
.route("/ws", get(ws_handler))
.with_state(server)
.layer(
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::default().include_headers(true)),
);
// Try to bind the server to the specified host and port.
if let Ok(listener) = TcpListener::bind(&format!("{}:{}", app_host, app_port)).await {
// Log the address the server is listening on.
info!(
"Local WebSocket listening on: {}",
listener.local_addr().unwrap()
);
// Serve the application using the listener.
// The `connect_info` parameter is used to include the client's socket address in the tracing spans.
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.await
.unwrap();
} else {
// Log an error if the server fails to bind.
error!("Failed to listen on: {}:{}", app_host, app_port);
}
}
/// Connects to the specified server and starts the file transfer.
///
/// # Arguments
///
/// * `relay` - The relay server URL.
/// * `files` - The files to be transferred.
/// * `room_id` - The room ID for the transfer. If `None`, a random UUID is generated.
/// * `message_server` - The message server URL.
/// * `transfer_name` - The name of the transfer.
/// * `tx` - The sender end of a channel to signal the completion of the transfer.
/// * `is_local` - Whether the transfer is local or not.
async fn connect_to_server(
relay: Arc<String>,
files: Arc<Vec<String>>,
room_id: Option<String>,
message_server: Arc<String>,
transfer_name: Arc<String>,
tx: mpsc::Sender<()>,
is_local: bool,
) {
// Construct the server URL.
let url = format!("{}/ws", relay);
// Construct the message server URL.
let message_relay = format!("{}", message_server);
// Construct the transfer name.
let transfer_name = format!("{}", transfer_name);
// Create a request to the server.
match url.clone().into_client_request() {
Ok(mut request) => {
// Set the "Origin" header.
request
.headers_mut()
.insert("Origin", HeaderValue::from_str(relay.as_ref()).unwrap());
// Log the connection attempt.
debug!("Attempting to connect to {url}...");
// Generate a room ID if not provided.
let room_id = match room_id {
Some(id) => id,
None => Uuid::new_v4().to_string(),
};
// Connect to the server and start the file transfer.
match connect_async(request).await {
Ok((socket, _)) => {
let paths = files.to_vec();
sender::start(
socket,
paths,
Some(room_id),
message_relay.to_string(),
transfer_name.clone(),
is_local,
)
.await;
// Signal the completion of the transfer.
tx.send(()).await.unwrap();
}
Err(e) => {
// Log the connection error.
error!("Error: Failed to connect with error: {e}");
}
}
}
Err(e) => {
// Log the request creation error.
error!("Error: failed to create request with reason: {e:?}");
}
}
}