diff --git a/src/state.rs b/src/state.rs index 0405245..a71a050 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, io::ErrorKind}; -use log::{error, info}; +use log::{error, info, warn, debug}; use tokio::{ fs::File, io::{AsyncReadExt, AsyncWriteExt}, @@ -51,6 +51,36 @@ pub(crate) mod timestamp { } } +async fn is_valid_entry(key: &str, info: &DownloadableFile) -> bool { + if !crate::util::is_ascii_alphanumeric(&key) { + error!("Invalid key in persistent storage: {}", key); + return false; + } + let file = if let Ok(f) = File::open(storage_dir().join(&key)).await { + f + } else { + error!( + "Unable to open file {} referenced in persistent storage", + key + ); + return false; + }; + let metadata = if let Ok(md) = file.metadata().await { + md + } else { + error!( + "Unable to get metadata for file {} referenced in persistent storage", + key + ); + return false; + }; + if metadata.len() != info.size { + error!("Mismatched file size for file {} referenced in persistent storage: expected {}, found {}", key, info.size, metadata.len()); + return false; + } + true +} + pub(crate) struct PersistentState(HashMap); impl PersistentState { pub(crate) async fn load() -> std::io::Result { @@ -63,35 +93,18 @@ impl PersistentState { info!("Loaded {} file entries from persistent storage", map.len()); let mut filtered: HashMap = HashMap::new(); for (key, info) in map.into_iter() { - if !crate::util::is_ascii_alphanumeric(&key) { - error!("Invalid key in persistent storage: {}", key); - continue; - } - let file = if let Ok(f) = File::open(storage_dir().join(&key)).await { - f + if is_valid_entry(&key, &info).await { + filtered.insert(key, info); } else { - error!( - "Unable to open file {} referenced in persistent storage", - key - ); - continue; - }; - let metadata = if let Ok(md) = file.metadata().await { - md - } else { - error!( - "Unable to get metadata for file {} referenced in persistent storage", - key - ); - continue; - }; - if metadata.len() != info.size { - error!("Mismatched file size for file {} referenced in persistent storage: expected {}, found {}", key, info.size, metadata.len()); - continue; + info!("Deleting invalid file {}", key); + if let Err(e) = tokio::fs::remove_file(storage_dir().join(&key)).await { + warn!("Failed to delete invalid file {}: {}", key, e); + } } - filtered.insert(key, info); } - Ok(Self(filtered)) + let mut loaded = Self(filtered); + loaded.save().await?; + Ok(loaded) } Err(e) => { if let ErrorKind::NotFound = e.kind() { @@ -104,6 +117,7 @@ impl PersistentState { } async fn save(&mut self) -> std::io::Result<()> { + info!("saving updated state: {} entries", self.0.len()); File::create(storage_dir().join(STATE_FILE_NAME)) .await? .write_all(&serde_json::to_vec_pretty(&self.0)?) @@ -124,6 +138,7 @@ impl PersistentState { } pub(crate) async fn remove_file(&mut self, key: &str) -> std::io::Result<()> { + debug!("removing entry {} from state", key); self.0.remove(key); self.save().await } diff --git a/src/upload.rs b/src/upload.rs index a37efde..d8f3003 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -1,6 +1,6 @@ use std::{collections::HashSet, io::Write, task::Waker}; -use actix::{Actor, ActorContext, AsyncContext, Handler, Message, StreamHandler}; +use actix::{fut::future::ActorFutureExt, Actor, ActorContext, AsyncContext, Handler, Message, StreamHandler}; use actix_http::ws::{CloseReason, Item}; use actix_web_actors::ws::{self, CloseCode}; use bytes::Bytes; @@ -9,7 +9,7 @@ use rand::distributions::{Alphanumeric, DistString}; use serde::Deserialize; use time::OffsetDateTime; -use crate::{file::LiveWriter, DownloadableFile, UploadedFile}; +use crate::{file::LiveWriter, DownloadableFile, UploadedFile, storage_dir}; const MAX_FILES: usize = 256; const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] = @@ -115,7 +115,6 @@ impl StreamHandler> for Uploader { Err(e) => { error!("Websocket error: {}", e); self.cleanup_after_error(ctx); - ctx.stop(); return; } }; @@ -128,7 +127,6 @@ impl StreamHandler> for Uploader { description: Some(e.to_string()), })); self.cleanup_after_error(ctx); - ctx.stop(); } Ok(true) => { info!("Finished uploading data"); @@ -140,9 +138,9 @@ impl StreamHandler> for Uploader { let data = self.app_data.clone(); let filename = self.storage_filename.clone(); ctx.wait(actix::fut::wrap_future(async move { + debug!("Spawned future to remove uploader from entry {} before stopping", filename); data.write().await.remove_uploader(&filename); - })); - ctx.stop(); + }).map(|_, _, ctx: &mut Self::Context| ctx.stop())); } _ => (), } @@ -191,7 +189,7 @@ impl Uploader { } let storage_filename = Alphanumeric.sample_string(&mut rand::thread_rng(), 8); self.storage_filename = storage_filename.clone(); - let storage_path = super::storage_dir().join(storage_filename.clone()); + let storage_path = storage_dir().join(storage_filename.clone()); info!("storing to: {:?}", storage_path); let writer = super::file::LiveFileWriter::new(&storage_path)?; let addr = Some(ctx.address()); @@ -225,6 +223,7 @@ impl Uploader { self.writer = Some(writer); let data = self.app_data.clone(); ctx.spawn(actix::fut::wrap_future(async move { + debug!("Spawned future to add entry {} to state", storage_filename); data.write() .await .add_file(storage_filename, downloadable_file) @@ -275,10 +274,15 @@ impl Uploader { } fn cleanup_after_error(&mut self, ctx: &mut ::Context) { + info!("Cleaning up after failed upload of {}", self.storage_filename); let data = self.app_data.clone(); let filename = self.storage_filename.clone(); - ctx.spawn(actix::fut::wrap_future(async move { + ctx.wait(actix::fut::wrap_future(async move { + debug!("Spawned future to remove entry {} from state", filename); data.write().await.remove_file(&filename).await.unwrap(); - })); + }).map(|_, _, ctx: &mut ::Context| ctx.stop())); + if let Err(e) = std::fs::remove_file(storage_dir().join(&self.storage_filename)) { + error!("Failed to remove file {}: {}", self.storage_filename, e); + } } }