move state to jsondb

This commit is contained in:
xenofem 2022-08-16 04:54:18 -04:00
parent 446c0f0264
commit 073feda920
8 changed files with 145 additions and 143 deletions

14
Cargo.lock generated
View file

@ -896,6 +896,17 @@ dependencies = [
"libc",
]
[[package]]
name = "jsondb"
version = "0.4.0"
source = "git+https://git.xeno.science/xenofem/jsondb#b8d31b77d937e524a72c50bbed93d276741e174b"
dependencies = [
"serde",
"serde_json",
"thiserror",
"tokio",
]
[[package]]
name = "language-tags"
version = "0.3.2"
@ -1585,7 +1596,7 @@ dependencies = [
[[package]]
name = "transbeam"
version = "0.2.0"
version = "0.3.0"
dependencies = [
"actix",
"actix-files",
@ -1601,6 +1612,7 @@ dependencies = [
"env_logger",
"futures-core",
"inotify",
"jsondb",
"log",
"mime",
"mnemonic",

View file

@ -1,12 +1,10 @@
[package]
name = "transbeam"
version = "0.2.0"
version = "0.3.0"
authors = ["xenofem <xenofem@xeno.science>"]
edition = "2021"
license = "MIT"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix = "0.13"
actix-files = "0.6.0"
@ -22,6 +20,7 @@ dotenv = "0.15"
env_logger = "0.9"
futures-core = "0.3"
inotify = "0.10"
jsondb = { version = "0.4.0", git = "https://git.xeno.science/xenofem/jsondb" }
log = "0.4"
mime = "0.3.16"
mnemonic = "1.0.1"

View file

@ -28,6 +28,10 @@ transbeam is configured with the following environment variables:
(default: `./storage`)
- `TRANSBEAM_STATIC_DIR`: path where the web app's static files live
(default: `./static`)
- `TRANSBEAM_STATE_FILE`: path of a JSON file where transbeam will
store file metadata and other persistent state (default:
`$TRANSBEAM_STORAGE_DIR/files.json` if that's already an existing
file (for backwards compatibility), `./transbeam.json` otherwise)
- `TRANSBEAM_BASE_URL`: base URL for this transbeam instance, without
trailing `/`
- `TRANSBEAM_PORT`: port to listen on localhost for http requests

View file

@ -1,4 +1,5 @@
mod download;
mod state;
mod store;
mod timestamp;
mod upload;
@ -16,21 +17,23 @@ use askama_actix::{Template, TemplateToResponse};
use bytesize::ByteSize;
use log::{error, warn};
use serde::{Deserialize, Serialize};
use store::{FileStore, StoredFile};
use tokio::{fs::File, sync::RwLock};
use state::StateDb;
use store::StoredFile;
use tokio::fs::File;
const APP_NAME: &str = "transbeam";
const DATE_DISPLAY_FORMAT: &[time::format_description::FormatItem] =
time::macros::format_description!("[year]-[month]-[day]");
struct AppState {
file_store: RwLock<FileStore>,
struct AppData {
state: state::StateDb,
config: Config,
}
struct Config {
base_url: String,
max_storage_size: u64,
max_upload_size: u64,
max_lifetime: u16,
upload_password: String,
@ -63,7 +66,7 @@ struct IndexPage {
}
#[get("/")]
async fn index(data: web::Data<AppState>) -> impl Responder {
async fn index(data: web::Data<AppData>) -> impl Responder {
IndexPage {
cachebuster: data.config.cachebuster.clone(),
base_url: data.config.base_url.clone(),
@ -96,15 +99,16 @@ struct DownloadInfo {
async fn handle_download(
req: HttpRequest,
query: web::Query<DownloadRequest>,
data: web::Data<AppState>,
data: web::Data<AppData>,
) -> actix_web::Result<HttpResponse> {
let code = &query.code;
if !store::is_valid_storage_code(code) {
return not_found(req, data, true);
}
let info = data.file_store.read().await.lookup_file(code);
let store = data.state.read().await;
let info = store.0.get(code);
let info = if let Some(i) = info {
i
i.clone()
} else {
return not_found(req, data, true);
};
@ -153,15 +157,16 @@ struct InfoQuery {
async fn download_info(
req: HttpRequest,
query: web::Query<InfoQuery>,
data: web::Data<AppState>,
data: web::Data<AppData>,
) -> actix_web::Result<impl Responder> {
let code = &query.code;
if !store::is_valid_storage_code(code) {
return not_found(req, data, true);
}
let info = data.file_store.read().await.lookup_file(code);
let store = data.state.read().await;
let info = store.0.get(code);
let info = if let Some(i) = info {
i
i.clone()
} else {
return not_found(req, data, true);
};
@ -183,7 +188,7 @@ struct NotFoundPage<'a> {
base_url: &'a str,
}
fn not_found<T>(req: HttpRequest, data: web::Data<AppState>, report: bool) -> actix_web::Result<T> {
fn not_found<T>(req: HttpRequest, data: web::Data<AppData>, report: bool) -> actix_web::Result<T> {
if report {
let ip_addr = get_ip_addr(&req, data.config.reverse_proxy);
log_auth_failure(&ip_addr);
@ -202,9 +207,9 @@ fn not_found<T>(req: HttpRequest, data: web::Data<AppState>, report: bool) -> ac
async fn handle_upload(
req: HttpRequest,
stream: web::Payload,
data: web::Data<AppState>,
data: web::Data<AppData>,
) -> impl Responder {
if data.file_store.read().await.full() {
if data.state.read().await.full(data.config.max_storage_size) {
return Ok(HttpResponse::BadRequest().finish());
}
let ip_addr = get_ip_addr(&req, data.config.reverse_proxy);
@ -220,7 +225,7 @@ struct UploadPasswordCheck {
async fn check_upload_password(
req: HttpRequest,
body: web::Json<UploadPasswordCheck>,
data: web::Data<AppState>,
data: web::Data<AppData>,
) -> impl Responder {
let ip_addr = get_ip_addr(&req, data.config.reverse_proxy);
if body.password != data.config.upload_password {
@ -239,10 +244,10 @@ struct UploadLimits {
}
#[get("/upload/limits.json")]
async fn upload_limits(data: web::Data<AppState>) -> impl Responder {
let file_store = data.file_store.read().await;
let open = !file_store.full();
let available_size = file_store.available_size();
async fn upload_limits(data: web::Data<AppData>) -> impl Responder {
let file_store = data.state.read().await;
let open = !file_store.full(data.config.max_storage_size);
let available_size = file_store.available_size(data.config.max_storage_size);
let max_size = std::cmp::min(available_size, data.config.max_upload_size);
web::Json(UploadLimits {
open,
@ -304,11 +309,24 @@ async fn main() -> std::io::Result<()> {
let upload_password: String = env_or_panic("TRANSBEAM_UPLOAD_PASSWORD");
let cachebuster: String = env_or_else("TRANSBEAM_CACHEBUSTER", String::new);
let data = web::Data::new(AppState {
file_store: RwLock::new(FileStore::load(storage_dir.clone(), max_storage_size).await?),
let state_file: PathBuf = match std::env::var("TRANSBEAM_STATE_FILE") {
Ok(v) => v.parse().unwrap_or_else(|_| panic!("Invalid value {} for variable TRANSBEAM_STATE_FILE", v)),
Err(_) => {
let legacy_state_file = storage_dir.join("files.json");
if legacy_state_file.is_file() {
legacy_state_file
} else {
PathBuf::from("transbeam.json")
}
}
};
let data = web::Data::new(AppData {
state: StateDb::load(state_file).await.expect("Failed to load state file"),
config: Config {
base_url,
max_upload_size,
max_storage_size,
max_lifetime,
upload_password,
storage_dir,
@ -317,6 +335,7 @@ async fn main() -> std::io::Result<()> {
cachebuster,
},
});
data.cleanup().await?;
start_reaper(data.clone());
let server = HttpServer::new(move || {
@ -351,12 +370,12 @@ async fn main() -> std::io::Result<()> {
Ok(())
}
fn start_reaper(data: web::Data<AppState>) {
fn start_reaper(data: web::Data<AppData>) {
std::thread::spawn(move || {
actix_web::rt::System::new().block_on(async {
loop {
actix_web::rt::time::sleep(core::time::Duration::from_secs(86400)).await;
if let Err(e) = data.file_store.write().await.remove_expired_files().await {
if let Err(e) = data.remove_expired_files().await {
error!("Error reaping expired files: {}", e);
}
}

12
src/state.rs Normal file
View file

@ -0,0 +1,12 @@
use jsondb::JsonDb;
mod v0 {
pub type State = crate::store::StoredFiles;
impl jsondb::SchemaV0 for State {
const VERSION_OPTIONAL: bool = true;
}
}
pub use v0::State;
pub type StateDb = JsonDb<State>;

View file

@ -1,7 +1,7 @@
use std::{
collections::HashMap,
io::ErrorKind,
path::{Path, PathBuf},
path::{Path, PathBuf}, ops::DerefMut,
};
use log::{debug, error, info};
@ -13,15 +13,11 @@ use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
use serde_with::{serde_as, FromInto, PickFirst};
use time::OffsetDateTime;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWriteExt},
};
use tokio::fs::File;
use crate::upload::UploadedFile;
use crate::zip::FileSet;
const STATE_FILE_NAME: &str = "files.json";
const MAX_STORAGE_FILES: usize = 1024;
pub fn gen_storage_code(use_mnemonic: bool) -> String {
@ -40,7 +36,7 @@ pub fn is_valid_storage_code(s: &str) -> bool {
#[serde_as]
#[skip_serializing_none]
#[derive(Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct StoredFile {
pub name: String,
pub size: u64,
@ -53,6 +49,9 @@ pub struct StoredFile {
pub contents: Option<FileSet>,
}
#[derive(Debug, Default, Deserialize, Serialize)]
pub struct StoredFiles(pub HashMap<String, StoredFile>);
async fn is_valid_entry(key: &str, info: &StoredFile, storage_dir: &Path) -> bool {
if info.expiry < OffsetDateTime::now_utc() {
info!("File {} has expired", key);
@ -104,124 +103,87 @@ pub enum FileAddError {
Full,
}
pub struct FileStore {
files: HashMap<String, StoredFile>,
storage_dir: PathBuf,
max_storage_size: u64,
}
impl FileStore {
pub(crate) async fn load(storage_dir: PathBuf, max_storage_size: u64) -> std::io::Result<Self> {
let open_result = File::open(storage_dir.join(STATE_FILE_NAME)).await;
match open_result {
Ok(mut f) => {
let mut buf = String::new();
f.read_to_string(&mut buf).await?;
let map: HashMap<String, StoredFile> = serde_json::from_str(&buf)?;
info!("Loaded {} file entries from persistent storage", map.len());
let mut filtered: HashMap<String, StoredFile> = HashMap::new();
for (key, info) in map.into_iter() {
// Handle this case separately, because we don't
// want to try to delete it if it's not the sort
// of path we're expecting
if !is_valid_storage_code(&key) {
error!("Invalid key in persistent storage: {}", key);
continue;
}
if is_valid_entry(&key, &info, &storage_dir).await {
filtered.insert(key, info);
} else {
info!("Deleting file {}", key);
delete_file_if_exists(&storage_dir.join(&key)).await?;
}
}
let mut loaded = Self {
files: filtered,
storage_dir,
max_storage_size,
};
loaded.save().await?;
Ok(loaded)
impl crate::AppData {
// This must only be run at startup, or it will delete in-progress
// uploads.
pub async fn cleanup(&self) -> std::io::Result<()> {
let mut store = self.state.write().await;
let old = std::mem::take(store.deref_mut());
for (key, info) in old.0.into_iter() {
// Handle this case separately, because we don't
// want to try to delete it if it's not the sort
// of path we're expecting
if !is_valid_storage_code(&key) {
error!("Invalid key in persistent storage: {}", key);
continue;
}
Err(e) => {
if let ErrorKind::NotFound = e.kind() {
Ok(Self {
files: HashMap::new(),
storage_dir,
max_storage_size,
})
} else {
Err(e)
}
if is_valid_entry(&key, &info, &self.config.storage_dir).await {
store.0.insert(key, info);
} else {
info!("Deleting file {}", key);
delete_file_if_exists(&self.config.storage_dir.join(&key)).await?;
}
}
}
fn total_size(&self) -> u64 {
self.files.iter().fold(0, |acc, (_, f)| acc + f.size)
}
pub fn available_size(&self) -> u64 {
self.max_storage_size.saturating_sub(self.total_size())
}
async fn save(&mut self) -> std::io::Result<()> {
info!("saving updated state: {} entries", self.files.len());
File::create(self.storage_dir.join(STATE_FILE_NAME))
.await?
.write_all(&serde_json::to_vec_pretty(&self.files)?)
.await
}
pub fn full(&self) -> bool {
self.available_size() == 0 || self.files.len() >= MAX_STORAGE_FILES
Ok(())
}
/// Attempts to add a file to the store. Returns an I/O error if
/// something's broken, or a u64 of the maximum allowed file size
/// if the file was too big, or a unit if everything worked.
pub(crate) async fn add_file(
&mut self,
pub async fn add_file(
&self,
key: String,
file: StoredFile,
) -> Result<(), FileAddError> {
if self.full() {
let mut store = self.state.write().await;
if store.full(self.config.max_storage_size) {
return Err(FileAddError::Full);
}
let available_size = self.available_size();
let available_size = store.available_size(self.config.max_storage_size);
if file.size > available_size {
return Err(FileAddError::TooBig(available_size));
}
self.files.insert(key, file);
self.save().await?;
store.0.insert(key, file);
Ok(())
}
pub(crate) fn lookup_file(&self, key: &str) -> Option<StoredFile> {
self.files.get(key).cloned()
}
pub(crate) async fn remove_file(&mut self, key: &str) -> std::io::Result<()> {
pub async fn remove_file(&self, key: &str) -> std::io::Result<()> {
debug!("removing entry {} from state", key);
self.files.remove(key);
self.save().await?;
let mut store = self.state.write().await;
store.0.remove(key);
if is_valid_storage_code(key) {
delete_file_if_exists(&self.storage_dir.join(key)).await?;
delete_file_if_exists(&self.config.storage_dir.join(key)).await?;
}
Ok(())
}
pub(crate) async fn remove_expired_files(&mut self) -> std::io::Result<()> {
pub async fn remove_expired_files(&self) -> std::io::Result<()> {
info!("Checking for expired files");
let now = OffsetDateTime::now_utc();
for (key, file) in std::mem::take(&mut self.files).into_iter() {
let mut store = self.state.write().await;
let old = std::mem::take(store.deref_mut());
for (key, file) in old.0.into_iter() {
if file.expiry > now {
self.files.insert(key, file);
store.0.insert(key, file);
} else {
info!("Deleting expired file {}", key);
delete_file_if_exists(&self.storage_dir.join(&key)).await?;
delete_file_if_exists(&self.config.storage_dir.join(&key)).await?;
}
}
self.save().await
Ok(())
}
}
impl StoredFiles {
fn total_size(&self) -> u64 {
self.0.iter().fold(0, |acc, (_, f)| acc + f.size)
}
pub fn available_size(&self, max_storage_size: u64) -> u64 {
max_storage_size.saturating_sub(self.total_size())
}
pub fn full(&self, max_storage_size: u64) -> bool {
self.available_size(max_storage_size) == 0 || self.0.len() >= MAX_STORAGE_FILES
}
}

View file

@ -15,7 +15,7 @@ use crate::{
log_auth_failure,
store::{self, FileAddError, StoredFile},
zip::FileSet,
AppState,
AppData,
};
const MAX_FILES: usize = 256;
@ -86,17 +86,17 @@ impl Error {
pub struct Uploader {
writer: Option<Box<dyn Write>>,
storage_filename: String,
app_state: web::Data<AppState>,
app_data: web::Data<AppData>,
bytes_remaining: u64,
ip_addr: String,
}
impl Uploader {
pub(crate) fn new(app_state: web::Data<AppState>, ip_addr: String) -> Self {
pub(crate) fn new(app_data: web::Data<AppData>, ip_addr: String) -> Self {
Self {
writer: None,
storage_filename: store::gen_storage_code(app_state.config.mnemonic_codes),
app_state,
storage_filename: store::gen_storage_code(app_data.config.mnemonic_codes),
app_data,
bytes_remaining: 0,
ip_addr,
}
@ -109,7 +109,7 @@ impl Actor for Uploader {
type Context = <Uploader as Actor>::Context;
#[derive(Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct UploadedFile {
pub name: String,
pub size: u64,
@ -251,8 +251,8 @@ impl Uploader {
if std::env::var("TRANSBEAM_UPLOAD_PASSWORD") != Ok(password) {
return Err(Error::IncorrectPassword);
}
if lifetime > self.app_state.config.max_lifetime {
return Err(Error::TooLong(self.app_state.config.max_lifetime));
if lifetime > self.app_data.config.max_lifetime {
return Err(Error::TooLong(self.app_data.config.max_lifetime));
}
info!("Received file list: {} files", raw_files.len());
debug!("{:?}", raw_files);
@ -277,11 +277,11 @@ impl Uploader {
self.bytes_remaining += file.size;
files.push(file);
}
if self.bytes_remaining > self.app_state.config.max_upload_size {
return Err(Error::TooBig(self.app_state.config.max_upload_size));
if self.bytes_remaining > self.app_data.config.max_upload_size {
return Err(Error::TooBig(self.app_data.config.max_upload_size));
}
let storage_path = self
.app_state
.app_data
.config
.storage_dir
.join(self.storage_filename.clone());
@ -329,15 +329,12 @@ impl Uploader {
expiry: OffsetDateTime::now_utc() + lifetime * time::Duration::DAY,
contents,
};
let state = self.app_state.clone();
let app_data = self.app_data.clone();
let storage_filename = self.storage_filename.clone();
ctx.spawn(
actix::fut::wrap_future(async move {
debug!("Spawned future to add entry {} to state", storage_filename);
state
.file_store
.write()
.await
app_data
.add_file(storage_filename, stored_file)
.await
})
@ -404,15 +401,12 @@ impl Uploader {
"Cleaning up after failed upload of {}",
self.storage_filename
);
let state = self.app_state.clone();
let app_data = self.app_data.clone();
let filename = self.storage_filename.clone();
ctx.wait(
actix::fut::wrap_future(async move {
debug!("Spawned future to remove entry {} from state", filename);
state
.file_store
.write()
.await
app_data
.remove_file(&filename)
.await
.unwrap();

View file

@ -28,7 +28,7 @@ const EOCD_TOTAL_SIZE: u64 = EOCD64_RECORD_SIZE + EOCD64_LOCATOR_SIZE + EOCD_REC
const EMPTY_STRING_CRC32: u32 = 0;
#[derive(Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct FileSet {
pub files: Vec<UploadedFile>,
// Optional for backwards compatibility only