From 073feda920a0aa9a42aabfb75c9ffe6c7bc06ea9 Mon Sep 17 00:00:00 2001
From: xenofem
Date: Tue, 16 Aug 2022 04:54:18 -0400
Subject: [PATCH] move state to jsondb

---
 Cargo.lock    |  14 ++++-
 Cargo.toml    |   5 +-
 README.md     |   4 ++
 src/main.rs   |  65 ++++++++++++++--------
 src/state.rs  |  12 ++++
 src/store.rs  | 150 +++++++++++++++++++-------------------------------
 src/upload.rs |  36 +++++-------
 src/zip.rs    |   2 +-
 8 files changed, 145 insertions(+), 143 deletions(-)
 create mode 100644 src/state.rs

diff --git a/Cargo.lock b/Cargo.lock
index cd65f09..d506ae1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -896,6 +896,17 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "jsondb"
+version = "0.4.0"
+source = "git+https://git.xeno.science/xenofem/jsondb#b8d31b77d937e524a72c50bbed93d276741e174b"
+dependencies = [
+ "serde",
+ "serde_json",
+ "thiserror",
+ "tokio",
+]
+
 [[package]]
 name = "language-tags"
 version = "0.3.2"
@@ -1585,7 +1596,7 @@ dependencies = [
 
 [[package]]
 name = "transbeam"
-version = "0.2.0"
+version = "0.3.0"
 dependencies = [
  "actix",
  "actix-files",
@@ -1601,6 +1612,7 @@ dependencies = [
  "env_logger",
  "futures-core",
  "inotify",
+ "jsondb",
  "log",
  "mime",
  "mnemonic",
diff --git a/Cargo.toml b/Cargo.toml
index 9cd2f20..ae4d097 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,12 +1,10 @@
 [package]
 name = "transbeam"
-version = "0.2.0"
+version = "0.3.0"
 authors = ["xenofem "]
 edition = "2021"
 license = "MIT"
 
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
 [dependencies]
 actix = "0.13"
 actix-files = "0.6.0"
@@ -22,6 +20,7 @@ dotenv = "0.15"
 env_logger = "0.9"
 futures-core = "0.3"
 inotify = "0.10"
+jsondb = { version = "0.4.0", git = "https://git.xeno.science/xenofem/jsondb" }
 log = "0.4"
 mime = "0.3.16"
 mnemonic = "1.0.1"
diff --git a/README.md b/README.md
index 08fa58d..dba10bb 100644
--- a/README.md
+++ b/README.md
@@ -28,6 +28,10 @@ transbeam is configured with the following environment variables:
   (default: `./storage`)
 - `TRANSBEAM_STATIC_DIR`: path where the web app's static files live
   (default: `./static`)
+- `TRANSBEAM_STATE_FILE`: path of a JSON file where transbeam will
+  store file metadata and other persistent state (default:
+  `$TRANSBEAM_STORAGE_DIR/files.json` if that file already exists
+  (for backwards compatibility), `./transbeam.json` otherwise)
 - `TRANSBEAM_BASE_URL`: base URL for this transbeam instance, without
   trailing `/`
 - `TRANSBEAM_PORT`: port to listen on localhost for http requests
diff --git a/src/main.rs b/src/main.rs
index 2aa7fdf..3805cec 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,5 @@
 mod download;
+mod state;
 mod store;
 mod timestamp;
 mod upload;
@@ -16,21 +17,23 @@ use askama_actix::{Template, TemplateToResponse};
 use bytesize::ByteSize;
 use log::{error, warn};
 use serde::{Deserialize, Serialize};
-use store::{FileStore, StoredFile};
-use tokio::{fs::File, sync::RwLock};
+use state::StateDb;
+use store::StoredFile;
+use tokio::fs::File;
 
 const APP_NAME: &str = "transbeam";
 const DATE_DISPLAY_FORMAT: &[time::format_description::FormatItem] =
     time::macros::format_description!("[year]-[month]-[day]");
 
-struct AppState {
-    file_store: RwLock<FileStore>,
+struct AppData {
+    state: state::StateDb,
     config: Config,
 }
 
 struct Config {
     base_url: String,
+    max_storage_size: u64,
     max_upload_size: u64,
     max_lifetime: u16,
     upload_password: String,
@@ -63,7 +66,7 @@ struct IndexPage {
 }
 
 #[get("/")]
-async fn index(data: web::Data<AppState>) -> impl Responder {
+async fn index(data: web::Data<AppData>) -> impl Responder {
     IndexPage {
         cachebuster: data.config.cachebuster.clone(),
         base_url: data.config.base_url.clone(),
@@ -96,15 +99,16 @@ struct DownloadInfo {
 async fn handle_download(
     req: HttpRequest,
     query: web::Query,
-    data: web::Data<AppState>,
+    data: web::Data<AppData>,
 ) -> actix_web::Result<HttpResponse> {
     let code = &query.code;
     if !store::is_valid_storage_code(code) {
         return not_found(req, data, true);
     }
-    let info = data.file_store.read().await.lookup_file(code);
+    let store = data.state.read().await;
+    let info = store.0.get(code);
     let info = if let Some(i) = info {
-        i
+        i.clone()
     } else {
         return not_found(req, data, true);
     };
@@ -153,15 +157,16 @@ struct InfoQuery {
 async fn download_info(
     req: HttpRequest,
     query: web::Query<InfoQuery>,
-    data: web::Data<AppState>,
+    data: web::Data<AppData>,
 ) -> actix_web::Result<HttpResponse> {
     let code = &query.code;
     if !store::is_valid_storage_code(code) {
         return not_found(req, data, true);
     }
-    let info = data.file_store.read().await.lookup_file(code);
+    let store = data.state.read().await;
+    let info = store.0.get(code);
     let info = if let Some(i) = info {
-        i
+        i.clone()
     } else {
         return not_found(req, data, true);
     };
@@ -183,7 +188,7 @@ struct NotFoundPage<'a> {
     base_url: &'a str,
 }
 
-fn not_found(req: HttpRequest, data: web::Data<AppState>, report: bool) -> actix_web::Result<HttpResponse> {
+fn not_found(req: HttpRequest, data: web::Data<AppData>, report: bool) -> actix_web::Result<HttpResponse> {
     if report {
         let ip_addr = get_ip_addr(&req, data.config.reverse_proxy);
         log_auth_failure(&ip_addr);
@@ -202,9 +207,9 @@
 async fn handle_upload(
     req: HttpRequest,
     stream: web::Payload,
-    data: web::Data<AppState>,
+    data: web::Data<AppData>,
 ) -> impl Responder {
-    if data.file_store.read().await.full() {
+    if data.state.read().await.full(data.config.max_storage_size) {
         return Ok(HttpResponse::BadRequest().finish());
     }
     let ip_addr = get_ip_addr(&req, data.config.reverse_proxy);
@@ -220,7 +225,7 @@ struct UploadPasswordCheck {
 async fn check_upload_password(
     req: HttpRequest,
     body: web::Json<UploadPasswordCheck>,
-    data: web::Data<AppState>,
+    data: web::Data<AppData>,
 ) -> impl Responder {
     let ip_addr = get_ip_addr(&req, data.config.reverse_proxy);
     if body.password != data.config.upload_password {
@@ -239,10 +244,10 @@ struct UploadLimits {
 }
 
 #[get("/upload/limits.json")]
-async fn upload_limits(data: web::Data<AppState>) -> impl Responder {
-    let file_store = data.file_store.read().await;
-    let open = !file_store.full();
-    let available_size = file_store.available_size();
+async fn upload_limits(data: web::Data<AppData>) -> impl Responder {
+    let file_store = data.state.read().await;
+    let open = !file_store.full(data.config.max_storage_size);
+    let available_size = file_store.available_size(data.config.max_storage_size);
     let max_size = std::cmp::min(available_size, data.config.max_upload_size);
     web::Json(UploadLimits {
         open,
@@ -304,11 +309,24 @@ async fn main() -> std::io::Result<()> {
     let upload_password: String = env_or_panic("TRANSBEAM_UPLOAD_PASSWORD");
     let cachebuster: String = env_or_else("TRANSBEAM_CACHEBUSTER", String::new);
 
-    let data = web::Data::new(AppState {
-        file_store: RwLock::new(FileStore::load(storage_dir.clone(), max_storage_size).await?),
+    let state_file: PathBuf = match std::env::var("TRANSBEAM_STATE_FILE") {
+        Ok(v) => v.parse().unwrap_or_else(|_| panic!("Invalid value {} for variable TRANSBEAM_STATE_FILE", v)),
+        Err(_) => {
+            let legacy_state_file = storage_dir.join("files.json");
+            if legacy_state_file.is_file() {
+                legacy_state_file
+            } else {
+                PathBuf::from("transbeam.json")
+            }
+        }
+    };
+
+    let data = web::Data::new(AppData {
+        state: StateDb::load(state_file).await.expect("Failed to load state file"),
         config: Config {
             base_url,
             max_upload_size,
+            max_storage_size,
             max_lifetime,
             upload_password,
             storage_dir,
@@ -317,6 +335,7 @@ async fn main() -> std::io::Result<()> {
             cachebuster,
         },
     });
+    data.cleanup().await?;
     start_reaper(data.clone());
 
     let server = HttpServer::new(move || {
@@ -351,12 +370,12 @@ async fn main() -> std::io::Result<()> {
     Ok(())
 }
 
-fn start_reaper(data: web::Data<AppState>) {
+fn start_reaper(data: web::Data<AppData>) {
     std::thread::spawn(move || {
         actix_web::rt::System::new().block_on(async {
             loop {
                 actix_web::rt::time::sleep(core::time::Duration::from_secs(86400)).await;
-                if let Err(e) = data.file_store.write().await.remove_expired_files().await {
+                if let Err(e) = data.remove_expired_files().await {
                     error!("Error reaping expired files: {}", e);
                 }
             }
diff --git a/src/state.rs b/src/state.rs
new file mode 100644
index 0000000..1a700f9
--- /dev/null
+++ b/src/state.rs
@@ -0,0 +1,12 @@
+use jsondb::JsonDb;
+
+mod v0 {
+    pub type State = crate::store::StoredFiles;
+
+    impl jsondb::SchemaV0 for State {
+        const VERSION_OPTIONAL: bool = true;
+    }
+}
+
+pub use v0::State;
+pub type StateDb = JsonDb<State>;
diff --git a/src/store.rs b/src/store.rs
index 8291ad2..45d807d 100644
--- a/src/store.rs
+++ b/src/store.rs
@@ -1,7 +1,7 @@
 use std::{
     collections::HashMap,
     io::ErrorKind,
-    path::{Path, PathBuf},
+    path::{Path, PathBuf}, ops::DerefMut,
 };
 
 use log::{debug, error, info};
@@ -13,15 +13,11 @@ use serde::{Deserialize, Serialize};
 use serde_with::skip_serializing_none;
 use serde_with::{serde_as, FromInto, PickFirst};
 use time::OffsetDateTime;
-use tokio::{
-    fs::File,
-    io::{AsyncReadExt, AsyncWriteExt},
-};
+use tokio::fs::File;
 
 use crate::upload::UploadedFile;
 use crate::zip::FileSet;
 
-const STATE_FILE_NAME: &str = "files.json";
 const MAX_STORAGE_FILES: usize = 1024;
 
 pub fn gen_storage_code(use_mnemonic: bool) -> String {
@@ -40,7 +36,7 @@ pub fn is_valid_storage_code(s: &str) -> bool {
 
 #[serde_as]
 #[skip_serializing_none]
-#[derive(Clone, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct StoredFile {
     pub name: String,
     pub size: u64,
@@ -53,6 +49,9 @@ pub struct StoredFile {
     pub contents: Option<FileSet>,
 }
 
+#[derive(Debug, Default, Deserialize, Serialize)]
+pub struct StoredFiles(pub HashMap<String, StoredFile>);
+
 async fn is_valid_entry(key: &str, info: &StoredFile, storage_dir: &Path) -> bool {
     if info.expiry < OffsetDateTime::now_utc() {
         info!("File {} has expired", key);
@@ -104,124 +103,87 @@ pub enum FileAddError {
     Full,
 }
 
-pub struct FileStore {
-    files: HashMap<String, StoredFile>,
-    storage_dir: PathBuf,
-    max_storage_size: u64,
-}
-
-impl FileStore {
-    pub(crate) async fn load(storage_dir: PathBuf, max_storage_size: u64) -> std::io::Result<Self> {
-        let open_result = File::open(storage_dir.join(STATE_FILE_NAME)).await;
-        match open_result {
-            Ok(mut f) => {
-                let mut buf = String::new();
-                f.read_to_string(&mut buf).await?;
-                let map: HashMap<String, StoredFile> = serde_json::from_str(&buf)?;
-                info!("Loaded {} file entries from persistent storage", map.len());
-                let mut filtered: HashMap<String, StoredFile> = HashMap::new();
-                for (key, info) in map.into_iter() {
-                    // Handle this case separately, because we don't
-                    // want to try to delete it if it's not the sort
-                    // of path we're expecting
-                    if !is_valid_storage_code(&key) {
-                        error!("Invalid key in persistent storage: {}", key);
-                        continue;
-                    }
-                    if is_valid_entry(&key, &info, &storage_dir).await {
-                        filtered.insert(key, info);
-                    } else {
-                        info!("Deleting file {}", key);
-                        delete_file_if_exists(&storage_dir.join(&key)).await?;
-                    }
-                }
-                let mut loaded = Self {
-                    files: filtered,
-                    storage_dir,
-                    max_storage_size,
-                };
-                loaded.save().await?;
-                Ok(loaded)
+impl crate::AppData {
+    // This must only be run at startup, or it will delete in-progress
+    // uploads.
+    pub async fn cleanup(&self) -> std::io::Result<()> {
+        let mut store = self.state.write().await;
+        let old = std::mem::take(store.deref_mut());
+        for (key, info) in old.0.into_iter() {
+            // Handle this case separately, because we don't
+            // want to try to delete it if it's not the sort
+            // of path we're expecting
+            if !is_valid_storage_code(&key) {
+                error!("Invalid key in persistent storage: {}", key);
+                continue;
             }
-            Err(e) => {
-                if let ErrorKind::NotFound = e.kind() {
-                    Ok(Self {
-                        files: HashMap::new(),
-                        storage_dir,
-                        max_storage_size,
-                    })
-                } else {
-                    Err(e)
-                }
+            if is_valid_entry(&key, &info, &self.config.storage_dir).await {
+                store.0.insert(key, info);
+            } else {
+                info!("Deleting file {}", key);
+                delete_file_if_exists(&self.config.storage_dir.join(&key)).await?;
             }
         }
-    }
-
-    fn total_size(&self) -> u64 {
-        self.files.iter().fold(0, |acc, (_, f)| acc + f.size)
-    }
-
-    pub fn available_size(&self) -> u64 {
-        self.max_storage_size.saturating_sub(self.total_size())
-    }
-
-    async fn save(&mut self) -> std::io::Result<()> {
-        info!("saving updated state: {} entries", self.files.len());
-        File::create(self.storage_dir.join(STATE_FILE_NAME))
-            .await?
-            .write_all(&serde_json::to_vec_pretty(&self.files)?)
-            .await
-    }
-
-    pub fn full(&self) -> bool {
-        self.available_size() == 0 || self.files.len() >= MAX_STORAGE_FILES
+        Ok(())
     }
 
     /// Attempts to add a file to the store. Returns an I/O error if
     /// something's broken, or a u64 of the maximum allowed file size
     /// if the file was too big, or a unit if everything worked.
-    pub(crate) async fn add_file(
-        &mut self,
+    pub async fn add_file(
+        &self,
         key: String,
         file: StoredFile,
     ) -> Result<(), FileAddError> {
-        if self.full() {
+        let mut store = self.state.write().await;
+        if store.full(self.config.max_storage_size) {
             return Err(FileAddError::Full);
         }
-        let available_size = self.available_size();
+        let available_size = store.available_size(self.config.max_storage_size);
         if file.size > available_size {
             return Err(FileAddError::TooBig(available_size));
         }
-        self.files.insert(key, file);
-        self.save().await?;
+        store.0.insert(key, file);
         Ok(())
     }
 
-    pub(crate) fn lookup_file(&self, key: &str) -> Option<StoredFile> {
-        self.files.get(key).cloned()
-    }
-
-    pub(crate) async fn remove_file(&mut self, key: &str) -> std::io::Result<()> {
+    pub async fn remove_file(&self, key: &str) -> std::io::Result<()> {
         debug!("removing entry {} from state", key);
-        self.files.remove(key);
-        self.save().await?;
+        let mut store = self.state.write().await;
+        store.0.remove(key);
         if is_valid_storage_code(key) {
-            delete_file_if_exists(&self.storage_dir.join(key)).await?;
+            delete_file_if_exists(&self.config.storage_dir.join(key)).await?;
         }
         Ok(())
     }
 
-    pub(crate) async fn remove_expired_files(&mut self) -> std::io::Result<()> {
+    pub async fn remove_expired_files(&self) -> std::io::Result<()> {
         info!("Checking for expired files");
         let now = OffsetDateTime::now_utc();
-        for (key, file) in std::mem::take(&mut self.files).into_iter() {
+        let mut store = self.state.write().await;
+        let old = std::mem::take(store.deref_mut());
+        for (key, file) in old.0.into_iter() {
             if file.expiry > now {
-                self.files.insert(key, file);
+                store.0.insert(key, file);
             } else {
                 info!("Deleting expired file {}", key);
-                delete_file_if_exists(&self.storage_dir.join(&key)).await?;
+                delete_file_if_exists(&self.config.storage_dir.join(&key)).await?;
             }
         }
-        self.save().await
+        Ok(())
+    }
+}
+
+impl StoredFiles {
+    fn total_size(&self) -> u64 {
+        self.0.iter().fold(0, |acc, (_, f)| acc + f.size)
+    }
+
+    pub fn available_size(&self, max_storage_size: u64) -> u64 {
+        max_storage_size.saturating_sub(self.total_size())
+    }
+
+    pub fn full(&self, max_storage_size: u64) -> bool {
+        self.available_size(max_storage_size) == 0 || self.0.len() >= MAX_STORAGE_FILES
     }
 }
diff --git a/src/upload.rs b/src/upload.rs
index 4680cee..dd84448 100644
--- a/src/upload.rs
+++ b/src/upload.rs
@@ -15,7 +15,7 @@ use crate::{
     log_auth_failure,
     store::{self, FileAddError, StoredFile},
     zip::FileSet,
-    AppState,
+    AppData,
 };
 
 const MAX_FILES: usize = 256;
@@ -86,17 +86,17 @@ impl Error {
 pub struct Uploader {
     writer: Option>,
     storage_filename: String,
-    app_state: web::Data<AppState>,
+    app_data: web::Data<AppData>,
     bytes_remaining: u64,
     ip_addr: String,
 }
 
 impl Uploader {
-    pub(crate) fn new(app_state: web::Data<AppState>, ip_addr: String) -> Self {
+    pub(crate) fn new(app_data: web::Data<AppData>, ip_addr: String) -> Self {
         Self {
             writer: None,
-            storage_filename: store::gen_storage_code(app_state.config.mnemonic_codes),
-            app_state,
+            storage_filename: store::gen_storage_code(app_data.config.mnemonic_codes),
+            app_data,
             bytes_remaining: 0,
             ip_addr,
         }
     }
 }
 
 impl Actor for Uploader {
     type Context = ::Context;
 }
 
-#[derive(Clone, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct UploadedFile {
     pub name: String,
     pub size: u64,
@@ -251,8 +251,8 @@ impl Uploader {
         if std::env::var("TRANSBEAM_UPLOAD_PASSWORD") != Ok(password) {
             return Err(Error::IncorrectPassword);
         }
-        if lifetime > self.app_state.config.max_lifetime {
-            return Err(Error::TooLong(self.app_state.config.max_lifetime));
+        if lifetime > self.app_data.config.max_lifetime {
+            return Err(Error::TooLong(self.app_data.config.max_lifetime));
         }
         info!("Received file list: {} files", raw_files.len());
         debug!("{:?}", raw_files);
@@ -277,11 +277,11 @@ impl Uploader {
             self.bytes_remaining += file.size;
             files.push(file);
         }
-        if self.bytes_remaining > self.app_state.config.max_upload_size {
-            return Err(Error::TooBig(self.app_state.config.max_upload_size));
+        if self.bytes_remaining > self.app_data.config.max_upload_size {
+            return Err(Error::TooBig(self.app_data.config.max_upload_size));
         }
         let storage_path = self
-            .app_state
+            .app_data
             .config
             .storage_dir
             .join(self.storage_filename.clone());
@@ -329,15 +329,12 @@ impl Uploader {
             expiry: OffsetDateTime::now_utc() + lifetime * time::Duration::DAY,
             contents,
         };
-        let state = self.app_state.clone();
+        let app_data = self.app_data.clone();
         let storage_filename = self.storage_filename.clone();
         ctx.spawn(
             actix::fut::wrap_future(async move {
                 debug!("Spawned future to add entry {} to state", storage_filename);
-                state
-                    .file_store
-                    .write()
-                    .await
+                app_data
                     .add_file(storage_filename, stored_file)
                     .await
             })
@@ -404,15 +401,12 @@ impl Uploader {
             "Cleaning up after failed upload of {}",
             self.storage_filename
         );
-        let state = self.app_state.clone();
+        let app_data = self.app_data.clone();
         let filename = self.storage_filename.clone();
         ctx.wait(
             actix::fut::wrap_future(async move {
                 debug!("Spawned future to remove entry {} from state", filename);
-                state
-                    .file_store
-                    .write()
-                    .await
+                app_data
                     .remove_file(&filename)
                     .await
                     .unwrap();
diff --git a/src/zip.rs b/src/zip.rs
index 99d0995..b9c775e 100644
--- a/src/zip.rs
+++ b/src/zip.rs
@@ -28,7 +28,7 @@ const EOCD_TOTAL_SIZE: u64 = EOCD64_RECORD_SIZE + EOCD64_LOCATOR_SIZE + EOCD_REC
 
 const EMPTY_STRING_CRC32: u32 = 0;
 
-#[derive(Clone, Deserialize, Serialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct FileSet {
     pub files: Vec<UploadedFile>,
     // Optional for backwards compatibility only
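
The quota checks this patch moves from FileStore onto StoredFiles now take the
storage cap as an argument, because max_storage_size lives in Config rather
than in the store itself after this change. A minimal self-contained sketch of
that accounting (the method bodies mirror src/store.rs above; the trimmed-down
StoredFile and the 8 GiB cap in main are illustrative assumptions, not part of
the patch):

    use std::collections::HashMap;

    const MAX_STORAGE_FILES: usize = 1024;

    struct StoredFile {
        size: u64, // the real struct also carries name, expiry, contents, ...
    }

    #[derive(Default)]
    struct StoredFiles(HashMap<String, StoredFile>);

    impl StoredFiles {
        fn total_size(&self) -> u64 {
            self.0.iter().fold(0, |acc, (_, f)| acc + f.size)
        }

        // The cap is supplied by each caller from Config::max_storage_size,
        // since StoredFiles (unlike the old FileStore) no longer owns it.
        fn available_size(&self, max_storage_size: u64) -> u64 {
            max_storage_size.saturating_sub(self.total_size())
        }

        fn full(&self, max_storage_size: u64) -> bool {
            self.available_size(max_storage_size) == 0
                || self.0.len() >= MAX_STORAGE_FILES
        }
    }

    fn main() {
        // Illustrative numbers: an 8 GiB cap with one 6 GiB upload stored.
        let max_storage_size: u64 = 8 << 30;
        let mut store = StoredFiles::default();
        store.0.insert("example".into(), StoredFile { size: 6 << 30 });

        assert_eq!(store.available_size(max_storage_size), 2 << 30);
        assert!(!store.full(max_storage_size));
    }

Handlers and the reaper reach the same state through jsondb's guards, as in
the diff above: data.state.read().await derefs to StoredFiles for lookups,
while the AppData methods (cleanup, add_file, remove_file,
remove_expired_files) take data.state.write().await before mutating it.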