no more websocket ack messages, just use lower-level backpressure

main
xenofem 2023-09-09 17:03:35 -04:00
parent 1b38a2ee30
commit 2e29825a3d
5 changed files with 74 additions and 77 deletions

2
Cargo.lock generated
View File

@ -1778,7 +1778,7 @@ dependencies = [
[[package]]
name = "transbeam"
version = "0.3.0"
version = "0.4.0"
dependencies = [
"actix",
"actix-files",

View File

@ -1,6 +1,6 @@
[package]
name = "transbeam"
version = "0.3.0"
version = "0.4.0"
authors = ["xenofem <xenofem@xeno.science>"]
edition = "2021"
license = "MIT"

View File

@ -51,10 +51,6 @@ async def send(paths, host, password, lifetime, collection_name=None):
loader = file_loader([(paths[i], fileMetadata[i]["size"]) for i in range(len(paths))])
for data in loader:
await ws.send(data)
resp = await ws.recv()
if resp != "ack":
tqdm.write("unexpected response: {}".format(resp))
exit(1)
parser = argparse.ArgumentParser(description="Upload files to transbeam")
parser.add_argument("-l", "--lifetime", type=int, default=7, help="Lifetime in days for files (default 7)")

View File

@ -211,10 +211,6 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
}
}
fn ack(ctx: &mut Context) {
ctx.text("ack");
}
impl Uploader {
fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut Context) {
error!("{}", e);
@ -350,13 +346,10 @@ impl Uploader {
}),
);
}
ws::Message::Binary(data) | ws::Message::Continuation(Item::Last(data)) => {
let result = self.handle_data(data)?;
ack(ctx);
return Ok(result);
}
ws::Message::Continuation(Item::FirstBinary(data))
| ws::Message::Continuation(Item::Continue(data)) => {
ws::Message::Binary(data)
| ws::Message::Continuation(Item::FirstBinary(data))
| ws::Message::Continuation(Item::Continue(data))
| ws::Message::Continuation(Item::Last(data)) => {
return self.handle_data(data);
}
ws::Message::Close(reason) => {

View File

@ -2,6 +2,8 @@ const FILE_CHUNK_SIZE = 16384;
const MAX_FILES = 256;
const SAMPLE_WINDOW = 100;
const STALL_THRESHOLD = 1000;
const MAX_WS_BUFFER = 1048576;
const WS_BUFFER_DELAY = 10;
let files = [];
@ -204,47 +206,42 @@ function sendManifest() {
}
function handleMessage(msg) {
if (bytesSent === 0) {
let reply;
try {
reply = JSON.parse(msg.data);
} catch (error) {
socket.close();
displayError('Received an invalid response from the server');
console.error(error);
return;
}
if (reply.type === 'ready') {
downloadCode.textContent = reply.code;
updateProgress();
document.body.className = 'uploading';
sendData();
return;
}
if (bytesSent > 0) {
console.warn('Received unexpected message from server during upload', msg.data);
return;
}
// we're going to display a more useful error message
socket.removeEventListener('close', handleClose);
let reply;
try {
reply = JSON.parse(msg.data);
} catch (error) {
socket.close();
if (reply.type === 'too_big') {
maxSize = reply.max_size;
updateFiles();
} else if (reply.type === 'too_long') {
updateMaxLifetime(reply.max_lifetime);
displayError(`The maximum retention time for uploads is ${reply.max_lifetime} days`);
} else if (reply.type === 'incorrect_password') {
messageBox.textContent = ('Incorrect password');
document.body.className = 'error landing';
} else if (reply.type === 'error') {
displayError(reply.details);
}
} else {
if (msg.data === 'ack') {
sendData();
} else {
console.error('Received unexpected message from server instead of ack', msg.data);
displayError();
socket.close();
}
displayError('Received an invalid response from the server');
console.error(error);
return;
}
if (reply.type === 'ready') {
downloadCode.textContent = reply.code;
updateProgress();
document.body.className = 'uploading';
sendData();
return;
}
// we're going to display a more useful error message
socket.removeEventListener('close', handleClose);
socket.close();
if (reply.type === 'too_big') {
maxSize = reply.max_size;
updateFiles();
} else if (reply.type === 'too_long') {
updateMaxLifetime(reply.max_lifetime);
displayError(`The maximum retention time for uploads is ${reply.max_lifetime} days`);
} else if (reply.type === 'incorrect_password') {
messageBox.textContent = ('Incorrect password');
document.body.className = 'error landing';
} else if (reply.type === 'error') {
displayError(reply.details);
}
}
@ -262,32 +259,43 @@ function updateMaxLifetime(lifetime) {
}
function sendData() {
if (fileIndex >= files.length) {
finishSending();
if (socket.readyState !== 1) {
return;
}
const currentFile = files[fileIndex];
if (byteIndex < currentFile.size) {
const endpoint = Math.min(byteIndex+FILE_CHUNK_SIZE, currentFile.size);
const data = currentFile.slice(byteIndex, endpoint);
socket.send(data);
byteIndex = endpoint;
bytesSent += data.size;
// It's ok if the monotonically increasing fields like
// percentage are updating super quickly, but it's awkward for
// rate and ETA
const now = Date.now() / 1000;
if (timestamps.length === 0 || now - timestamps.at(-1)[0] > 1) {
timestamps.push([now, bytesSent]);
if (timestamps.length > SAMPLE_WINDOW) { timestamps.shift(); }
while (true) {
if (fileIndex >= files.length) {
finishSending();
return;
}
updateProgress();
} else {
fileIndex += 1;
byteIndex = 0;
sendData();
if (socket.bufferedAmount >= MAX_WS_BUFFER) {
setTimeout(sendData, WS_BUFFER_DELAY);
return;
}
const currentFile = files[fileIndex];
if (byteIndex < currentFile.size) {
const endpoint = Math.min(byteIndex+FILE_CHUNK_SIZE, currentFile.size);
const data = currentFile.slice(byteIndex, endpoint);
socket.send(data);
byteIndex = endpoint;
bytesSent += data.size;
// It's ok if the monotonically increasing fields like
// percentage are updating super quickly, but it's awkward for
// rate and ETA
const now = Date.now() / 1000;
if (timestamps.length === 0 || now - timestamps.at(-1)[0] > 1) {
timestamps.push([now, bytesSent]);
if (timestamps.length > SAMPLE_WINDOW) { timestamps.shift(); }
}
updateProgress();
} else {
fileIndex += 1;
byteIndex = 0;
}
}
}