Compare commits

...

2 commits

8 changed files with 127 additions and 101 deletions

2
Cargo.lock generated
View file

@ -1778,7 +1778,7 @@ dependencies = [
[[package]] [[package]]
name = "transbeam" name = "transbeam"
version = "0.3.0" version = "0.4.0"
dependencies = [ dependencies = [
"actix", "actix",
"actix-files", "actix-files",

View file

@ -1,6 +1,6 @@
[package] [package]
name = "transbeam" name = "transbeam"
version = "0.3.0" version = "0.4.0"
authors = ["xenofem <xenofem@xeno.science>"] authors = ["xenofem <xenofem@xeno.science>"]
edition = "2021" edition = "2021"
license = "MIT" license = "MIT"

View file

@ -51,10 +51,6 @@ async def send(paths, host, password, lifetime, collection_name=None):
loader = file_loader([(paths[i], fileMetadata[i]["size"]) for i in range(len(paths))]) loader = file_loader([(paths[i], fileMetadata[i]["size"]) for i in range(len(paths))])
for data in loader: for data in loader:
await ws.send(data) await ws.send(data)
resp = await ws.recv()
if resp != "ack":
tqdm.write("unexpected response: {}".format(resp))
exit(1)
parser = argparse.ArgumentParser(description="Upload files to transbeam") parser = argparse.ArgumentParser(description="Upload files to transbeam")
parser.add_argument("-l", "--lifetime", type=int, default=7, help="Lifetime in days for files (default 7)") parser.add_argument("-l", "--lifetime", type=int, default=7, help="Lifetime in days for files (default 7)")

View file

@ -4,13 +4,13 @@ mod store;
mod upload; mod upload;
mod zip; mod zip;
use std::{fmt::Debug, path::PathBuf, str::FromStr, ops::Deref}; use std::{fmt::Debug, ops::Deref, path::PathBuf, str::FromStr};
use actix_http::StatusCode; use actix_http::StatusCode;
use actix_session::{SessionMiddleware, storage::CookieSessionStore, Session}; use actix_session::{storage::CookieSessionStore, Session, SessionMiddleware};
use actix_web::{ use actix_web::{
error::InternalError, get, middleware::Logger, post, web, App, HttpRequest, HttpResponse, cookie, error::InternalError, get, middleware::Logger, post, web, App, HttpRequest,
HttpServer, Responder, cookie, HttpResponse, HttpServer, Responder,
}; };
use actix_web_actors::ws; use actix_web_actors::ws;
use argon2::{Argon2, PasswordVerifier}; use argon2::{Argon2, PasswordVerifier};
@ -19,7 +19,7 @@ use bytesize::ByteSize;
use log::{error, warn}; use log::{error, warn};
use password_hash::PasswordHashString; use password_hash::PasswordHashString;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use state::{StateDb, prelude::SizedFile}; use state::{prelude::SizedFile, StateDb};
use store::{StoredFile, StoredFiles}; use store::{StoredFile, StoredFiles};
use tokio::fs::File; use tokio::fs::File;
@ -93,19 +93,24 @@ struct AdminPage<'a> {
} }
#[get("/admin")] #[get("/admin")]
async fn admin_panel(data: web::Data<AppData>, session: Session) -> actix_web::Result<HttpResponse> { async fn admin_panel(
data: web::Data<AppData>,
session: Session,
) -> actix_web::Result<HttpResponse> {
if let Some(true) = session.get::<bool>("admin")? { if let Some(true) = session.get::<bool>("admin")? {
Ok(AdminPage { Ok(AdminPage {
cachebuster: data.config.cachebuster.clone(), cachebuster: data.config.cachebuster.clone(),
base_url: data.config.base_url.clone(), base_url: data.config.base_url.clone(),
stored_files: data.state.read().await.deref(), stored_files: data.state.read().await.deref(),
}.to_response()) }
.to_response())
} else { } else {
Ok(SignedOutAdminPage { Ok(SignedOutAdminPage {
cachebuster: data.config.cachebuster.clone(), cachebuster: data.config.cachebuster.clone(),
base_url: data.config.base_url.clone(), base_url: data.config.base_url.clone(),
incorrect_password: false, incorrect_password: false,
}.to_response()) }
.to_response())
} }
} }
@ -115,14 +120,26 @@ struct AdminPanelSignin {
} }
#[post("/admin")] #[post("/admin")]
async fn admin_signin(req: HttpRequest, data: web::Data<AppData>, form: web::Form<AdminPanelSignin>, session: Session) -> actix_web::Result<HttpResponse> { async fn admin_signin(
if Argon2::default().verify_password(form.password.as_bytes(), &data.config.admin_password_hash.password_hash()).is_ok() { req: HttpRequest,
data: web::Data<AppData>,
form: web::Form<AdminPanelSignin>,
session: Session,
) -> actix_web::Result<HttpResponse> {
if Argon2::default()
.verify_password(
form.password.as_bytes(),
&data.config.admin_password_hash.password_hash(),
)
.is_ok()
{
session.insert("admin", true)?; session.insert("admin", true)?;
Ok(AdminPage { Ok(AdminPage {
cachebuster: data.config.cachebuster.clone(), cachebuster: data.config.cachebuster.clone(),
base_url: data.config.base_url.clone(), base_url: data.config.base_url.clone(),
stored_files: data.state.read().await.deref(), stored_files: data.state.read().await.deref(),
}.to_response()) }
.to_response())
} else { } else {
let ip_addr = get_ip_addr(&req, data.config.reverse_proxy); let ip_addr = get_ip_addr(&req, data.config.reverse_proxy);
log_auth_failure(&ip_addr); log_auth_failure(&ip_addr);
@ -130,7 +147,8 @@ async fn admin_signin(req: HttpRequest, data: web::Data<AppData>, form: web::For
cachebuster: data.config.cachebuster.clone(), cachebuster: data.config.cachebuster.clone(),
base_url: data.config.base_url.clone(), base_url: data.config.base_url.clone(),
incorrect_password: true, incorrect_password: true,
}.to_response(); }
.to_response();
*resp.status_mut() = StatusCode::FORBIDDEN; *resp.status_mut() = StatusCode::FORBIDDEN;
Err(InternalError::from_response("Incorrect password", resp).into()) Err(InternalError::from_response("Incorrect password", resp).into())
@ -375,12 +393,13 @@ async fn main() -> std::io::Result<()> {
let admin_password_hash: PasswordHashString = env_or_panic("TRANSBEAM_ADMIN_PASSWORD_HASH"); let admin_password_hash: PasswordHashString = env_or_panic("TRANSBEAM_ADMIN_PASSWORD_HASH");
let cookie_secret_base64: String = env_or_panic("TRANSBEAM_COOKIE_SECRET"); let cookie_secret_base64: String = env_or_panic("TRANSBEAM_COOKIE_SECRET");
let cookie_key = cookie::Key::from( let cookie_key =
&base64::decode(&cookie_secret_base64) cookie::Key::from(&base64::decode(&cookie_secret_base64).unwrap_or_else(|_| {
.unwrap_or_else( panic!(
|_| panic!("Value {} for TRANSBEAM_COOKIE_SECRET is not valid base64", cookie_secret_base64) "Value {} for TRANSBEAM_COOKIE_SECRET is not valid base64",
cookie_secret_base64
) )
); }));
let state_file: PathBuf = match std::env::var("TRANSBEAM_STATE_FILE") { let state_file: PathBuf = match std::env::var("TRANSBEAM_STATE_FILE") {
Ok(v) => v Ok(v) => v
@ -424,7 +443,10 @@ async fn main() -> std::io::Result<()> {
} else { } else {
Logger::default() Logger::default()
}) })
.wrap(SessionMiddleware::new(CookieSessionStore::default(), cookie_key.clone())) .wrap(SessionMiddleware::new(
CookieSessionStore::default(),
cookie_key.clone(),
))
.service(index) .service(index)
.service(handle_download) .service(handle_download)
.service(download_info) .service(download_info)

View file

@ -13,12 +13,11 @@ pub mod prelude {
fn size(&self) -> u64; fn size(&self) -> u64;
fn formatted_size(&self) -> String { fn formatted_size(&self) -> String {
bytesize::to_string(self.size(), false).replace(" ", "") bytesize::to_string(self.size(), false).replace(' ', "")
} }
} }
} }
mod v0; mod v0;
pub mod v1 { pub mod v1 {
@ -52,7 +51,9 @@ pub mod v1 {
} }
} }
impl SizedFile for UploadedFile { impl SizedFile for UploadedFile {
fn size(&self) -> u64 { self.size } fn size(&self) -> u64 {
self.size
}
} }
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
@ -83,7 +84,9 @@ pub mod v1 {
pub contents: Option<FileSet>, pub contents: Option<FileSet>,
} }
impl SizedFile for StoredFile { impl SizedFile for StoredFile {
fn size(&self) -> u64 { self.size } fn size(&self) -> u64 {
self.size
}
} }
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]

View file

@ -36,7 +36,7 @@ async fn is_valid_entry(key: &str, info: &StoredFile, storage_dir: &Path) -> boo
return false; return false;
} }
let file = if let Ok(f) = File::open(storage_dir.join(&key)).await { let file = if let Ok(f) = File::open(storage_dir.join(key)).await {
f f
} else { } else {
error!( error!(
@ -108,7 +108,11 @@ impl crate::AppData {
/// Attempts to add a file to the store. Returns an I/O error if /// Attempts to add a file to the store. Returns an I/O error if
/// something's broken, or a u64 of the maximum allowed file size /// something's broken, or a u64 of the maximum allowed file size
/// if the file was too big, or a unit if everything worked. /// if the file was too big, or a unit if everything worked.
pub async fn add_file(&self, key: String, entry: StoredFileWithPassword) -> Result<(), FileAddError> { pub async fn add_file(
&self,
key: String,
entry: StoredFileWithPassword,
) -> Result<(), FileAddError> {
let mut store = self.state.write().await; let mut store = self.state.write().await;
if store.full(self.config.max_storage_size) { if store.full(self.config.max_storage_size) {
return Err(FileAddError::Full); return Err(FileAddError::Full);

View file

@ -211,10 +211,6 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
} }
} }
fn ack(ctx: &mut Context) {
ctx.text("ack");
}
impl Uploader { impl Uploader {
fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut Context) { fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut Context) {
error!("{}", e); error!("{}", e);
@ -350,13 +346,10 @@ impl Uploader {
}), }),
); );
} }
ws::Message::Binary(data) | ws::Message::Continuation(Item::Last(data)) => { ws::Message::Binary(data)
let result = self.handle_data(data)?; | ws::Message::Continuation(Item::FirstBinary(data))
ack(ctx); | ws::Message::Continuation(Item::Continue(data))
return Ok(result); | ws::Message::Continuation(Item::Last(data)) => {
}
ws::Message::Continuation(Item::FirstBinary(data))
| ws::Message::Continuation(Item::Continue(data)) => {
return self.handle_data(data); return self.handle_data(data);
} }
ws::Message::Close(reason) => { ws::Message::Close(reason) => {

View file

@ -2,6 +2,8 @@ const FILE_CHUNK_SIZE = 16384;
const MAX_FILES = 256; const MAX_FILES = 256;
const SAMPLE_WINDOW = 100; const SAMPLE_WINDOW = 100;
const STALL_THRESHOLD = 1000; const STALL_THRESHOLD = 1000;
const MAX_WS_BUFFER = 1048576;
const WS_BUFFER_DELAY = 10;
let files = []; let files = [];
@ -204,7 +206,11 @@ function sendManifest() {
} }
function handleMessage(msg) { function handleMessage(msg) {
if (bytesSent === 0) { if (bytesSent > 0) {
console.warn('Received unexpected message from server during upload', msg.data);
return;
}
let reply; let reply;
try { try {
reply = JSON.parse(msg.data); reply = JSON.parse(msg.data);
@ -237,15 +243,6 @@ function handleMessage(msg) {
} else if (reply.type === 'error') { } else if (reply.type === 'error') {
displayError(reply.details); displayError(reply.details);
} }
} else {
if (msg.data === 'ack') {
sendData();
} else {
console.error('Received unexpected message from server instead of ack', msg.data);
displayError();
socket.close();
}
}
} }
function updateMaxLifetime(lifetime) { function updateMaxLifetime(lifetime) {
@ -262,10 +259,21 @@ function updateMaxLifetime(lifetime) {
} }
function sendData() { function sendData() {
if (socket.readyState !== 1) {
return;
}
while (true) {
if (fileIndex >= files.length) { if (fileIndex >= files.length) {
finishSending(); finishSending();
return; return;
} }
if (socket.bufferedAmount >= MAX_WS_BUFFER) {
setTimeout(sendData, WS_BUFFER_DELAY);
return;
}
const currentFile = files[fileIndex]; const currentFile = files[fileIndex];
if (byteIndex < currentFile.size) { if (byteIndex < currentFile.size) {
const endpoint = Math.min(byteIndex+FILE_CHUNK_SIZE, currentFile.size); const endpoint = Math.min(byteIndex+FILE_CHUNK_SIZE, currentFile.size);
@ -287,7 +295,7 @@ function sendData() {
} else { } else {
fileIndex += 1; fileIndex += 1;
byteIndex = 0; byteIndex = 0;
sendData(); }
} }
} }