Compare commits

...

2 commits

Author SHA1 Message Date
xenofem bda6da33e8 cargo clippy and fmt 2022-04-28 05:18:35 -04:00
xenofem 127d7e9c67 add persistent state 2022-04-28 05:13:14 -04:00
7 changed files with 249 additions and 61 deletions

View file

@ -4,8 +4,8 @@ use actix_web::{
body::{self, BoxBody, SizedStream}, body::{self, BoxBody, SizedStream},
http::{ http::{
header::{ header::{
self, ContentDisposition, DispositionParam, self, ContentDisposition, DispositionParam, DispositionType, EntityTag, HeaderValue,
DispositionType, HeaderValue, HttpDate, EntityTag, IfUnmodifiedSince, IfMatch, IfNoneMatch, IfModifiedSince, HttpDate, IfMatch, IfModifiedSince, IfNoneMatch, IfUnmodifiedSince,
}, },
StatusCode, StatusCode,
}, },
@ -52,7 +52,7 @@ impl DownloadingFile {
ContentDisposition { ContentDisposition {
disposition: DispositionType::Attachment, disposition: DispositionType::Attachment,
parameters: vec![DispositionParam::Filename(self.info.name)], parameters: vec![DispositionParam::Filename(self.info.name)],
} },
)); ));
res.insert_header((header::LAST_MODIFIED, last_modified)); res.insert_header((header::LAST_MODIFIED, last_modified));
res.insert_header((header::ETAG, etag)); res.insert_header((header::ETAG, etag));
@ -76,7 +76,12 @@ impl DownloadingFile {
res.insert_header(( res.insert_header((
header::CONTENT_RANGE, header::CONTENT_RANGE,
format!("bytes {}-{}/{}", offset, offset + length - 1, self.info.size), format!(
"bytes {}-{}/{}",
offset,
offset + length - 1,
self.info.size
),
)); ));
} else { } else {
res.insert_header((header::CONTENT_RANGE, format!("bytes */{}", length))); res.insert_header((header::CONTENT_RANGE, format!("bytes */{}", length)));
@ -122,7 +127,9 @@ fn precondition_failed(req: &HttpRequest, etag: &EntityTag, last_modified: &Http
fn not_modified(req: &HttpRequest, etag: &EntityTag, last_modified: &HttpDate) -> bool { fn not_modified(req: &HttpRequest, etag: &EntityTag, last_modified: &HttpDate) -> bool {
match req.get_header::<IfNoneMatch>() { match req.get_header::<IfNoneMatch>() {
Some(IfNoneMatch::Any) => { return true; } Some(IfNoneMatch::Any) => {
return true;
}
Some(IfNoneMatch::Items(ref items)) => { Some(IfNoneMatch::Items(ref items)) => {
return items.iter().any(|item| item.weak_eq(etag)); return items.iter().any(|item| item.weak_eq(etag));
} }

View file

@ -1,4 +1,12 @@
use std::{cmp, fs::File, future::Future, io::{self, Write}, path::PathBuf, pin::Pin, task::{Context, Poll, Waker}}; use std::{
cmp,
fs::File,
future::Future,
io::{self, Write},
path::PathBuf,
pin::Pin,
task::{Context, Poll, Waker},
};
use actix::Addr; use actix::Addr;
use actix_web::error::{Error, ErrorInternalServerError}; use actix_web::error::{Error, ErrorInternalServerError};
@ -53,7 +61,6 @@ impl Write for LiveFileWriter {
} }
} }
// This implementation of a file responder is copied pretty directly // This implementation of a file responder is copied pretty directly
// from actix-files with some tweaks // from actix-files with some tweaks
@ -104,13 +111,19 @@ async fn live_file_reader_callback(
use io::{Read as _, Seek as _}; use io::{Read as _, Seek as _};
let res = actix_web::web::block(move || { let res = actix_web::web::block(move || {
trace!("reading up to {} bytes of file starting at {}", max_bytes, offset); trace!(
"reading up to {} bytes of file starting at {}",
max_bytes,
offset
);
let mut buf = Vec::with_capacity(max_bytes); let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?; file.seek(io::SeekFrom::Start(offset))?;
let n_bytes = std::io::Read::by_ref(&mut file).take(max_bytes as u64).read_to_end(&mut buf)?; let n_bytes = std::io::Read::by_ref(&mut file)
.take(max_bytes as u64)
.read_to_end(&mut buf)?;
trace!("got {} bytes from file", n_bytes); trace!("got {} bytes from file", n_bytes);
if n_bytes == 0 { if n_bytes == 0 {
Err(io::Error::from(io::ErrorKind::UnexpectedEof)) Err(io::Error::from(io::ErrorKind::UnexpectedEof))
@ -141,12 +154,14 @@ where
if size == counter { if size == counter {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
let inner_file = file let inner_file = file.take().expect("LiveFileReader polled after completion");
.take()
.expect("LiveFileReader polled after completion");
if offset >= *this.available_file_size { if offset >= *this.available_file_size {
trace!("offset {} has reached available file size {}, updating metadata", offset, this.available_file_size); trace!(
"offset {} has reached available file size {}, updating metadata",
offset,
this.available_file_size
);
// If we've hit the end of what was available // If we've hit the end of what was available
// last time we checked, check again // last time we checked, check again
*this.available_file_size = match inner_file.metadata() { *this.available_file_size = match inner_file.metadata() {
@ -165,15 +180,25 @@ where
if let Ok(()) = addr.try_send(WakerMessage(cx.waker().clone())) { if let Ok(()) = addr.try_send(WakerMessage(cx.waker().clone())) {
return Poll::Pending; return Poll::Pending;
} else { } else {
return Poll::Ready(Some(Err(ErrorInternalServerError("Failed to contact file upload actor")))); return Poll::Ready(Some(Err(ErrorInternalServerError(
"Failed to contact file upload actor",
))));
} }
} else { } else {
return Poll::Ready(Some(Err(ErrorInternalServerError("File upload was not completed")))); return Poll::Ready(Some(Err(ErrorInternalServerError(
"File upload was not completed",
))));
} }
} }
} }
let max_bytes = cmp::min(65_536, cmp::min(size.saturating_sub(counter), this.available_file_size.saturating_sub(offset))) as usize; let max_bytes = cmp::min(
65_536,
cmp::min(
size.saturating_sub(counter),
this.available_file_size.saturating_sub(offset),
),
) as usize;
let fut = (this.callback)(inner_file, offset, max_bytes); let fut = (this.callback)(inner_file, offset, max_bytes);

View file

@ -1,23 +1,23 @@
mod download; mod download;
mod file; mod file;
mod state;
mod upload; mod upload;
mod util;
mod zip; mod zip;
use std::{ use std::{fs::File, path::PathBuf};
collections::HashMap,
path::PathBuf,
fs::File,
};
use actix::Addr; use actix::Addr;
use actix_web::{ use actix_web::{
get, middleware::Logger, web, App, HttpRequest, HttpServer, Responder, HttpResponse, get, middleware::Logger, web, App, HttpRequest, HttpResponse, HttpServer, Responder,
}; };
use actix_web_actors::ws; use actix_web_actors::ws;
use serde::{Deserialize, Serialize};
use state::PersistentState;
use time::OffsetDateTime; use time::OffsetDateTime;
use tokio::sync::RwLock; use tokio::sync::RwLock;
const APP_NAME: &'static str = "transbeam"; const APP_NAME: &str = "transbeam";
pub struct UploadedFile { pub struct UploadedFile {
name: String, name: String,
@ -35,33 +35,40 @@ impl UploadedFile {
} }
} }
#[derive(Clone)] #[derive(Clone, Deserialize, Serialize)]
pub struct DownloadableFile { pub struct DownloadableFile {
name: String, name: String,
size: u64, size: u64,
#[serde(with = "state::timestamp")]
modtime: OffsetDateTime, modtime: OffsetDateTime,
#[serde(skip)]
uploader: Option<Addr<upload::Uploader>>, uploader: Option<Addr<upload::Uploader>>,
} }
type AppData = web::Data<RwLock<HashMap<String, DownloadableFile>>>; type AppData = web::Data<RwLock<PersistentState>>;
fn storage_dir() -> PathBuf { fn storage_dir() -> PathBuf {
PathBuf::from(std::env::var("STORAGE_DIR").unwrap_or_else(|_| String::from("storage"))) PathBuf::from(std::env::var("STORAGE_DIR").unwrap_or_else(|_| String::from("storage")))
} }
#[get("/download/{file_code}")] #[get("/download/{file_code}")]
async fn handle_download(req: HttpRequest, path: web::Path<String>, data: AppData) -> actix_web::Result<HttpResponse> { async fn handle_download(
req: HttpRequest,
path: web::Path<String>,
data: AppData,
) -> actix_web::Result<HttpResponse> {
let file_code = path.into_inner(); let file_code = path.into_inner();
if !file_code.as_bytes().iter().all(|c| c.is_ascii_alphanumeric()) { if !util::is_ascii_alphanumeric(&file_code) {
return Ok(HttpResponse::NotFound().finish()); return Ok(HttpResponse::NotFound().finish());
} }
let data = data.read().await; let data = data.read().await;
let info = data.get(&file_code); let info = data.lookup_file(&file_code);
if let Some(info) = info { if let Some(info) = info {
Ok(download::DownloadingFile { Ok(download::DownloadingFile {
file: File::open(storage_dir().join(file_code))?, file: File::open(storage_dir().join(file_code))?,
info: info.clone(), info,
}.into_response(&req)) }
.into_response(&req))
} else { } else {
Ok(HttpResponse::NotFound().finish()) Ok(HttpResponse::NotFound().finish())
} }
@ -76,7 +83,7 @@ async fn handle_upload(req: HttpRequest, stream: web::Payload, data: AppData) ->
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
env_logger::init(); env_logger::init();
let data: AppData = web::Data::new(RwLock::new(HashMap::new())); let data: AppData = web::Data::new(RwLock::new(PersistentState::load().await?));
let static_dir = let static_dir =
PathBuf::from(std::env::var("STATIC_DIR").unwrap_or_else(|_| String::from("static"))); PathBuf::from(std::env::var("STATIC_DIR").unwrap_or_else(|_| String::from("static")));

132
src/state.rs Normal file
View file

@ -0,0 +1,132 @@
use std::{collections::HashMap, io::ErrorKind};
use log::{error, info};
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWriteExt},
};
use crate::{storage_dir, DownloadableFile};
const STATE_FILE_NAME: &str = "files.json";
pub(crate) mod timestamp {
use core::fmt;
use serde::{de::Visitor, Deserializer, Serializer};
use time::OffsetDateTime;
pub(crate) fn serialize<S: Serializer>(
time: &OffsetDateTime,
ser: S,
) -> Result<S::Ok, S::Error> {
ser.serialize_i64(time.unix_timestamp())
}
struct I64Visitor;
impl<'de> Visitor<'de> for I64Visitor {
type Value = i64;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
write!(formatter, "a signed integer")
}
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> {
Ok(v)
}
}
pub(crate) fn deserialize<'de, D: Deserializer<'de>>(
de: D,
) -> Result<OffsetDateTime, D::Error> {
Ok(
OffsetDateTime::from_unix_timestamp(de.deserialize_i64(I64Visitor)?)
.unwrap_or_else(|_| OffsetDateTime::now_utc()),
)
}
}
pub(crate) struct PersistentState(HashMap<String, DownloadableFile>);
impl PersistentState {
pub(crate) async fn load() -> std::io::Result<Self> {
let open_result = File::open(storage_dir().join(STATE_FILE_NAME)).await;
match open_result {
Ok(mut f) => {
let mut buf = String::new();
f.read_to_string(&mut buf).await?;
let map: HashMap<String, DownloadableFile> = serde_json::from_str(&buf)?;
info!("Loaded {} file entries from persistent storage", map.len());
let mut filtered: HashMap<String, DownloadableFile> = HashMap::new();
for (key, info) in map.into_iter() {
if !crate::util::is_ascii_alphanumeric(&key) {
error!("Invalid key in persistent storage: {}", key);
continue;
}
let file = if let Ok(f) = File::open(storage_dir().join(&key)).await {
f
} else {
error!(
"Unable to open file {} referenced in persistent storage",
key
);
continue;
};
let metadata = if let Ok(md) = file.metadata().await {
md
} else {
error!(
"Unable to get metadata for file {} referenced in persistent storage",
key
);
continue;
};
if metadata.len() != info.size {
error!("Mismatched file size for file {} referenced in persistent storage: expected {}, found {}", key, info.size, metadata.len());
continue;
}
filtered.insert(key, info);
}
Ok(Self(filtered))
}
Err(e) => {
if let ErrorKind::NotFound = e.kind() {
Ok(Self(HashMap::new()))
} else {
Err(e)
}
}
}
}
async fn save(&mut self) -> std::io::Result<()> {
File::create(storage_dir().join(STATE_FILE_NAME))
.await?
.write_all(&serde_json::to_vec_pretty(&self.0)?)
.await
}
pub(crate) async fn add_file(
&mut self,
key: String,
file: DownloadableFile,
) -> std::io::Result<()> {
self.0.insert(key, file);
self.save().await
}
pub(crate) fn lookup_file(&self, key: &str) -> Option<DownloadableFile> {
self.0.get(key).cloned()
}
pub(crate) async fn remove_file(&mut self, key: &str) -> std::io::Result<()> {
self.0.remove(key);
self.save().await
}
pub(crate) fn remove_uploader(&mut self, key: &str) {
if let Some(f) = self.0.get_mut(key) {
f.uploader.take();
}
}
}

View file

@ -1,6 +1,6 @@
use std::{collections::HashSet, io::Write, task::Waker}; use std::{collections::HashSet, io::Write, task::Waker};
use actix::{Actor, ActorContext, AsyncContext, StreamHandler, Message, Handler}; use actix::{Actor, ActorContext, AsyncContext, Handler, Message, StreamHandler};
use actix_http::ws::{CloseReason, Item}; use actix_http::ws::{CloseReason, Item};
use actix_web_actors::ws::{self, CloseCode}; use actix_web_actors::ws::{self, CloseCode};
use bytes::Bytes; use bytes::Bytes;
@ -61,7 +61,7 @@ pub struct Uploader {
} }
impl Uploader { impl Uploader {
pub fn new(app_data: super::AppData) -> Self { pub(crate) fn new(app_data: super::AppData) -> Self {
Self { Self {
writer: None, writer: None,
storage_filename: String::new(), storage_filename: String::new(),
@ -82,7 +82,11 @@ pub(crate) struct WakerMessage(pub Waker);
impl Handler<WakerMessage> for Uploader { impl Handler<WakerMessage> for Uploader {
type Result = (); type Result = ();
fn handle(&mut self, msg: WakerMessage, _: &mut Self::Context) { fn handle(&mut self, msg: WakerMessage, _: &mut Self::Context) {
self.writer.as_mut().map(|w| w.add_waker(msg.0)); if let Some(w) = self.writer.as_mut() {
w.add_waker(msg.0);
} else {
error!("Got a wakeup request before creating a file");
}
} }
} }
@ -110,6 +114,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
Ok(m) => m, Ok(m) => m,
Err(e) => { Err(e) => {
error!("Websocket error: {}", e); error!("Websocket error: {}", e);
self.cleanup_after_error(ctx);
ctx.stop(); ctx.stop();
return; return;
} }
@ -122,6 +127,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
code: e.close_code(), code: e.close_code(),
description: Some(e.to_string()), description: Some(e.to_string()),
})); }));
self.cleanup_after_error(ctx);
ctx.stop(); ctx.stop();
} }
Ok(true) => { Ok(true) => {
@ -134,9 +140,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
let data = self.app_data.clone(); let data = self.app_data.clone();
let filename = self.storage_filename.clone(); let filename = self.storage_filename.clone();
ctx.wait(actix::fut::wrap_future(async move { ctx.wait(actix::fut::wrap_future(async move {
if let Some(f) = data.write().await.get_mut(&filename) { data.write().await.remove_uploader(&filename);
f.uploader.take();
}
})); }));
ctx.stop(); ctx.stop();
} }
@ -198,36 +202,38 @@ impl Uploader {
let size = zip_writer.total_size(); let size = zip_writer.total_size();
let download_filename = let download_filename =
super::APP_NAME.to_owned() + &now.format(FILENAME_DATE_FORMAT)? + ".zip"; super::APP_NAME.to_owned() + &now.format(FILENAME_DATE_FORMAT)? + ".zip";
(Box::new(zip_writer), DownloadableFile { (
Box::new(zip_writer),
DownloadableFile {
name: download_filename, name: download_filename,
size, size,
modtime: now, modtime: now,
uploader: addr, uploader: addr,
}) },
)
} else { } else {
(Box::new(writer), DownloadableFile { (
Box::new(writer),
DownloadableFile {
name: files[0].name.clone(), name: files[0].name.clone(),
size: files[0].size, size: files[0].size,
modtime: files[0].modtime, modtime: files[0].modtime,
uploader: addr, uploader: addr,
}) },
)
}; };
self.writer = Some(writer); self.writer = Some(writer);
let data = self.app_data.clone(); let data = self.app_data.clone();
let storage_filename_copy = storage_filename.clone();
ctx.spawn(actix::fut::wrap_future(async move { ctx.spawn(actix::fut::wrap_future(async move {
data data.write()
.write()
.await .await
.insert( .add_file(storage_filename, downloadable_file)
storage_filename_copy, .await
downloadable_file, .unwrap();
);
})); }));
ctx.text(self.storage_filename.as_str()); ctx.text(self.storage_filename.as_str());
} }
ws::Message::Binary(data) ws::Message::Binary(data) | ws::Message::Continuation(Item::Last(data)) => {
| ws::Message::Continuation(Item::Last(data)) => {
let result = self.handle_data(data)?; let result = self.handle_data(data)?;
ack(ctx); ack(ctx);
return Ok(result); return Ok(result);
@ -267,4 +273,12 @@ impl Uploader {
Err(Error::UnexpectedMessageType) Err(Error::UnexpectedMessageType)
} }
} }
fn cleanup_after_error(&mut self, ctx: &mut <Self as Actor>::Context) {
let data = self.app_data.clone();
let filename = self.storage_filename.clone();
ctx.spawn(actix::fut::wrap_future(async move {
data.write().await.remove_file(&filename).await.unwrap();
}));
}
} }

3
src/util.rs Normal file
View file

@ -0,0 +1,3 @@
pub(crate) fn is_ascii_alphanumeric(s: &str) -> bool {
s.as_bytes().iter().all(|c| c.is_ascii_alphanumeric())
}