add persistent state

parent 70384b04c3
commit 127d7e9c67

src/main.rs (17 lines changed)
@@ -1,10 +1,11 @@
 mod download;
 mod file;
+mod state;
 mod upload;
+mod util;
 mod zip;
 
 use std::{
-    collections::HashMap,
     path::PathBuf,
     fs::File,
 };
@@ -14,6 +15,8 @@ use actix_web::{
     get, middleware::Logger, web, App, HttpRequest, HttpServer, Responder, HttpResponse,
 };
 use actix_web_actors::ws;
+use serde::{Deserialize, Serialize};
+use state::PersistentState;
 use time::OffsetDateTime;
 use tokio::sync::RwLock;
 
@@ -35,15 +38,17 @@ impl UploadedFile {
     }
 }
 
-#[derive(Clone)]
+#[derive(Clone, Deserialize, Serialize)]
 pub struct DownloadableFile {
     name: String,
     size: u64,
+    #[serde(with = "state::timestamp")]
     modtime: OffsetDateTime,
+    #[serde(skip)]
     uploader: Option<Addr<upload::Uploader>>,
 }
 
-type AppData = web::Data<RwLock<HashMap<String, DownloadableFile>>>;
+type AppData = web::Data<RwLock<PersistentState>>;
 
 fn storage_dir() -> PathBuf {
     PathBuf::from(std::env::var("STORAGE_DIR").unwrap_or_else(|_| String::from("storage")))
@@ -52,11 +57,11 @@ fn storage_dir() -> PathBuf {
 #[get("/download/{file_code}")]
 async fn handle_download(req: HttpRequest, path: web::Path<String>, data: AppData) -> actix_web::Result<HttpResponse> {
     let file_code = path.into_inner();
-    if !file_code.as_bytes().iter().all(|c| c.is_ascii_alphanumeric()) {
+    if !util::is_ascii_alphanumeric(&file_code) {
         return Ok(HttpResponse::NotFound().finish());
     }
     let data = data.read().await;
-    let info = data.get(&file_code);
+    let info = data.lookup_file(&file_code);
     if let Some(info) = info {
         Ok(download::DownloadingFile {
             file: File::open(storage_dir().join(file_code))?,
@@ -76,7 +81,7 @@ async fn handle_upload(req: HttpRequest, stream: web::Payload, data: AppData) ->
 async fn main() -> std::io::Result<()> {
     env_logger::init();
 
-    let data: AppData = web::Data::new(RwLock::new(HashMap::new()));
+    let data: AppData = web::Data::new(RwLock::new(PersistentState::load().await?));
 
     let static_dir =
         PathBuf::from(std::env::var("STATIC_DIR").unwrap_or_else(|_| String::from("static")));

src/state.rs (new file, 108 lines)
@@ -0,0 +1,108 @@
+use std::{collections::HashMap, io::ErrorKind};
+
+use log::{info, error};
+use tokio::{fs::File, io::{AsyncReadExt, AsyncWriteExt}};
+
+use crate::{DownloadableFile, storage_dir};
+
+const STATE_FILE_NAME: &str = "files.json";
+
+pub(crate) mod timestamp {
+    use core::fmt;
+
+    use serde::{Serializer, Deserializer, de::Visitor};
+    use time::OffsetDateTime;
+
+    pub(crate) fn serialize<S: Serializer>(time: &OffsetDateTime, ser: S) -> Result<S::Ok, S::Error> {
+        ser.serialize_i64(time.unix_timestamp())
+    }
+
+    struct I64Visitor;
+
+    impl<'de> Visitor<'de> for I64Visitor {
+        type Value = i64;
+
+        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+            write!(formatter, "a signed integer")
+        }
+
+        fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> {
+            Ok(v)
+        }
+    }
+
+    pub(crate) fn deserialize<'de, D: Deserializer<'de>>(de: D) -> Result<OffsetDateTime, D::Error> {
+        Ok(OffsetDateTime::from_unix_timestamp(de.deserialize_i64(I64Visitor)?).unwrap_or_else(|_| OffsetDateTime::now_utc()))
+    }
+}
+
+pub(crate) struct PersistentState(HashMap<String, DownloadableFile>);
+impl PersistentState {
+    pub(crate) async fn load() -> std::io::Result<Self> {
+        let open_result = File::open(storage_dir().join(STATE_FILE_NAME)).await;
+        match open_result {
+            Ok(mut f) => {
+                let mut buf = String::new();
+                f.read_to_string(&mut buf).await?;
+                let map: HashMap<String, DownloadableFile> = serde_json::from_str(&buf)?;
+                info!("Loaded {} file entries from persistent storage", map.len());
+                let mut filtered: HashMap<String, DownloadableFile> = HashMap::new();
+                for (key, info) in map.into_iter() {
+                    if !crate::util::is_ascii_alphanumeric(&key) {
+                        error!("Invalid key in persistent storage: {}", key);
+                        continue;
+                    }
+                    let file = if let Ok(f) = File::open(storage_dir().join(&key)).await {
+                        f
+                    } else {
+                        error!("Unable to open file {} referenced in persistent storage", key);
+                        continue;
+                    };
+                    let metadata = if let Ok(md) = file.metadata().await {
+                        md
+                    } else {
+                        error!("Unable to get metadata for file {} referenced in persistent storage", key);
+                        continue;
+                    };
+                    if metadata.len() != info.size {
+                        error!("Mismatched file size for file {} referenced in persistent storage: expected {}, found {}", key, info.size, metadata.len());
+                        continue
+                    }
+                    filtered.insert(key, info);
+                }
+                Ok(Self(filtered))
+            }
+            Err(e) => {
+                if let ErrorKind::NotFound = e.kind() {
+                    Ok(Self(HashMap::new()))
+                } else {
+                    Err(e)
+                }
+            }
+        }
+    }
+
+    async fn save(&mut self) -> std::io::Result<()> {
+        File::create(storage_dir().join(STATE_FILE_NAME)).await?.write_all(&serde_json::to_vec_pretty(&self.0)?).await
+    }
+
+    pub(crate) async fn add_file(&mut self, key: String, file: DownloadableFile) -> std::io::Result<()> {
+        self.0.insert(key, file);
+        self.save().await
+    }
+
+    pub(crate) fn lookup_file(&self, key: &str) -> Option<DownloadableFile> {
+        self.0.get(key).map(|f| f.clone())
+    }
+
+    pub(crate) async fn remove_file(&mut self, key: &str) -> std::io::Result<()> {
+        self.0.remove(key);
+        self.save().await
+    }
+
+    pub(crate) fn remove_uploader(&mut self, key: &str) {
+        if let Some(f) = self.0.get_mut(key) {
+            f.uploader.take();
+        }
+    }
+}

src/upload.rs

@@ -61,7 +61,7 @@ pub struct Uploader {
 }
 
 impl Uploader {
-    pub fn new(app_data: super::AppData) -> Self {
+    pub(crate) fn new(app_data: super::AppData) -> Self {
         Self {
             writer: None,
             storage_filename: String::new(),
@@ -110,6 +110,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
             Ok(m) => m,
             Err(e) => {
                 error!("Websocket error: {}", e);
+                self.cleanup_after_error(ctx);
                 ctx.stop();
                 return;
             }
@@ -122,6 +123,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
                         code: e.close_code(),
                         description: Some(e.to_string()),
                     }));
+                    self.cleanup_after_error(ctx);
                     ctx.stop();
                 }
                 Ok(true) => {
@@ -134,9 +136,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
                 let data = self.app_data.clone();
                 let filename = self.storage_filename.clone();
                 ctx.wait(actix::fut::wrap_future(async move {
-                    if let Some(f) = data.write().await.get_mut(&filename) {
-                        f.uploader.take();
-                    }
+                    data.write().await.remove_uploader(&filename);
                 }));
                 ctx.stop();
             }
@@ -219,10 +219,10 @@ impl Uploader {
             data
                 .write()
                 .await
-                .insert(
+                .add_file(
                     storage_filename_copy,
                     downloadable_file,
-                );
+                ).await.unwrap();
         }));
         ctx.text(self.storage_filename.as_str());
     }
@@ -267,4 +267,15 @@ impl Uploader {
             Err(Error::UnexpectedMessageType)
         }
     }
+
+    fn cleanup_after_error(&mut self, ctx: &mut <Self as Actor>::Context) {
+        let data = self.app_data.clone();
+        let filename = self.storage_filename.clone();
+        ctx.spawn(actix::fut::wrap_future(async move {
+            data
+                .write()
+                .await
+                .remove_file(&filename).await.unwrap();
+        }));
+    }
 }

src/util.rs (new file, 3 lines)

@@ -0,0 +1,3 @@
+pub(crate) fn is_ascii_alphanumeric(s: &str) -> bool {
+    s.as_bytes().iter().all(|c| c.is_ascii_alphanumeric())
+}
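
Note, not part of the commit: the persistence scheme added in src/state.rs is a HashMap serialized to a JSON state file in the storage directory, loaded once at startup (a missing file simply means an empty map) and rewritten after every add_file/remove_file. Below is a minimal standalone sketch of that same pattern for reference; it assumes tokio, serde, and serde_json as dependencies, and the names Entry and demo-state.json are invented for the illustration.

// Standalone sketch of the JSON-file-backed map pattern used by PersistentState.
// Names (Entry, demo-state.json) are invented; this is not the server's code.
use std::{collections::HashMap, io::ErrorKind};

use serde::{Deserialize, Serialize};
use tokio::{
    fs::File,
    io::{AsyncReadExt, AsyncWriteExt},
};

const STATE_FILE: &str = "demo-state.json";

#[derive(Serialize, Deserialize)]
struct Entry {
    name: String,
    size: u64,
}

struct State(HashMap<String, Entry>);

impl State {
    // Load the map from the state file; a missing file just means "no entries yet".
    async fn load() -> std::io::Result<Self> {
        match File::open(STATE_FILE).await {
            Ok(mut f) => {
                let mut buf = String::new();
                f.read_to_string(&mut buf).await?;
                Ok(Self(serde_json::from_str(&buf)?))
            }
            Err(e) if e.kind() == ErrorKind::NotFound => Ok(Self(HashMap::new())),
            Err(e) => Err(e),
        }
    }

    // Rewrite the whole state file after each mutation.
    async fn save(&self) -> std::io::Result<()> {
        File::create(STATE_FILE)
            .await?
            .write_all(&serde_json::to_vec_pretty(&self.0)?)
            .await
    }

    async fn add(&mut self, key: String, entry: Entry) -> std::io::Result<()> {
        self.0.insert(key, entry);
        self.save().await
    }
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut state = State::load().await?;
    state
        .add("abc123".into(), Entry { name: "example.txt".into(), size: 42 })
        .await?;
    println!("{} entries", state.0.len());
    Ok(())
}

Rewriting the entire state file on each mutation keeps the implementation simple; in the server itself, concurrent writes are serialized through the RwLock that wraps PersistentState.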