diff --git a/Cargo.lock b/Cargo.lock index 9e6ead0..f87259c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1778,7 +1778,7 @@ dependencies = [ [[package]] name = "transbeam" -version = "0.3.0" +version = "0.4.0" dependencies = [ "actix", "actix-files", diff --git a/Cargo.toml b/Cargo.toml index 58d0229..ee3496f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "transbeam" -version = "0.3.0" +version = "0.4.0" authors = ["xenofem "] edition = "2021" license = "MIT" diff --git a/cli/transbeam-cli b/cli/transbeam-cli index 7299398..331a725 100755 --- a/cli/transbeam-cli +++ b/cli/transbeam-cli @@ -51,10 +51,6 @@ async def send(paths, host, password, lifetime, collection_name=None): loader = file_loader([(paths[i], fileMetadata[i]["size"]) for i in range(len(paths))]) for data in loader: await ws.send(data) - resp = await ws.recv() - if resp != "ack": - tqdm.write("unexpected response: {}".format(resp)) - exit(1) parser = argparse.ArgumentParser(description="Upload files to transbeam") parser.add_argument("-l", "--lifetime", type=int, default=7, help="Lifetime in days for files (default 7)") diff --git a/src/upload.rs b/src/upload.rs index 94f80b8..4f59490 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -211,10 +211,6 @@ impl StreamHandler> for Uploader { } } -fn ack(ctx: &mut Context) { - ctx.text("ack"); -} - impl Uploader { fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut Context) { error!("{}", e); @@ -350,13 +346,10 @@ impl Uploader { }), ); } - ws::Message::Binary(data) | ws::Message::Continuation(Item::Last(data)) => { - let result = self.handle_data(data)?; - ack(ctx); - return Ok(result); - } - ws::Message::Continuation(Item::FirstBinary(data)) - | ws::Message::Continuation(Item::Continue(data)) => { + ws::Message::Binary(data) + | ws::Message::Continuation(Item::FirstBinary(data)) + | ws::Message::Continuation(Item::Continue(data)) + | ws::Message::Continuation(Item::Last(data)) => { return self.handle_data(data); } ws::Message::Close(reason) => { diff --git a/static/js/upload.js b/static/js/upload.js index debf733..9ec6e79 100644 --- a/static/js/upload.js +++ b/static/js/upload.js @@ -2,6 +2,8 @@ const FILE_CHUNK_SIZE = 16384; const MAX_FILES = 256; const SAMPLE_WINDOW = 100; const STALL_THRESHOLD = 1000; +const MAX_WS_BUFFER = 1048576; +const WS_BUFFER_DELAY = 10; let files = []; @@ -204,47 +206,42 @@ function sendManifest() { } function handleMessage(msg) { - if (bytesSent === 0) { - let reply; - try { - reply = JSON.parse(msg.data); - } catch (error) { - socket.close(); - displayError('Received an invalid response from the server'); - console.error(error); - return; - } - if (reply.type === 'ready') { - downloadCode.textContent = reply.code; - updateProgress(); - document.body.className = 'uploading'; - sendData(); - return; - } + if (bytesSent > 0) { + console.warn('Received unexpected message from server during upload', msg.data); + return; + } - // we're going to display a more useful error message - socket.removeEventListener('close', handleClose); + let reply; + try { + reply = JSON.parse(msg.data); + } catch (error) { socket.close(); - if (reply.type === 'too_big') { - maxSize = reply.max_size; - updateFiles(); - } else if (reply.type === 'too_long') { - updateMaxLifetime(reply.max_lifetime); - displayError(`The maximum retention time for uploads is ${reply.max_lifetime} days`); - } else if (reply.type === 'incorrect_password') { - messageBox.textContent = ('Incorrect password'); - document.body.className = 'error landing'; - } else if (reply.type === 'error') { - displayError(reply.details); - } - } else { - if (msg.data === 'ack') { - sendData(); - } else { - console.error('Received unexpected message from server instead of ack', msg.data); - displayError(); - socket.close(); - } + displayError('Received an invalid response from the server'); + console.error(error); + return; + } + if (reply.type === 'ready') { + downloadCode.textContent = reply.code; + updateProgress(); + document.body.className = 'uploading'; + sendData(); + return; + } + + // we're going to display a more useful error message + socket.removeEventListener('close', handleClose); + socket.close(); + if (reply.type === 'too_big') { + maxSize = reply.max_size; + updateFiles(); + } else if (reply.type === 'too_long') { + updateMaxLifetime(reply.max_lifetime); + displayError(`The maximum retention time for uploads is ${reply.max_lifetime} days`); + } else if (reply.type === 'incorrect_password') { + messageBox.textContent = ('Incorrect password'); + document.body.className = 'error landing'; + } else if (reply.type === 'error') { + displayError(reply.details); } } @@ -262,32 +259,43 @@ function updateMaxLifetime(lifetime) { } function sendData() { - if (fileIndex >= files.length) { - finishSending(); + if (socket.readyState !== 1) { return; } - const currentFile = files[fileIndex]; - if (byteIndex < currentFile.size) { - const endpoint = Math.min(byteIndex+FILE_CHUNK_SIZE, currentFile.size); - const data = currentFile.slice(byteIndex, endpoint); - socket.send(data); - byteIndex = endpoint; - bytesSent += data.size; - // It's ok if the monotonically increasing fields like - // percentage are updating super quickly, but it's awkward for - // rate and ETA - const now = Date.now() / 1000; - if (timestamps.length === 0 || now - timestamps.at(-1)[0] > 1) { - timestamps.push([now, bytesSent]); - if (timestamps.length > SAMPLE_WINDOW) { timestamps.shift(); } + while (true) { + if (fileIndex >= files.length) { + finishSending(); + return; } - updateProgress(); - } else { - fileIndex += 1; - byteIndex = 0; - sendData(); + if (socket.bufferedAmount >= MAX_WS_BUFFER) { + setTimeout(sendData, WS_BUFFER_DELAY); + return; + } + + const currentFile = files[fileIndex]; + if (byteIndex < currentFile.size) { + const endpoint = Math.min(byteIndex+FILE_CHUNK_SIZE, currentFile.size); + const data = currentFile.slice(byteIndex, endpoint); + socket.send(data); + byteIndex = endpoint; + bytesSent += data.size; + + // It's ok if the monotonically increasing fields like + // percentage are updating super quickly, but it's awkward for + // rate and ETA + const now = Date.now() / 1000; + if (timestamps.length === 0 || now - timestamps.at(-1)[0] > 1) { + timestamps.push([now, bytesSent]); + if (timestamps.length > SAMPLE_WINDOW) { timestamps.shift(); } + } + + updateProgress(); + } else { + fileIndex += 1; + byteIndex = 0; + } } }