Compare commits

..

2 commits

8 changed files with 127 additions and 101 deletions

2
Cargo.lock generated
View file

@ -1778,7 +1778,7 @@ dependencies = [
[[package]] [[package]]
name = "transbeam" name = "transbeam"
version = "0.3.0" version = "0.4.0"
dependencies = [ dependencies = [
"actix", "actix",
"actix-files", "actix-files",

View file

@ -1,6 +1,6 @@
[package] [package]
name = "transbeam" name = "transbeam"
version = "0.3.0" version = "0.4.0"
authors = ["xenofem <xenofem@xeno.science>"] authors = ["xenofem <xenofem@xeno.science>"]
edition = "2021" edition = "2021"
license = "MIT" license = "MIT"

View file

@ -51,10 +51,6 @@ async def send(paths, host, password, lifetime, collection_name=None):
loader = file_loader([(paths[i], fileMetadata[i]["size"]) for i in range(len(paths))]) loader = file_loader([(paths[i], fileMetadata[i]["size"]) for i in range(len(paths))])
for data in loader: for data in loader:
await ws.send(data) await ws.send(data)
resp = await ws.recv()
if resp != "ack":
tqdm.write("unexpected response: {}".format(resp))
exit(1)
parser = argparse.ArgumentParser(description="Upload files to transbeam") parser = argparse.ArgumentParser(description="Upload files to transbeam")
parser.add_argument("-l", "--lifetime", type=int, default=7, help="Lifetime in days for files (default 7)") parser.add_argument("-l", "--lifetime", type=int, default=7, help="Lifetime in days for files (default 7)")

View file

@ -4,13 +4,13 @@ mod store;
mod upload; mod upload;
mod zip; mod zip;
use std::{fmt::Debug, path::PathBuf, str::FromStr, ops::Deref}; use std::{fmt::Debug, ops::Deref, path::PathBuf, str::FromStr};
use actix_http::StatusCode; use actix_http::StatusCode;
use actix_session::{SessionMiddleware, storage::CookieSessionStore, Session}; use actix_session::{storage::CookieSessionStore, Session, SessionMiddleware};
use actix_web::{ use actix_web::{
error::InternalError, get, middleware::Logger, post, web, App, HttpRequest, HttpResponse, cookie, error::InternalError, get, middleware::Logger, post, web, App, HttpRequest,
HttpServer, Responder, cookie, HttpResponse, HttpServer, Responder,
}; };
use actix_web_actors::ws; use actix_web_actors::ws;
use argon2::{Argon2, PasswordVerifier}; use argon2::{Argon2, PasswordVerifier};
@ -19,7 +19,7 @@ use bytesize::ByteSize;
use log::{error, warn}; use log::{error, warn};
use password_hash::PasswordHashString; use password_hash::PasswordHashString;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use state::{StateDb, prelude::SizedFile}; use state::{prelude::SizedFile, StateDb};
use store::{StoredFile, StoredFiles}; use store::{StoredFile, StoredFiles};
use tokio::fs::File; use tokio::fs::File;
@ -93,19 +93,24 @@ struct AdminPage<'a> {
} }
#[get("/admin")] #[get("/admin")]
async fn admin_panel(data: web::Data<AppData>, session: Session) -> actix_web::Result<HttpResponse> { async fn admin_panel(
data: web::Data<AppData>,
session: Session,
) -> actix_web::Result<HttpResponse> {
if let Some(true) = session.get::<bool>("admin")? { if let Some(true) = session.get::<bool>("admin")? {
Ok(AdminPage { Ok(AdminPage {
cachebuster: data.config.cachebuster.clone(), cachebuster: data.config.cachebuster.clone(),
base_url: data.config.base_url.clone(), base_url: data.config.base_url.clone(),
stored_files: data.state.read().await.deref(), stored_files: data.state.read().await.deref(),
}.to_response()) }
.to_response())
} else { } else {
Ok(SignedOutAdminPage { Ok(SignedOutAdminPage {
cachebuster: data.config.cachebuster.clone(), cachebuster: data.config.cachebuster.clone(),
base_url: data.config.base_url.clone(), base_url: data.config.base_url.clone(),
incorrect_password: false, incorrect_password: false,
}.to_response()) }
.to_response())
} }
} }
@ -115,14 +120,26 @@ struct AdminPanelSignin {
} }
#[post("/admin")] #[post("/admin")]
async fn admin_signin(req: HttpRequest, data: web::Data<AppData>, form: web::Form<AdminPanelSignin>, session: Session) -> actix_web::Result<HttpResponse> { async fn admin_signin(
if Argon2::default().verify_password(form.password.as_bytes(), &data.config.admin_password_hash.password_hash()).is_ok() { req: HttpRequest,
data: web::Data<AppData>,
form: web::Form<AdminPanelSignin>,
session: Session,
) -> actix_web::Result<HttpResponse> {
if Argon2::default()
.verify_password(
form.password.as_bytes(),
&data.config.admin_password_hash.password_hash(),
)
.is_ok()
{
session.insert("admin", true)?; session.insert("admin", true)?;
Ok(AdminPage { Ok(AdminPage {
cachebuster: data.config.cachebuster.clone(), cachebuster: data.config.cachebuster.clone(),
base_url: data.config.base_url.clone(), base_url: data.config.base_url.clone(),
stored_files: data.state.read().await.deref(), stored_files: data.state.read().await.deref(),
}.to_response()) }
.to_response())
} else { } else {
let ip_addr = get_ip_addr(&req, data.config.reverse_proxy); let ip_addr = get_ip_addr(&req, data.config.reverse_proxy);
log_auth_failure(&ip_addr); log_auth_failure(&ip_addr);
@ -130,7 +147,8 @@ async fn admin_signin(req: HttpRequest, data: web::Data<AppData>, form: web::For
cachebuster: data.config.cachebuster.clone(), cachebuster: data.config.cachebuster.clone(),
base_url: data.config.base_url.clone(), base_url: data.config.base_url.clone(),
incorrect_password: true, incorrect_password: true,
}.to_response(); }
.to_response();
*resp.status_mut() = StatusCode::FORBIDDEN; *resp.status_mut() = StatusCode::FORBIDDEN;
Err(InternalError::from_response("Incorrect password", resp).into()) Err(InternalError::from_response("Incorrect password", resp).into())
@ -375,12 +393,13 @@ async fn main() -> std::io::Result<()> {
let admin_password_hash: PasswordHashString = env_or_panic("TRANSBEAM_ADMIN_PASSWORD_HASH"); let admin_password_hash: PasswordHashString = env_or_panic("TRANSBEAM_ADMIN_PASSWORD_HASH");
let cookie_secret_base64: String = env_or_panic("TRANSBEAM_COOKIE_SECRET"); let cookie_secret_base64: String = env_or_panic("TRANSBEAM_COOKIE_SECRET");
let cookie_key = cookie::Key::from( let cookie_key =
&base64::decode(&cookie_secret_base64) cookie::Key::from(&base64::decode(&cookie_secret_base64).unwrap_or_else(|_| {
.unwrap_or_else( panic!(
|_| panic!("Value {} for TRANSBEAM_COOKIE_SECRET is not valid base64", cookie_secret_base64) "Value {} for TRANSBEAM_COOKIE_SECRET is not valid base64",
cookie_secret_base64
) )
); }));
let state_file: PathBuf = match std::env::var("TRANSBEAM_STATE_FILE") { let state_file: PathBuf = match std::env::var("TRANSBEAM_STATE_FILE") {
Ok(v) => v Ok(v) => v
@ -424,7 +443,10 @@ async fn main() -> std::io::Result<()> {
} else { } else {
Logger::default() Logger::default()
}) })
.wrap(SessionMiddleware::new(CookieSessionStore::default(), cookie_key.clone())) .wrap(SessionMiddleware::new(
CookieSessionStore::default(),
cookie_key.clone(),
))
.service(index) .service(index)
.service(handle_download) .service(handle_download)
.service(download_info) .service(download_info)

View file

@ -13,12 +13,11 @@ pub mod prelude {
fn size(&self) -> u64; fn size(&self) -> u64;
fn formatted_size(&self) -> String { fn formatted_size(&self) -> String {
bytesize::to_string(self.size(), false).replace(" ", "") bytesize::to_string(self.size(), false).replace(' ', "")
} }
} }
} }
mod v0; mod v0;
pub mod v1 { pub mod v1 {
@ -52,7 +51,9 @@ pub mod v1 {
} }
} }
impl SizedFile for UploadedFile { impl SizedFile for UploadedFile {
fn size(&self) -> u64 { self.size } fn size(&self) -> u64 {
self.size
}
} }
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
@ -83,7 +84,9 @@ pub mod v1 {
pub contents: Option<FileSet>, pub contents: Option<FileSet>,
} }
impl SizedFile for StoredFile { impl SizedFile for StoredFile {
fn size(&self) -> u64 { self.size } fn size(&self) -> u64 {
self.size
}
} }
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]

View file

@ -36,7 +36,7 @@ async fn is_valid_entry(key: &str, info: &StoredFile, storage_dir: &Path) -> boo
return false; return false;
} }
let file = if let Ok(f) = File::open(storage_dir.join(&key)).await { let file = if let Ok(f) = File::open(storage_dir.join(key)).await {
f f
} else { } else {
error!( error!(
@ -108,7 +108,11 @@ impl crate::AppData {
/// Attempts to add a file to the store. Returns an I/O error if /// Attempts to add a file to the store. Returns an I/O error if
/// something's broken, or a u64 of the maximum allowed file size /// something's broken, or a u64 of the maximum allowed file size
/// if the file was too big, or a unit if everything worked. /// if the file was too big, or a unit if everything worked.
pub async fn add_file(&self, key: String, entry: StoredFileWithPassword) -> Result<(), FileAddError> { pub async fn add_file(
&self,
key: String,
entry: StoredFileWithPassword,
) -> Result<(), FileAddError> {
let mut store = self.state.write().await; let mut store = self.state.write().await;
if store.full(self.config.max_storage_size) { if store.full(self.config.max_storage_size) {
return Err(FileAddError::Full); return Err(FileAddError::Full);

View file

@ -211,10 +211,6 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
} }
} }
fn ack(ctx: &mut Context) {
ctx.text("ack");
}
impl Uploader { impl Uploader {
fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut Context) { fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut Context) {
error!("{}", e); error!("{}", e);
@ -350,13 +346,10 @@ impl Uploader {
}), }),
); );
} }
ws::Message::Binary(data) | ws::Message::Continuation(Item::Last(data)) => { ws::Message::Binary(data)
let result = self.handle_data(data)?; | ws::Message::Continuation(Item::FirstBinary(data))
ack(ctx); | ws::Message::Continuation(Item::Continue(data))
return Ok(result); | ws::Message::Continuation(Item::Last(data)) => {
}
ws::Message::Continuation(Item::FirstBinary(data))
| ws::Message::Continuation(Item::Continue(data)) => {
return self.handle_data(data); return self.handle_data(data);
} }
ws::Message::Close(reason) => { ws::Message::Close(reason) => {

View file

@ -2,6 +2,8 @@ const FILE_CHUNK_SIZE = 16384;
const MAX_FILES = 256; const MAX_FILES = 256;
const SAMPLE_WINDOW = 100; const SAMPLE_WINDOW = 100;
const STALL_THRESHOLD = 1000; const STALL_THRESHOLD = 1000;
const MAX_WS_BUFFER = 1048576;
const WS_BUFFER_DELAY = 10;
let files = []; let files = [];
@ -204,47 +206,42 @@ function sendManifest() {
} }
function handleMessage(msg) { function handleMessage(msg) {
if (bytesSent === 0) { if (bytesSent > 0) {
let reply; console.warn('Received unexpected message from server during upload', msg.data);
try { return;
reply = JSON.parse(msg.data); }
} catch (error) {
socket.close();
displayError('Received an invalid response from the server');
console.error(error);
return;
}
if (reply.type === 'ready') {
downloadCode.textContent = reply.code;
updateProgress();
document.body.className = 'uploading';
sendData();
return;
}
// we're going to display a more useful error message let reply;
socket.removeEventListener('close', handleClose); try {
reply = JSON.parse(msg.data);
} catch (error) {
socket.close(); socket.close();
if (reply.type === 'too_big') { displayError('Received an invalid response from the server');
maxSize = reply.max_size; console.error(error);
updateFiles(); return;
} else if (reply.type === 'too_long') { }
updateMaxLifetime(reply.max_lifetime); if (reply.type === 'ready') {
displayError(`The maximum retention time for uploads is ${reply.max_lifetime} days`); downloadCode.textContent = reply.code;
} else if (reply.type === 'incorrect_password') { updateProgress();
messageBox.textContent = ('Incorrect password'); document.body.className = 'uploading';
document.body.className = 'error landing'; sendData();
} else if (reply.type === 'error') { return;
displayError(reply.details); }
}
} else { // we're going to display a more useful error message
if (msg.data === 'ack') { socket.removeEventListener('close', handleClose);
sendData(); socket.close();
} else { if (reply.type === 'too_big') {
console.error('Received unexpected message from server instead of ack', msg.data); maxSize = reply.max_size;
displayError(); updateFiles();
socket.close(); } else if (reply.type === 'too_long') {
} updateMaxLifetime(reply.max_lifetime);
displayError(`The maximum retention time for uploads is ${reply.max_lifetime} days`);
} else if (reply.type === 'incorrect_password') {
messageBox.textContent = ('Incorrect password');
document.body.className = 'error landing';
} else if (reply.type === 'error') {
displayError(reply.details);
} }
} }
@ -262,32 +259,43 @@ function updateMaxLifetime(lifetime) {
} }
function sendData() { function sendData() {
if (fileIndex >= files.length) { if (socket.readyState !== 1) {
finishSending();
return; return;
} }
const currentFile = files[fileIndex];
if (byteIndex < currentFile.size) {
const endpoint = Math.min(byteIndex+FILE_CHUNK_SIZE, currentFile.size);
const data = currentFile.slice(byteIndex, endpoint);
socket.send(data);
byteIndex = endpoint;
bytesSent += data.size;
// It's ok if the monotonically increasing fields like while (true) {
// percentage are updating super quickly, but it's awkward for if (fileIndex >= files.length) {
// rate and ETA finishSending();
const now = Date.now() / 1000; return;
if (timestamps.length === 0 || now - timestamps.at(-1)[0] > 1) {
timestamps.push([now, bytesSent]);
if (timestamps.length > SAMPLE_WINDOW) { timestamps.shift(); }
} }
updateProgress(); if (socket.bufferedAmount >= MAX_WS_BUFFER) {
} else { setTimeout(sendData, WS_BUFFER_DELAY);
fileIndex += 1; return;
byteIndex = 0; }
sendData();
const currentFile = files[fileIndex];
if (byteIndex < currentFile.size) {
const endpoint = Math.min(byteIndex+FILE_CHUNK_SIZE, currentFile.size);
const data = currentFile.slice(byteIndex, endpoint);
socket.send(data);
byteIndex = endpoint;
bytesSent += data.size;
// It's ok if the monotonically increasing fields like
// percentage are updating super quickly, but it's awkward for
// rate and ETA
const now = Date.now() / 1000;
if (timestamps.length === 0 || now - timestamps.at(-1)[0] > 1) {
timestamps.push([now, bytesSent]);
if (timestamps.length > SAMPLE_WINDOW) { timestamps.shift(); }
}
updateProgress();
} else {
fileIndex += 1;
byteIndex = 0;
}
} }
} }