From 2e29825a3d468b10128615935cbb0821bd4e9496 Mon Sep 17 00:00:00 2001 From: xenofem Date: Sat, 9 Sep 2023 17:03:35 -0400 Subject: [PATCH 1/2] no more websocket ack messages, just use lower-level backpressure --- Cargo.lock | 2 +- Cargo.toml | 2 +- cli/transbeam-cli | 4 -- src/upload.rs | 15 ++---- static/js/upload.js | 128 +++++++++++++++++++++++--------------------- 5 files changed, 74 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9e6ead0..f87259c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1778,7 +1778,7 @@ dependencies = [ [[package]] name = "transbeam" -version = "0.3.0" +version = "0.4.0" dependencies = [ "actix", "actix-files", diff --git a/Cargo.toml b/Cargo.toml index 58d0229..ee3496f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "transbeam" -version = "0.3.0" +version = "0.4.0" authors = ["xenofem "] edition = "2021" license = "MIT" diff --git a/cli/transbeam-cli b/cli/transbeam-cli index 7299398..331a725 100755 --- a/cli/transbeam-cli +++ b/cli/transbeam-cli @@ -51,10 +51,6 @@ async def send(paths, host, password, lifetime, collection_name=None): loader = file_loader([(paths[i], fileMetadata[i]["size"]) for i in range(len(paths))]) for data in loader: await ws.send(data) - resp = await ws.recv() - if resp != "ack": - tqdm.write("unexpected response: {}".format(resp)) - exit(1) parser = argparse.ArgumentParser(description="Upload files to transbeam") parser.add_argument("-l", "--lifetime", type=int, default=7, help="Lifetime in days for files (default 7)") diff --git a/src/upload.rs b/src/upload.rs index 94f80b8..4f59490 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -211,10 +211,6 @@ impl StreamHandler> for Uploader { } } -fn ack(ctx: &mut Context) { - ctx.text("ack"); -} - impl Uploader { fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut Context) { error!("{}", e); @@ -350,13 +346,10 @@ impl Uploader { }), ); } - ws::Message::Binary(data) | ws::Message::Continuation(Item::Last(data)) => { - let result = self.handle_data(data)?; - ack(ctx); - return Ok(result); - } - ws::Message::Continuation(Item::FirstBinary(data)) - | ws::Message::Continuation(Item::Continue(data)) => { + ws::Message::Binary(data) + | ws::Message::Continuation(Item::FirstBinary(data)) + | ws::Message::Continuation(Item::Continue(data)) + | ws::Message::Continuation(Item::Last(data)) => { return self.handle_data(data); } ws::Message::Close(reason) => { diff --git a/static/js/upload.js b/static/js/upload.js index debf733..9ec6e79 100644 --- a/static/js/upload.js +++ b/static/js/upload.js @@ -2,6 +2,8 @@ const FILE_CHUNK_SIZE = 16384; const MAX_FILES = 256; const SAMPLE_WINDOW = 100; const STALL_THRESHOLD = 1000; +const MAX_WS_BUFFER = 1048576; +const WS_BUFFER_DELAY = 10; let files = []; @@ -204,47 +206,42 @@ function sendManifest() { } function handleMessage(msg) { - if (bytesSent === 0) { - let reply; - try { - reply = JSON.parse(msg.data); - } catch (error) { - socket.close(); - displayError('Received an invalid response from the server'); - console.error(error); - return; - } - if (reply.type === 'ready') { - downloadCode.textContent = reply.code; - updateProgress(); - document.body.className = 'uploading'; - sendData(); - return; - } + if (bytesSent > 0) { + console.warn('Received unexpected message from server during upload', msg.data); + return; + } - // we're going to display a more useful error message - socket.removeEventListener('close', handleClose); + let reply; + try { + reply = JSON.parse(msg.data); + } catch (error) { socket.close(); - if (reply.type === 'too_big') { - maxSize = reply.max_size; - updateFiles(); - } else if (reply.type === 'too_long') { - updateMaxLifetime(reply.max_lifetime); - displayError(`The maximum retention time for uploads is ${reply.max_lifetime} days`); - } else if (reply.type === 'incorrect_password') { - messageBox.textContent = ('Incorrect password'); - document.body.className = 'error landing'; - } else if (reply.type === 'error') { - displayError(reply.details); - } - } else { - if (msg.data === 'ack') { - sendData(); - } else { - console.error('Received unexpected message from server instead of ack', msg.data); - displayError(); - socket.close(); - } + displayError('Received an invalid response from the server'); + console.error(error); + return; + } + if (reply.type === 'ready') { + downloadCode.textContent = reply.code; + updateProgress(); + document.body.className = 'uploading'; + sendData(); + return; + } + + // we're going to display a more useful error message + socket.removeEventListener('close', handleClose); + socket.close(); + if (reply.type === 'too_big') { + maxSize = reply.max_size; + updateFiles(); + } else if (reply.type === 'too_long') { + updateMaxLifetime(reply.max_lifetime); + displayError(`The maximum retention time for uploads is ${reply.max_lifetime} days`); + } else if (reply.type === 'incorrect_password') { + messageBox.textContent = ('Incorrect password'); + document.body.className = 'error landing'; + } else if (reply.type === 'error') { + displayError(reply.details); } } @@ -262,32 +259,43 @@ function updateMaxLifetime(lifetime) { } function sendData() { - if (fileIndex >= files.length) { - finishSending(); + if (socket.readyState !== 1) { return; } - const currentFile = files[fileIndex]; - if (byteIndex < currentFile.size) { - const endpoint = Math.min(byteIndex+FILE_CHUNK_SIZE, currentFile.size); - const data = currentFile.slice(byteIndex, endpoint); - socket.send(data); - byteIndex = endpoint; - bytesSent += data.size; - // It's ok if the monotonically increasing fields like - // percentage are updating super quickly, but it's awkward for - // rate and ETA - const now = Date.now() / 1000; - if (timestamps.length === 0 || now - timestamps.at(-1)[0] > 1) { - timestamps.push([now, bytesSent]); - if (timestamps.length > SAMPLE_WINDOW) { timestamps.shift(); } + while (true) { + if (fileIndex >= files.length) { + finishSending(); + return; } - updateProgress(); - } else { - fileIndex += 1; - byteIndex = 0; - sendData(); + if (socket.bufferedAmount >= MAX_WS_BUFFER) { + setTimeout(sendData, WS_BUFFER_DELAY); + return; + } + + const currentFile = files[fileIndex]; + if (byteIndex < currentFile.size) { + const endpoint = Math.min(byteIndex+FILE_CHUNK_SIZE, currentFile.size); + const data = currentFile.slice(byteIndex, endpoint); + socket.send(data); + byteIndex = endpoint; + bytesSent += data.size; + + // It's ok if the monotonically increasing fields like + // percentage are updating super quickly, but it's awkward for + // rate and ETA + const now = Date.now() / 1000; + if (timestamps.length === 0 || now - timestamps.at(-1)[0] > 1) { + timestamps.push([now, bytesSent]); + if (timestamps.length > SAMPLE_WINDOW) { timestamps.shift(); } + } + + updateProgress(); + } else { + fileIndex += 1; + byteIndex = 0; + } } } From 8b001911f7777228b8839bc377a4796541cf7b44 Mon Sep 17 00:00:00 2001 From: xenofem Date: Sat, 9 Sep 2023 17:04:45 -0400 Subject: [PATCH 2/2] clippy and format --- src/main.rs | 58 ++++++++++++++++++++++++++++++++++++---------------- src/state.rs | 11 ++++++---- src/store.rs | 8 ++++++-- 3 files changed, 53 insertions(+), 24 deletions(-) diff --git a/src/main.rs b/src/main.rs index 99a4b11..21e6aa6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,13 +4,13 @@ mod store; mod upload; mod zip; -use std::{fmt::Debug, path::PathBuf, str::FromStr, ops::Deref}; +use std::{fmt::Debug, ops::Deref, path::PathBuf, str::FromStr}; use actix_http::StatusCode; -use actix_session::{SessionMiddleware, storage::CookieSessionStore, Session}; +use actix_session::{storage::CookieSessionStore, Session, SessionMiddleware}; use actix_web::{ - error::InternalError, get, middleware::Logger, post, web, App, HttpRequest, HttpResponse, - HttpServer, Responder, cookie, + cookie, error::InternalError, get, middleware::Logger, post, web, App, HttpRequest, + HttpResponse, HttpServer, Responder, }; use actix_web_actors::ws; use argon2::{Argon2, PasswordVerifier}; @@ -19,7 +19,7 @@ use bytesize::ByteSize; use log::{error, warn}; use password_hash::PasswordHashString; use serde::{Deserialize, Serialize}; -use state::{StateDb, prelude::SizedFile}; +use state::{prelude::SizedFile, StateDb}; use store::{StoredFile, StoredFiles}; use tokio::fs::File; @@ -93,19 +93,24 @@ struct AdminPage<'a> { } #[get("/admin")] -async fn admin_panel(data: web::Data, session: Session) -> actix_web::Result { +async fn admin_panel( + data: web::Data, + session: Session, +) -> actix_web::Result { if let Some(true) = session.get::("admin")? { Ok(AdminPage { cachebuster: data.config.cachebuster.clone(), base_url: data.config.base_url.clone(), stored_files: data.state.read().await.deref(), - }.to_response()) + } + .to_response()) } else { Ok(SignedOutAdminPage { cachebuster: data.config.cachebuster.clone(), base_url: data.config.base_url.clone(), incorrect_password: false, - }.to_response()) + } + .to_response()) } } @@ -115,14 +120,26 @@ struct AdminPanelSignin { } #[post("/admin")] -async fn admin_signin(req: HttpRequest, data: web::Data, form: web::Form, session: Session) -> actix_web::Result { - if Argon2::default().verify_password(form.password.as_bytes(), &data.config.admin_password_hash.password_hash()).is_ok() { +async fn admin_signin( + req: HttpRequest, + data: web::Data, + form: web::Form, + session: Session, +) -> actix_web::Result { + if Argon2::default() + .verify_password( + form.password.as_bytes(), + &data.config.admin_password_hash.password_hash(), + ) + .is_ok() + { session.insert("admin", true)?; Ok(AdminPage { cachebuster: data.config.cachebuster.clone(), base_url: data.config.base_url.clone(), stored_files: data.state.read().await.deref(), - }.to_response()) + } + .to_response()) } else { let ip_addr = get_ip_addr(&req, data.config.reverse_proxy); log_auth_failure(&ip_addr); @@ -130,7 +147,8 @@ async fn admin_signin(req: HttpRequest, data: web::Data, form: web::For cachebuster: data.config.cachebuster.clone(), base_url: data.config.base_url.clone(), incorrect_password: true, - }.to_response(); + } + .to_response(); *resp.status_mut() = StatusCode::FORBIDDEN; Err(InternalError::from_response("Incorrect password", resp).into()) @@ -375,12 +393,13 @@ async fn main() -> std::io::Result<()> { let admin_password_hash: PasswordHashString = env_or_panic("TRANSBEAM_ADMIN_PASSWORD_HASH"); let cookie_secret_base64: String = env_or_panic("TRANSBEAM_COOKIE_SECRET"); - let cookie_key = cookie::Key::from( - &base64::decode(&cookie_secret_base64) - .unwrap_or_else( - |_| panic!("Value {} for TRANSBEAM_COOKIE_SECRET is not valid base64", cookie_secret_base64) + let cookie_key = + cookie::Key::from(&base64::decode(&cookie_secret_base64).unwrap_or_else(|_| { + panic!( + "Value {} for TRANSBEAM_COOKIE_SECRET is not valid base64", + cookie_secret_base64 ) - ); + })); let state_file: PathBuf = match std::env::var("TRANSBEAM_STATE_FILE") { Ok(v) => v @@ -424,7 +443,10 @@ async fn main() -> std::io::Result<()> { } else { Logger::default() }) - .wrap(SessionMiddleware::new(CookieSessionStore::default(), cookie_key.clone())) + .wrap(SessionMiddleware::new( + CookieSessionStore::default(), + cookie_key.clone(), + )) .service(index) .service(handle_download) .service(download_info) diff --git a/src/state.rs b/src/state.rs index 352971a..b45813f 100644 --- a/src/state.rs +++ b/src/state.rs @@ -13,12 +13,11 @@ pub mod prelude { fn size(&self) -> u64; fn formatted_size(&self) -> String { - bytesize::to_string(self.size(), false).replace(" ", "") + bytesize::to_string(self.size(), false).replace(' ', "") } } } - mod v0; pub mod v1 { @@ -52,7 +51,9 @@ pub mod v1 { } } impl SizedFile for UploadedFile { - fn size(&self) -> u64 { self.size } + fn size(&self) -> u64 { + self.size + } } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -83,7 +84,9 @@ pub mod v1 { pub contents: Option, } impl SizedFile for StoredFile { - fn size(&self) -> u64 { self.size } + fn size(&self) -> u64 { + self.size + } } #[derive(Debug, Clone, Deserialize, Serialize)] diff --git a/src/store.rs b/src/store.rs index 1bd611e..5cde34c 100644 --- a/src/store.rs +++ b/src/store.rs @@ -36,7 +36,7 @@ async fn is_valid_entry(key: &str, info: &StoredFile, storage_dir: &Path) -> boo return false; } - let file = if let Ok(f) = File::open(storage_dir.join(&key)).await { + let file = if let Ok(f) = File::open(storage_dir.join(key)).await { f } else { error!( @@ -108,7 +108,11 @@ impl crate::AppData { /// Attempts to add a file to the store. Returns an I/O error if /// something's broken, or a u64 of the maximum allowed file size /// if the file was too big, or a unit if everything worked. - pub async fn add_file(&self, key: String, entry: StoredFileWithPassword) -> Result<(), FileAddError> { + pub async fn add_file( + &self, + key: String, + entry: StoredFileWithPassword, + ) -> Result<(), FileAddError> { let mut store = self.state.write().await; if store.full(self.config.max_storage_size) { return Err(FileAddError::Full);