Compare commits

...

2 Commits

Author SHA1 Message Date
xenofem 8b001911f7 clippy and format 2023-09-09 17:04:45 -04:00
xenofem 2e29825a3d no more websocket ack messages, just use lower-level backpressure 2023-09-09 17:03:35 -04:00
8 changed files with 127 additions and 101 deletions

2
Cargo.lock generated
View File

@ -1778,7 +1778,7 @@ dependencies = [
[[package]]
name = "transbeam"
version = "0.3.0"
version = "0.4.0"
dependencies = [
"actix",
"actix-files",

View File

@ -1,6 +1,6 @@
[package]
name = "transbeam"
version = "0.3.0"
version = "0.4.0"
authors = ["xenofem <xenofem@xeno.science>"]
edition = "2021"
license = "MIT"

View File

@ -51,10 +51,6 @@ async def send(paths, host, password, lifetime, collection_name=None):
loader = file_loader([(paths[i], fileMetadata[i]["size"]) for i in range(len(paths))])
for data in loader:
await ws.send(data)
resp = await ws.recv()
if resp != "ack":
tqdm.write("unexpected response: {}".format(resp))
exit(1)
parser = argparse.ArgumentParser(description="Upload files to transbeam")
parser.add_argument("-l", "--lifetime", type=int, default=7, help="Lifetime in days for files (default 7)")

View File

@ -4,13 +4,13 @@ mod store;
mod upload;
mod zip;
use std::{fmt::Debug, path::PathBuf, str::FromStr, ops::Deref};
use std::{fmt::Debug, ops::Deref, path::PathBuf, str::FromStr};
use actix_http::StatusCode;
use actix_session::{SessionMiddleware, storage::CookieSessionStore, Session};
use actix_session::{storage::CookieSessionStore, Session, SessionMiddleware};
use actix_web::{
error::InternalError, get, middleware::Logger, post, web, App, HttpRequest, HttpResponse,
HttpServer, Responder, cookie,
cookie, error::InternalError, get, middleware::Logger, post, web, App, HttpRequest,
HttpResponse, HttpServer, Responder,
};
use actix_web_actors::ws;
use argon2::{Argon2, PasswordVerifier};
@ -19,7 +19,7 @@ use bytesize::ByteSize;
use log::{error, warn};
use password_hash::PasswordHashString;
use serde::{Deserialize, Serialize};
use state::{StateDb, prelude::SizedFile};
use state::{prelude::SizedFile, StateDb};
use store::{StoredFile, StoredFiles};
use tokio::fs::File;
@ -93,19 +93,24 @@ struct AdminPage<'a> {
}
#[get("/admin")]
async fn admin_panel(data: web::Data<AppData>, session: Session) -> actix_web::Result<HttpResponse> {
async fn admin_panel(
data: web::Data<AppData>,
session: Session,
) -> actix_web::Result<HttpResponse> {
if let Some(true) = session.get::<bool>("admin")? {
Ok(AdminPage {
cachebuster: data.config.cachebuster.clone(),
base_url: data.config.base_url.clone(),
stored_files: data.state.read().await.deref(),
}.to_response())
}
.to_response())
} else {
Ok(SignedOutAdminPage {
cachebuster: data.config.cachebuster.clone(),
base_url: data.config.base_url.clone(),
incorrect_password: false,
}.to_response())
}
.to_response())
}
}
@ -115,14 +120,26 @@ struct AdminPanelSignin {
}
#[post("/admin")]
async fn admin_signin(req: HttpRequest, data: web::Data<AppData>, form: web::Form<AdminPanelSignin>, session: Session) -> actix_web::Result<HttpResponse> {
if Argon2::default().verify_password(form.password.as_bytes(), &data.config.admin_password_hash.password_hash()).is_ok() {
async fn admin_signin(
req: HttpRequest,
data: web::Data<AppData>,
form: web::Form<AdminPanelSignin>,
session: Session,
) -> actix_web::Result<HttpResponse> {
if Argon2::default()
.verify_password(
form.password.as_bytes(),
&data.config.admin_password_hash.password_hash(),
)
.is_ok()
{
session.insert("admin", true)?;
Ok(AdminPage {
cachebuster: data.config.cachebuster.clone(),
base_url: data.config.base_url.clone(),
stored_files: data.state.read().await.deref(),
}.to_response())
}
.to_response())
} else {
let ip_addr = get_ip_addr(&req, data.config.reverse_proxy);
log_auth_failure(&ip_addr);
@ -130,7 +147,8 @@ async fn admin_signin(req: HttpRequest, data: web::Data<AppData>, form: web::For
cachebuster: data.config.cachebuster.clone(),
base_url: data.config.base_url.clone(),
incorrect_password: true,
}.to_response();
}
.to_response();
*resp.status_mut() = StatusCode::FORBIDDEN;
Err(InternalError::from_response("Incorrect password", resp).into())
@ -375,12 +393,13 @@ async fn main() -> std::io::Result<()> {
let admin_password_hash: PasswordHashString = env_or_panic("TRANSBEAM_ADMIN_PASSWORD_HASH");
let cookie_secret_base64: String = env_or_panic("TRANSBEAM_COOKIE_SECRET");
let cookie_key = cookie::Key::from(
&base64::decode(&cookie_secret_base64)
.unwrap_or_else(
|_| panic!("Value {} for TRANSBEAM_COOKIE_SECRET is not valid base64", cookie_secret_base64)
let cookie_key =
cookie::Key::from(&base64::decode(&cookie_secret_base64).unwrap_or_else(|_| {
panic!(
"Value {} for TRANSBEAM_COOKIE_SECRET is not valid base64",
cookie_secret_base64
)
);
}));
let state_file: PathBuf = match std::env::var("TRANSBEAM_STATE_FILE") {
Ok(v) => v
@ -424,7 +443,10 @@ async fn main() -> std::io::Result<()> {
} else {
Logger::default()
})
.wrap(SessionMiddleware::new(CookieSessionStore::default(), cookie_key.clone()))
.wrap(SessionMiddleware::new(
CookieSessionStore::default(),
cookie_key.clone(),
))
.service(index)
.service(handle_download)
.service(download_info)

View File

@ -13,12 +13,11 @@ pub mod prelude {
fn size(&self) -> u64;
fn formatted_size(&self) -> String {
bytesize::to_string(self.size(), false).replace(" ", "")
bytesize::to_string(self.size(), false).replace(' ', "")
}
}
}
mod v0;
pub mod v1 {
@ -52,7 +51,9 @@ pub mod v1 {
}
}
impl SizedFile for UploadedFile {
fn size(&self) -> u64 { self.size }
fn size(&self) -> u64 {
self.size
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
@ -83,7 +84,9 @@ pub mod v1 {
pub contents: Option<FileSet>,
}
impl SizedFile for StoredFile {
fn size(&self) -> u64 { self.size }
fn size(&self) -> u64 {
self.size
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]

View File

@ -36,7 +36,7 @@ async fn is_valid_entry(key: &str, info: &StoredFile, storage_dir: &Path) -> boo
return false;
}
let file = if let Ok(f) = File::open(storage_dir.join(&key)).await {
let file = if let Ok(f) = File::open(storage_dir.join(key)).await {
f
} else {
error!(
@ -108,7 +108,11 @@ impl crate::AppData {
/// Attempts to add a file to the store. Returns an I/O error if
/// something's broken, or a u64 of the maximum allowed file size
/// if the file was too big, or a unit if everything worked.
pub async fn add_file(&self, key: String, entry: StoredFileWithPassword) -> Result<(), FileAddError> {
pub async fn add_file(
&self,
key: String,
entry: StoredFileWithPassword,
) -> Result<(), FileAddError> {
let mut store = self.state.write().await;
if store.full(self.config.max_storage_size) {
return Err(FileAddError::Full);

View File

@ -211,10 +211,6 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
}
}
fn ack(ctx: &mut Context) {
ctx.text("ack");
}
impl Uploader {
fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut Context) {
error!("{}", e);
@ -350,13 +346,10 @@ impl Uploader {
}),
);
}
ws::Message::Binary(data) | ws::Message::Continuation(Item::Last(data)) => {
let result = self.handle_data(data)?;
ack(ctx);
return Ok(result);
}
ws::Message::Continuation(Item::FirstBinary(data))
| ws::Message::Continuation(Item::Continue(data)) => {
ws::Message::Binary(data)
| ws::Message::Continuation(Item::FirstBinary(data))
| ws::Message::Continuation(Item::Continue(data))
| ws::Message::Continuation(Item::Last(data)) => {
return self.handle_data(data);
}
ws::Message::Close(reason) => {

View File

@ -2,6 +2,8 @@ const FILE_CHUNK_SIZE = 16384;
const MAX_FILES = 256;
const SAMPLE_WINDOW = 100;
const STALL_THRESHOLD = 1000;
const MAX_WS_BUFFER = 1048576;
const WS_BUFFER_DELAY = 10;
let files = [];
@ -204,47 +206,42 @@ function sendManifest() {
}
function handleMessage(msg) {
if (bytesSent === 0) {
let reply;
try {
reply = JSON.parse(msg.data);
} catch (error) {
socket.close();
displayError('Received an invalid response from the server');
console.error(error);
return;
}
if (reply.type === 'ready') {
downloadCode.textContent = reply.code;
updateProgress();
document.body.className = 'uploading';
sendData();
return;
}
if (bytesSent > 0) {
console.warn('Received unexpected message from server during upload', msg.data);
return;
}
// we're going to display a more useful error message
socket.removeEventListener('close', handleClose);
let reply;
try {
reply = JSON.parse(msg.data);
} catch (error) {
socket.close();
if (reply.type === 'too_big') {
maxSize = reply.max_size;
updateFiles();
} else if (reply.type === 'too_long') {
updateMaxLifetime(reply.max_lifetime);
displayError(`The maximum retention time for uploads is ${reply.max_lifetime} days`);
} else if (reply.type === 'incorrect_password') {
messageBox.textContent = ('Incorrect password');
document.body.className = 'error landing';
} else if (reply.type === 'error') {
displayError(reply.details);
}
} else {
if (msg.data === 'ack') {
sendData();
} else {
console.error('Received unexpected message from server instead of ack', msg.data);
displayError();
socket.close();
}
displayError('Received an invalid response from the server');
console.error(error);
return;
}
if (reply.type === 'ready') {
downloadCode.textContent = reply.code;
updateProgress();
document.body.className = 'uploading';
sendData();
return;
}
// we're going to display a more useful error message
socket.removeEventListener('close', handleClose);
socket.close();
if (reply.type === 'too_big') {
maxSize = reply.max_size;
updateFiles();
} else if (reply.type === 'too_long') {
updateMaxLifetime(reply.max_lifetime);
displayError(`The maximum retention time for uploads is ${reply.max_lifetime} days`);
} else if (reply.type === 'incorrect_password') {
messageBox.textContent = ('Incorrect password');
document.body.className = 'error landing';
} else if (reply.type === 'error') {
displayError(reply.details);
}
}
@ -262,32 +259,43 @@ function updateMaxLifetime(lifetime) {
}
function sendData() {
if (fileIndex >= files.length) {
finishSending();
if (socket.readyState !== 1) {
return;
}
const currentFile = files[fileIndex];
if (byteIndex < currentFile.size) {
const endpoint = Math.min(byteIndex+FILE_CHUNK_SIZE, currentFile.size);
const data = currentFile.slice(byteIndex, endpoint);
socket.send(data);
byteIndex = endpoint;
bytesSent += data.size;
// It's ok if the monotonically increasing fields like
// percentage are updating super quickly, but it's awkward for
// rate and ETA
const now = Date.now() / 1000;
if (timestamps.length === 0 || now - timestamps.at(-1)[0] > 1) {
timestamps.push([now, bytesSent]);
if (timestamps.length > SAMPLE_WINDOW) { timestamps.shift(); }
while (true) {
if (fileIndex >= files.length) {
finishSending();
return;
}
updateProgress();
} else {
fileIndex += 1;
byteIndex = 0;
sendData();
if (socket.bufferedAmount >= MAX_WS_BUFFER) {
setTimeout(sendData, WS_BUFFER_DELAY);
return;
}
const currentFile = files[fileIndex];
if (byteIndex < currentFile.size) {
const endpoint = Math.min(byteIndex+FILE_CHUNK_SIZE, currentFile.size);
const data = currentFile.slice(byteIndex, endpoint);
socket.send(data);
byteIndex = endpoint;
bytesSent += data.size;
// It's ok if the monotonically increasing fields like
// percentage are updating super quickly, but it's awkward for
// rate and ETA
const now = Date.now() / 1000;
if (timestamps.length === 0 || now - timestamps.at(-1)[0] > 1) {
timestamps.push([now, bytesSent]);
if (timestamps.length > SAMPLE_WINDOW) { timestamps.shift(); }
}
updateProgress();
} else {
fileIndex += 1;
byteIndex = 0;
}
}
}