diff --git a/API.md b/API.md new file mode 100644 index 0000000..4f01332 --- /dev/null +++ b/API.md @@ -0,0 +1,53 @@ +# transbeam websocket api + +- After opening the connection, the client sends an upload manifest to + the server. This is a JSON object containing the following keys: + - `files`: a list of metadata objects for all the files to be + uploaded, in the exact order they will be sent. This list must + contain at least 1 file and at most 256 files. Each file metadata + object has the following keys, all required: + - `name`: The name of the file. This will be sanitised on the + server side, but the sanitisation library isn't especially + restrictive; most Unicode code points will be allowed through + as-is. + - `size`: The exact size of the file, in bytes. + - `modtime`: The modification time of the file, as milliseconds + since the Unix epoch. + - `lifetime`: an integer number of days the files should be kept + for. + +- Once the server receives the metadata, it will respond with a + JSON-encoded object containing at least the field `type`, and + possibly other fields as well. The types of message, and their + associated extra fields if any, are as follows: + - `ready`: The server will accept the upload and is ready to receive + data. + - `code`: A code string that can be used to download the files, + starting now. + - `too_big`: The upload is rejected because the total size of the + files is bigger than the server is willing to accept. + - `max_size`: The maximum total upload size the server will + accept. This is subject to change if the admin changes the + config, or if the server's storage space is filling up. + - `too_long`: The upload is rejected because the requested lifetime + is longer than the server will allow. + - `max_days`: The maximum number of days the client can request + files be kept for. + - `error`: A miscellaneous error has occurred. + - `details`: A string with more information about the error. + + If the message type is anything other than `ready`, the connection + will be closed by the server. + +- If the server is ready to receive files, the client begins sending + chunks of data from the files, as raw binary blobs. The client must + transmit each file's data in order from start to finish, and must + transmit the files in the same order they were listed in the + metadata. The size of the chunks isn't currently specified, and + it's fine for a chunk to span the end of one file and the start of + the next. After sending each chunk (that is, each complete + websocket message), the client must wait for the server to + acknowledge the chunk by sending back the string "ack", and then + send the next chunk if there is one. Once all chunks have been sent + and acknowledged, or once the server has sent a message other than + "ack" to indicate an error, the connection will be closed. diff --git a/src/download.rs b/src/download.rs index 3bde8de..196545a 100644 --- a/src/download.rs +++ b/src/download.rs @@ -30,14 +30,14 @@ use actix_web::{ use actix_files::HttpRange; -use crate::DownloadableFile; +use crate::store::StoredFile; // This is copied substantially from actix-files, with some tweaks pub(crate) struct DownloadingFile { pub(crate) file: File, pub(crate) storage_path: PathBuf, - pub(crate) info: DownloadableFile, + pub(crate) info: StoredFile, } impl DownloadingFile { diff --git a/src/main.rs b/src/main.rs index ca48349..c753dcc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,5 @@ mod download; -mod state; +mod store; mod upload; mod util; mod zip; @@ -10,42 +10,13 @@ use actix_web::{ get, middleware::Logger, web, App, HttpRequest, HttpResponse, HttpServer, Responder, }; use actix_web_actors::ws; -use serde::{Deserialize, Serialize}; -use state::PersistentState; -use time::OffsetDateTime; +use log::error; +use store::FileStore; use tokio::sync::RwLock; const APP_NAME: &str = "transbeam"; -pub struct UploadedFile { - name: String, - size: u64, - modtime: OffsetDateTime, -} - -impl UploadedFile { - fn new(name: &str, size: u64, modtime: OffsetDateTime) -> Self { - Self { - name: sanitise_file_name::sanitise(name), - size, - modtime, - } - } -} - -#[derive(Clone, Deserialize, Serialize)] -pub struct DownloadableFile { - name: String, - size: u64, - #[serde(with = "state::timestamp")] - modtime: OffsetDateTime, -} - -type AppData = web::Data>; - -fn storage_dir() -> PathBuf { - PathBuf::from(std::env::var("STORAGE_DIR").unwrap_or_else(|_| String::from("storage"))) -} +type AppData = web::Data>; #[get("/download/{file_code}")] async fn handle_download( @@ -60,7 +31,7 @@ async fn handle_download( let data = data.read().await; let info = data.lookup_file(&file_code); if let Some(info) = info { - let storage_path = storage_dir().join(file_code); + let storage_path = store::storage_dir().join(file_code); let file = File::open(&storage_path)?; Ok(download::DownloadingFile { file, @@ -82,7 +53,8 @@ async fn handle_upload(req: HttpRequest, stream: web::Payload, data: AppData) -> async fn main() -> std::io::Result<()> { env_logger::init(); - let data: AppData = web::Data::new(RwLock::new(PersistentState::load().await?)); + let data: AppData = web::Data::new(RwLock::new(FileStore::load().await?)); + start_reaper(data.clone()); let static_dir = PathBuf::from(std::env::var("STATIC_DIR").unwrap_or_else(|_| String::from("static"))); @@ -104,3 +76,16 @@ async fn main() -> std::io::Result<()> { .await?; Ok(()) } + +fn start_reaper(data: AppData) { + std::thread::spawn(move || { + actix_web::rt::System::new().block_on(async { + loop { + actix_web::rt::time::sleep(core::time::Duration::from_secs(86400)).await; + if let Err(e) = data.write().await.remove_expired_files().await { + error!("Error reaping expired files: {}", e); + } + } + }); + }); +} diff --git a/src/state.rs b/src/store.rs similarity index 51% rename from src/state.rs rename to src/store.rs index 4bcecb3..ee39794 100644 --- a/src/state.rs +++ b/src/store.rs @@ -1,14 +1,50 @@ -use std::{collections::HashMap, io::ErrorKind}; +use std::{collections::HashMap, io::ErrorKind, path::PathBuf, str::FromStr}; use log::{debug, error, info, warn}; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; use tokio::{ fs::File, io::{AsyncReadExt, AsyncWriteExt}, }; -use crate::{storage_dir, DownloadableFile}; - const STATE_FILE_NAME: &str = "files.json"; +const DEFAULT_STORAGE_DIR: &str = "storage"; +const DEFAULT_MAX_LIFETIME: u32 = 30; +const GIGA: u64 = 1024*1024*1024; +const DEFAULT_MAX_SINGLE_SIZE: u64 = 16*GIGA; +const DEFAULT_MAX_TOTAL_SIZE: u64 = 64*GIGA; + + +pub(crate) fn storage_dir() -> PathBuf { + PathBuf::from(std::env::var("STORAGE_DIR").unwrap_or_else(|_| String::from(DEFAULT_STORAGE_DIR))) +} + +fn parse_env_var(var: &str, default: T) -> T { + std::env::var(var).ok().and_then(|val| val.parse::().ok()).unwrap_or(default) +} + +pub(crate) fn max_lifetime() -> u32 { + parse_env_var("TRANSBEAM_MAX_LIFETIME", DEFAULT_MAX_LIFETIME) +} + +pub(crate) fn max_single_size() -> u64 { + parse_env_var("TRANSBEAM_MAX_SINGLE_FILE_SIZE", DEFAULT_MAX_SINGLE_SIZE) +} + +pub(crate) fn max_total_size() -> u64 { + parse_env_var("TRANSBEAM_MAX_TOTAL_FILE_SIZE", DEFAULT_MAX_TOTAL_SIZE) +} + +#[derive(Clone, Deserialize, Serialize)] +pub struct StoredFile { + pub name: String, + pub size: u64, + #[serde(with = "timestamp")] + pub modtime: OffsetDateTime, + #[serde(with = "timestamp")] + pub expiry: OffsetDateTime, +} pub(crate) mod timestamp { use core::fmt; @@ -51,11 +87,12 @@ pub(crate) mod timestamp { } } -async fn is_valid_entry(key: &str, info: &DownloadableFile) -> bool { - if !crate::util::is_ascii_alphanumeric(key) { - error!("Invalid key in persistent storage: {}", key); +async fn is_valid_entry(key: &str, info: &StoredFile) -> bool { + if info.expiry < OffsetDateTime::now_utc() { + info!("File {} has expired", key); return false; } + let file = if let Ok(f) = File::open(storage_dir().join(&key)).await { f } else { @@ -81,24 +118,31 @@ async fn is_valid_entry(key: &str, info: &DownloadableFile) -> bool { true } -pub(crate) struct PersistentState(HashMap); -impl PersistentState { +pub(crate) struct FileStore(HashMap); +impl FileStore { pub(crate) async fn load() -> std::io::Result { let open_result = File::open(storage_dir().join(STATE_FILE_NAME)).await; match open_result { Ok(mut f) => { let mut buf = String::new(); f.read_to_string(&mut buf).await?; - let map: HashMap = serde_json::from_str(&buf)?; + let map: HashMap = serde_json::from_str(&buf)?; info!("Loaded {} file entries from persistent storage", map.len()); - let mut filtered: HashMap = HashMap::new(); + let mut filtered: HashMap = HashMap::new(); for (key, info) in map.into_iter() { + // Handle this case separately, because we don't + // want to try to delete it if it's not the sort + // of path we're expecting + if !crate::util::is_ascii_alphanumeric(&key) { + error!("Invalid key in persistent storage: {}", key); + continue; + } if is_valid_entry(&key, &info).await { filtered.insert(key, info); } else { - info!("Deleting invalid file {}", key); + info!("Deleting file {}", key); if let Err(e) = tokio::fs::remove_file(storage_dir().join(&key)).await { - warn!("Failed to delete invalid file {}: {}", key, e); + warn!("Failed to delete file {}: {}", key, e); } } } @@ -116,6 +160,10 @@ impl PersistentState { } } + fn total_size(&self) -> u64 { + self.0.iter().fold(0, |acc, (_, f)| acc + f.size) + } + async fn save(&mut self) -> std::io::Result<()> { info!("saving updated state: {} entries", self.0.len()); File::create(storage_dir().join(STATE_FILE_NAME)) @@ -124,16 +172,22 @@ impl PersistentState { .await } + /// Attempts to add a file to the store. Returns an I/O error if + /// something's broken, or a u64 of the maximum allowed file size + /// if the file was too big, or a unit if everything worked. pub(crate) async fn add_file( &mut self, key: String, - file: DownloadableFile, - ) -> std::io::Result<()> { + file: StoredFile, + ) -> std::io::Result> { + let remaining_size = max_total_size().saturating_sub(self.total_size()); + let allowed_size = std::cmp::min(remaining_size, max_single_size()); + if file.size > allowed_size { return Ok(Err(allowed_size)); } self.0.insert(key, file); - self.save().await + self.save().await.map(Ok) } - pub(crate) fn lookup_file(&self, key: &str) -> Option { + pub(crate) fn lookup_file(&self, key: &str) -> Option { self.0.get(key).cloned() } @@ -142,4 +196,20 @@ impl PersistentState { self.0.remove(key); self.save().await } + + pub(crate) async fn remove_expired_files(&mut self) -> std::io::Result<()> { + info!("Checking for expired files"); + let now = OffsetDateTime::now_utc(); + for (key, file) in std::mem::replace(&mut self.0, HashMap::new()).into_iter() { + if file.expiry > now { + self.0.insert(key, file); + } else { + info!("Deleting expired file {}", key); + if let Err(e) = tokio::fs::remove_file(storage_dir().join(&key)).await { + warn!("Failed to delete expired file {}: {}", key, e); + } + } + } + self.save().await + } } diff --git a/src/upload.rs b/src/upload.rs index ad7bb1f..7191e5f 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -6,10 +6,10 @@ use actix_web_actors::ws::{self, CloseCode}; use bytes::Bytes; use log::{debug, error, info, trace}; use rand::distributions::{Alphanumeric, DistString}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -use crate::{storage_dir, DownloadableFile, UploadedFile}; +use crate::store::{storage_dir, StoredFile, self}; const MAX_FILES: usize = 256; const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] = @@ -31,6 +31,10 @@ enum Error { NoFiles, #[error("Number of files submitted by client exceeded the maximum limit")] TooManyFiles, + #[error("Requested lifetime was too long")] + TooLong, + #[error("Upload size was too large, can be at most {0} bytes")] + TooBig(u64), #[error("Websocket was closed by client before completing transfer")] ClosedEarly(Option), #[error("Client sent more data than they were supposed to")] @@ -40,15 +44,17 @@ enum Error { impl Error { fn close_code(&self) -> CloseCode { match self { - Self::Parse(_) => CloseCode::Invalid, - Self::Storage(_) => CloseCode::Error, - Self::TimeFormat(_) => CloseCode::Error, - Self::DuplicateFilename => CloseCode::Policy, - Self::UnexpectedMessageType => CloseCode::Invalid, - Self::NoFiles => CloseCode::Policy, - Self::TooManyFiles => CloseCode::Policy, - Self::ClosedEarly(_) => CloseCode::Invalid, - Self::UnexpectedExtraData => CloseCode::Invalid, + Self::Storage(_) + | Self::TimeFormat(_) => CloseCode::Error, + Self::Parse(_) + | Self::UnexpectedMessageType + | Self::ClosedEarly(_) + | Self::UnexpectedExtraData => CloseCode::Invalid, + Self::DuplicateFilename + | Self::NoFiles + | Self::TooManyFiles + | Self::TooLong + | Self::TooBig(_) => CloseCode::Policy, } } } @@ -64,7 +70,7 @@ impl Uploader { pub(crate) fn new(app_data: super::AppData) -> Self { Self { writer: None, - storage_filename: String::new(), + storage_filename: Alphanumeric.sample_string(&mut rand::thread_rng(), 8), app_data, bytes_remaining: 0, } @@ -75,6 +81,22 @@ impl Actor for Uploader { type Context = ws::WebsocketContext; } +pub struct UploadedFile { + pub name: String, + pub size: u64, + pub modtime: OffsetDateTime, +} + +impl UploadedFile { + fn new(name: &str, size: u64, modtime: OffsetDateTime) -> Self { + Self { + name: sanitise_file_name::sanitise(name), + size, + modtime, + } + } +} + #[derive(Debug, Deserialize)] struct RawUploadedFile { name: String, @@ -93,6 +115,31 @@ impl RawUploadedFile { } } +#[derive(Deserialize)] +struct UploadManifest { + files: Vec, + lifetime: u32, +} + +#[derive(Serialize)] +#[serde(rename_all = "snake_case", tag = "type")] +enum ServerMessage { + Ready { code: String }, + TooBig { max_size: u64 }, + TooLong { max_days: u32 }, + Error { details: String }, +} + +impl From<&Error> for ServerMessage { + fn from(e: &Error) -> Self { + match e { + Error::TooBig(max_size) => ServerMessage::TooBig { max_size: *max_size }, + Error::TooLong => ServerMessage::TooLong { max_days: store::max_lifetime() }, + _ => ServerMessage::Error { details: e.to_string() }, + } + } +} + fn stop_and_flush(_: T, u: &mut Uploader, ctx: &mut ::Context) { ctx.stop(); if let Some(w) = u.writer.as_mut() { @@ -115,12 +162,7 @@ impl StreamHandler> for Uploader { match self.handle_message(msg, ctx) { Err(e) => { - error!("{}", e); - ctx.close(Some(ws::CloseReason { - code: e.close_code(), - description: Some(e.to_string()), - })); - self.cleanup_after_error(ctx); + self.notify_error_and_cleanup(e, ctx); } Ok(true) => { info!("Finished uploading data"); @@ -140,6 +182,16 @@ fn ack(ctx: &mut ::Context) { } impl Uploader { + fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut ::Context) { + error!("{}", e); + ctx.text(serde_json::to_string(&ServerMessage::from(&e)).unwrap()); + ctx.close(Some(ws::CloseReason { + code: e.close_code(), + description: Some(e.to_string()), + })); + self.cleanup_after_error(ctx); + } + fn handle_message( &mut self, msg: ws::Message, @@ -151,12 +203,18 @@ impl Uploader { if self.writer.is_some() { return Err(Error::UnexpectedMessageType); } - let raw_files: Vec = serde_json::from_slice(text.as_bytes())?; + let UploadManifest { files: raw_files, lifetime, } = serde_json::from_slice(text.as_bytes())?; + if lifetime > store::max_lifetime() { + return Err(Error::TooLong); + } info!("Received file list: {} files", raw_files.len()); debug!("{:?}", raw_files); if raw_files.len() > MAX_FILES { return Err(Error::TooManyFiles); } + if raw_files.is_empty() { + return Err(Error::NoFiles); + } let mut filenames: HashSet = HashSet::new(); let mut files = Vec::new(); for raw_file in raw_files.iter() { @@ -172,18 +230,13 @@ impl Uploader { self.bytes_remaining += file.size; files.push(file); } - if files.is_empty() { - return Err(Error::NoFiles); - } - let storage_filename = Alphanumeric.sample_string(&mut rand::thread_rng(), 8); - self.storage_filename = storage_filename.clone(); - let storage_path = storage_dir().join(storage_filename.clone()); + let storage_path = storage_dir().join(self.storage_filename.clone()); info!("storing to: {:?}", storage_path); let writer = File::options() .write(true) .create_new(true) .open(&storage_path)?; - let (writer, downloadable_file): (Box, _) = if files.len() > 1 { + let (writer, name, size, modtime): (Box,_,_,_) = if files.len() > 1 { info!("Wrapping in zipfile generator"); let now = OffsetDateTime::now_utc(); let zip_writer = super::zip::ZipGenerator::new(files, writer); @@ -192,33 +245,40 @@ impl Uploader { super::APP_NAME.to_owned() + &now.format(FILENAME_DATE_FORMAT)? + ".zip"; ( Box::new(zip_writer), - DownloadableFile { - name: download_filename, - size, - modtime: now, - }, + download_filename, + size, + now, ) } else { ( Box::new(writer), - DownloadableFile { - name: files[0].name.clone(), - size: files[0].size, - modtime: files[0].modtime, - }, + files[0].name.clone(), + files[0].size, + files[0].modtime, ) }; self.writer = Some(writer); + let stored_file = StoredFile { + name, + size, + modtime, + expiry: OffsetDateTime::now_utc() + lifetime*time::Duration::DAY, + }; let data = self.app_data.clone(); + let storage_filename = self.storage_filename.clone(); ctx.spawn(actix::fut::wrap_future(async move { debug!("Spawned future to add entry {} to state", storage_filename); data.write() .await - .add_file(storage_filename, downloadable_file) + .add_file(storage_filename, stored_file) .await - .unwrap(); + }).map(|res, u: &mut Self, ctx: &mut ::Context| { + match res { + Ok(Ok(())) => ctx.text(serde_json::to_string(&ServerMessage::Ready { code: u.storage_filename.clone() }).unwrap()), + Ok(Err(size)) => u.notify_error_and_cleanup(Error::TooBig(size), ctx), + Err(e) => u.notify_error_and_cleanup(Error::from(e), ctx) + } })); - ctx.text(self.storage_filename.as_str()); } ws::Message::Binary(data) | ws::Message::Continuation(Item::Last(data)) => { let result = self.handle_data(data)?; diff --git a/src/zip.rs b/src/zip.rs index de8b1a9..dbb2106 100644 --- a/src/zip.rs +++ b/src/zip.rs @@ -4,7 +4,7 @@ use crc32fast::Hasher; use log::debug; use time::OffsetDateTime; -use crate::UploadedFile; +use crate::upload::UploadedFile; const SIGNATURE_SIZE: u64 = 4; const SHARED_FIELDS_SIZE: u64 = 26; diff --git a/static/index.html b/static/index.html index d70d060..bf42ad4 100644 --- a/static/index.html +++ b/static/index.html @@ -18,6 +18,17 @@ +