better cleanup of storage state
This commit is contained in:
parent
fa1917ac17
commit
32738e4515
71
src/state.rs
71
src/state.rs
|
@ -1,6 +1,6 @@
|
||||||
use std::{collections::HashMap, io::ErrorKind};
|
use std::{collections::HashMap, io::ErrorKind};
|
||||||
|
|
||||||
use log::{error, info};
|
use log::{error, info, warn, debug};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
fs::File,
|
fs::File,
|
||||||
io::{AsyncReadExt, AsyncWriteExt},
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
|
@ -51,6 +51,36 @@ pub(crate) mod timestamp {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn is_valid_entry(key: &str, info: &DownloadableFile) -> bool {
|
||||||
|
if !crate::util::is_ascii_alphanumeric(&key) {
|
||||||
|
error!("Invalid key in persistent storage: {}", key);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
let file = if let Ok(f) = File::open(storage_dir().join(&key)).await {
|
||||||
|
f
|
||||||
|
} else {
|
||||||
|
error!(
|
||||||
|
"Unable to open file {} referenced in persistent storage",
|
||||||
|
key
|
||||||
|
);
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
let metadata = if let Ok(md) = file.metadata().await {
|
||||||
|
md
|
||||||
|
} else {
|
||||||
|
error!(
|
||||||
|
"Unable to get metadata for file {} referenced in persistent storage",
|
||||||
|
key
|
||||||
|
);
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
if metadata.len() != info.size {
|
||||||
|
error!("Mismatched file size for file {} referenced in persistent storage: expected {}, found {}", key, info.size, metadata.len());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) struct PersistentState(HashMap<String, DownloadableFile>);
|
pub(crate) struct PersistentState(HashMap<String, DownloadableFile>);
|
||||||
impl PersistentState {
|
impl PersistentState {
|
||||||
pub(crate) async fn load() -> std::io::Result<Self> {
|
pub(crate) async fn load() -> std::io::Result<Self> {
|
||||||
|
@ -63,35 +93,18 @@ impl PersistentState {
|
||||||
info!("Loaded {} file entries from persistent storage", map.len());
|
info!("Loaded {} file entries from persistent storage", map.len());
|
||||||
let mut filtered: HashMap<String, DownloadableFile> = HashMap::new();
|
let mut filtered: HashMap<String, DownloadableFile> = HashMap::new();
|
||||||
for (key, info) in map.into_iter() {
|
for (key, info) in map.into_iter() {
|
||||||
if !crate::util::is_ascii_alphanumeric(&key) {
|
if is_valid_entry(&key, &info).await {
|
||||||
error!("Invalid key in persistent storage: {}", key);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let file = if let Ok(f) = File::open(storage_dir().join(&key)).await {
|
|
||||||
f
|
|
||||||
} else {
|
|
||||||
error!(
|
|
||||||
"Unable to open file {} referenced in persistent storage",
|
|
||||||
key
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
let metadata = if let Ok(md) = file.metadata().await {
|
|
||||||
md
|
|
||||||
} else {
|
|
||||||
error!(
|
|
||||||
"Unable to get metadata for file {} referenced in persistent storage",
|
|
||||||
key
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
if metadata.len() != info.size {
|
|
||||||
error!("Mismatched file size for file {} referenced in persistent storage: expected {}, found {}", key, info.size, metadata.len());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
filtered.insert(key, info);
|
filtered.insert(key, info);
|
||||||
|
} else {
|
||||||
|
info!("Deleting invalid file {}", key);
|
||||||
|
if let Err(e) = tokio::fs::remove_file(storage_dir().join(&key)).await {
|
||||||
|
warn!("Failed to delete invalid file {}: {}", key, e);
|
||||||
}
|
}
|
||||||
Ok(Self(filtered))
|
}
|
||||||
|
}
|
||||||
|
let mut loaded = Self(filtered);
|
||||||
|
loaded.save().await?;
|
||||||
|
Ok(loaded)
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
if let ErrorKind::NotFound = e.kind() {
|
if let ErrorKind::NotFound = e.kind() {
|
||||||
|
@ -104,6 +117,7 @@ impl PersistentState {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save(&mut self) -> std::io::Result<()> {
|
async fn save(&mut self) -> std::io::Result<()> {
|
||||||
|
info!("saving updated state: {} entries", self.0.len());
|
||||||
File::create(storage_dir().join(STATE_FILE_NAME))
|
File::create(storage_dir().join(STATE_FILE_NAME))
|
||||||
.await?
|
.await?
|
||||||
.write_all(&serde_json::to_vec_pretty(&self.0)?)
|
.write_all(&serde_json::to_vec_pretty(&self.0)?)
|
||||||
|
@ -124,6 +138,7 @@ impl PersistentState {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn remove_file(&mut self, key: &str) -> std::io::Result<()> {
|
pub(crate) async fn remove_file(&mut self, key: &str) -> std::io::Result<()> {
|
||||||
|
debug!("removing entry {} from state", key);
|
||||||
self.0.remove(key);
|
self.0.remove(key);
|
||||||
self.save().await
|
self.save().await
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use std::{collections::HashSet, io::Write, task::Waker};
|
use std::{collections::HashSet, io::Write, task::Waker};
|
||||||
|
|
||||||
use actix::{Actor, ActorContext, AsyncContext, Handler, Message, StreamHandler};
|
use actix::{fut::future::ActorFutureExt, Actor, ActorContext, AsyncContext, Handler, Message, StreamHandler};
|
||||||
use actix_http::ws::{CloseReason, Item};
|
use actix_http::ws::{CloseReason, Item};
|
||||||
use actix_web_actors::ws::{self, CloseCode};
|
use actix_web_actors::ws::{self, CloseCode};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
@ -9,7 +9,7 @@ use rand::distributions::{Alphanumeric, DistString};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
|
|
||||||
use crate::{file::LiveWriter, DownloadableFile, UploadedFile};
|
use crate::{file::LiveWriter, DownloadableFile, UploadedFile, storage_dir};
|
||||||
|
|
||||||
const MAX_FILES: usize = 256;
|
const MAX_FILES: usize = 256;
|
||||||
const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] =
|
const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] =
|
||||||
|
@ -115,7 +115,6 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Websocket error: {}", e);
|
error!("Websocket error: {}", e);
|
||||||
self.cleanup_after_error(ctx);
|
self.cleanup_after_error(ctx);
|
||||||
ctx.stop();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -128,7 +127,6 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
|
||||||
description: Some(e.to_string()),
|
description: Some(e.to_string()),
|
||||||
}));
|
}));
|
||||||
self.cleanup_after_error(ctx);
|
self.cleanup_after_error(ctx);
|
||||||
ctx.stop();
|
|
||||||
}
|
}
|
||||||
Ok(true) => {
|
Ok(true) => {
|
||||||
info!("Finished uploading data");
|
info!("Finished uploading data");
|
||||||
|
@ -140,9 +138,9 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
|
||||||
let data = self.app_data.clone();
|
let data = self.app_data.clone();
|
||||||
let filename = self.storage_filename.clone();
|
let filename = self.storage_filename.clone();
|
||||||
ctx.wait(actix::fut::wrap_future(async move {
|
ctx.wait(actix::fut::wrap_future(async move {
|
||||||
|
debug!("Spawned future to remove uploader from entry {} before stopping", filename);
|
||||||
data.write().await.remove_uploader(&filename);
|
data.write().await.remove_uploader(&filename);
|
||||||
}));
|
}).map(|_, _, ctx: &mut Self::Context| ctx.stop()));
|
||||||
ctx.stop();
|
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
}
|
}
|
||||||
|
@ -191,7 +189,7 @@ impl Uploader {
|
||||||
}
|
}
|
||||||
let storage_filename = Alphanumeric.sample_string(&mut rand::thread_rng(), 8);
|
let storage_filename = Alphanumeric.sample_string(&mut rand::thread_rng(), 8);
|
||||||
self.storage_filename = storage_filename.clone();
|
self.storage_filename = storage_filename.clone();
|
||||||
let storage_path = super::storage_dir().join(storage_filename.clone());
|
let storage_path = storage_dir().join(storage_filename.clone());
|
||||||
info!("storing to: {:?}", storage_path);
|
info!("storing to: {:?}", storage_path);
|
||||||
let writer = super::file::LiveFileWriter::new(&storage_path)?;
|
let writer = super::file::LiveFileWriter::new(&storage_path)?;
|
||||||
let addr = Some(ctx.address());
|
let addr = Some(ctx.address());
|
||||||
|
@ -225,6 +223,7 @@ impl Uploader {
|
||||||
self.writer = Some(writer);
|
self.writer = Some(writer);
|
||||||
let data = self.app_data.clone();
|
let data = self.app_data.clone();
|
||||||
ctx.spawn(actix::fut::wrap_future(async move {
|
ctx.spawn(actix::fut::wrap_future(async move {
|
||||||
|
debug!("Spawned future to add entry {} to state", storage_filename);
|
||||||
data.write()
|
data.write()
|
||||||
.await
|
.await
|
||||||
.add_file(storage_filename, downloadable_file)
|
.add_file(storage_filename, downloadable_file)
|
||||||
|
@ -275,10 +274,15 @@ impl Uploader {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn cleanup_after_error(&mut self, ctx: &mut <Self as Actor>::Context) {
|
fn cleanup_after_error(&mut self, ctx: &mut <Self as Actor>::Context) {
|
||||||
|
info!("Cleaning up after failed upload of {}", self.storage_filename);
|
||||||
let data = self.app_data.clone();
|
let data = self.app_data.clone();
|
||||||
let filename = self.storage_filename.clone();
|
let filename = self.storage_filename.clone();
|
||||||
ctx.spawn(actix::fut::wrap_future(async move {
|
ctx.wait(actix::fut::wrap_future(async move {
|
||||||
|
debug!("Spawned future to remove entry {} from state", filename);
|
||||||
data.write().await.remove_file(&filename).await.unwrap();
|
data.write().await.remove_file(&filename).await.unwrap();
|
||||||
}));
|
}).map(|_, _, ctx: &mut <Self as Actor>::Context| ctx.stop()));
|
||||||
|
if let Err(e) = std::fs::remove_file(storage_dir().join(&self.storage_filename)) {
|
||||||
|
error!("Failed to remove file {}: {}", self.storage_filename, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue