2022-04-29 22:36:44 -04:00
|
|
|
use std::{collections::HashSet, fs::File, io::Write};
|
2022-04-26 23:54:29 -04:00
|
|
|
|
2022-04-29 22:36:44 -04:00
|
|
|
use actix::{fut::future::ActorFutureExt, Actor, ActorContext, AsyncContext, StreamHandler};
|
2022-04-27 00:55:36 -04:00
|
|
|
use actix_http::ws::{CloseReason, Item};
|
2022-04-26 23:54:29 -04:00
|
|
|
use actix_web_actors::ws::{self, CloseCode};
|
2022-04-27 20:15:51 -04:00
|
|
|
use bytes::Bytes;
|
2022-04-27 00:55:36 -04:00
|
|
|
use log::{debug, error, info, trace};
|
2022-04-26 23:54:29 -04:00
|
|
|
use rand::distributions::{Alphanumeric, DistString};
|
2022-04-30 01:38:26 -04:00
|
|
|
use serde::{Deserialize, Serialize};
|
2022-04-26 23:54:29 -04:00
|
|
|
use time::OffsetDateTime;
|
2022-04-30 15:14:28 -04:00
|
|
|
use unicode_normalization::UnicodeNormalization;
|
2022-04-26 23:54:29 -04:00
|
|
|
|
2022-04-30 01:38:26 -04:00
|
|
|
use crate::store::{storage_dir, StoredFile, self};
|
2022-04-26 23:54:29 -04:00
|
|
|
|
2022-04-28 00:27:22 -04:00
|
|
|
/// Upper bound on how many files a single upload manifest may list;
/// manifests with more entries are rejected with `Error::TooManyFiles`.
const MAX_FILES: usize = 256;
|
2022-04-26 23:54:29 -04:00
|
|
|
/// Timestamp layout embedded in the download name of multi-file (zipped)
/// uploads, producing text such as `2022-04-30-151428`.
const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] = time::macros::format_description!(
    "[year]-[month]-[day]-[hour][minute][second]"
);
|
|
|
|
|
|
|
|
#[derive(thiserror::Error, Debug)]
|
|
|
|
enum Error {
|
|
|
|
#[error("Failed to parse file metadata")]
|
|
|
|
Parse(#[from] serde_json::Error),
|
|
|
|
#[error("Error writing to stored file")]
|
|
|
|
Storage(#[from] std::io::Error),
|
|
|
|
#[error("Time formatting error")]
|
|
|
|
TimeFormat(#[from] time::error::Format),
|
|
|
|
#[error("Duplicate filename could not be deduplicated")]
|
|
|
|
DuplicateFilename,
|
|
|
|
#[error("This message type was not expected at this stage")]
|
|
|
|
UnexpectedMessageType,
|
|
|
|
#[error("Metadata contained an empty list of files")]
|
|
|
|
NoFiles,
|
2022-04-28 00:27:22 -04:00
|
|
|
#[error("Number of files submitted by client exceeded the maximum limit")]
|
|
|
|
TooManyFiles,
|
2022-04-30 01:38:26 -04:00
|
|
|
#[error("Requested lifetime was too long")]
|
|
|
|
TooLong,
|
|
|
|
#[error("Upload size was too large, can be at most {0} bytes")]
|
|
|
|
TooBig(u64),
|
2022-04-26 23:54:29 -04:00
|
|
|
#[error("Websocket was closed by client before completing transfer")]
|
|
|
|
ClosedEarly(Option<CloseReason>),
|
|
|
|
#[error("Client sent more data than they were supposed to")]
|
2022-04-28 00:27:22 -04:00
|
|
|
UnexpectedExtraData,
|
2022-04-26 23:54:29 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Error {
|
|
|
|
fn close_code(&self) -> CloseCode {
|
|
|
|
match self {
|
2022-04-30 01:38:26 -04:00
|
|
|
Self::Storage(_)
|
|
|
|
| Self::TimeFormat(_) => CloseCode::Error,
|
|
|
|
Self::Parse(_)
|
|
|
|
| Self::UnexpectedMessageType
|
|
|
|
| Self::ClosedEarly(_)
|
|
|
|
| Self::UnexpectedExtraData => CloseCode::Invalid,
|
|
|
|
Self::DuplicateFilename
|
|
|
|
| Self::NoFiles
|
|
|
|
| Self::TooManyFiles
|
|
|
|
| Self::TooLong
|
|
|
|
| Self::TooBig(_) => CloseCode::Policy,
|
2022-04-26 23:54:29 -04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
pub struct Uploader {
|
2022-04-29 22:36:44 -04:00
|
|
|
writer: Option<Box<dyn Write>>,
|
2022-04-27 01:17:00 -04:00
|
|
|
storage_filename: String,
|
2022-04-26 23:54:29 -04:00
|
|
|
app_data: super::AppData,
|
2022-04-27 20:15:51 -04:00
|
|
|
bytes_remaining: u64,
|
2022-04-26 23:54:29 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Uploader {
|
2022-04-28 05:13:14 -04:00
|
|
|
pub(crate) fn new(app_data: super::AppData) -> Self {
|
2022-04-26 23:54:29 -04:00
|
|
|
Self {
|
|
|
|
writer: None,
|
2022-04-30 01:38:26 -04:00
|
|
|
storage_filename: Alphanumeric.sample_string(&mut rand::thread_rng(), 8),
|
2022-04-26 23:54:29 -04:00
|
|
|
app_data,
|
|
|
|
bytes_remaining: 0,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Actor for Uploader {
|
|
|
|
type Context = ws::WebsocketContext<Self>;
|
|
|
|
}
|
|
|
|
|
2022-04-30 01:38:26 -04:00
|
|
|
pub struct UploadedFile {
|
|
|
|
pub name: String,
|
|
|
|
pub size: u64,
|
|
|
|
pub modtime: OffsetDateTime,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl UploadedFile {
|
|
|
|
fn new(name: &str, size: u64, modtime: OffsetDateTime) -> Self {
|
|
|
|
Self {
|
2022-04-30 15:14:28 -04:00
|
|
|
name: sanitise_file_name::sanitise(&name.nfd().collect::<String>()),
|
2022-04-30 01:38:26 -04:00
|
|
|
size,
|
|
|
|
modtime,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-26 23:54:29 -04:00
|
|
|
#[derive(Debug, Deserialize)]
|
|
|
|
struct RawUploadedFile {
|
|
|
|
name: String,
|
2022-04-27 20:15:51 -04:00
|
|
|
size: u64,
|
2022-04-26 23:54:29 -04:00
|
|
|
modtime: i64,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl RawUploadedFile {
|
|
|
|
fn process(&self) -> UploadedFile {
|
|
|
|
UploadedFile::new(
|
|
|
|
&self.name,
|
|
|
|
self.size,
|
|
|
|
OffsetDateTime::from_unix_timestamp(self.modtime / 1000)
|
|
|
|
.unwrap_or_else(|_| OffsetDateTime::now_utc()),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-30 01:38:26 -04:00
|
|
|
#[derive(Deserialize)]
|
|
|
|
struct UploadManifest {
|
|
|
|
files: Vec<RawUploadedFile>,
|
|
|
|
lifetime: u32,
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Serialize)]
|
|
|
|
#[serde(rename_all = "snake_case", tag = "type")]
|
|
|
|
enum ServerMessage {
|
|
|
|
Ready { code: String },
|
|
|
|
TooBig { max_size: u64 },
|
|
|
|
TooLong { max_days: u32 },
|
|
|
|
Error { details: String },
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<&Error> for ServerMessage {
|
|
|
|
fn from(e: &Error) -> Self {
|
|
|
|
match e {
|
|
|
|
Error::TooBig(max_size) => ServerMessage::TooBig { max_size: *max_size },
|
|
|
|
Error::TooLong => ServerMessage::TooLong { max_days: store::max_lifetime() },
|
|
|
|
_ => ServerMessage::Error { details: e.to_string() },
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-29 22:36:44 -04:00
|
|
|
fn stop_and_flush<T>(_: T, u: &mut Uploader, ctx: &mut <Uploader as Actor>::Context) {
|
|
|
|
ctx.stop();
|
|
|
|
if let Some(w) = u.writer.as_mut() {
|
|
|
|
if let Err(e) = w.flush() {
|
|
|
|
error!("Failed to flush writer: {}", e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-04-26 23:54:29 -04:00
|
|
|
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
|
|
|
|
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
|
|
|
let msg = match msg {
|
|
|
|
Ok(m) => m,
|
|
|
|
Err(e) => {
|
2022-04-28 00:27:22 -04:00
|
|
|
error!("Websocket error: {}", e);
|
2022-04-28 05:13:14 -04:00
|
|
|
self.cleanup_after_error(ctx);
|
2022-04-26 23:54:29 -04:00
|
|
|
return;
|
2022-04-27 00:55:36 -04:00
|
|
|
}
|
2022-04-26 23:54:29 -04:00
|
|
|
};
|
|
|
|
|
|
|
|
match self.handle_message(msg, ctx) {
|
|
|
|
Err(e) => {
|
2022-04-30 01:38:26 -04:00
|
|
|
self.notify_error_and_cleanup(e, ctx);
|
2022-04-26 23:54:29 -04:00
|
|
|
}
|
|
|
|
Ok(true) => {
|
|
|
|
info!("Finished uploading data");
|
|
|
|
ctx.close(Some(ws::CloseReason {
|
|
|
|
code: CloseCode::Normal,
|
|
|
|
description: None,
|
|
|
|
}));
|
2022-04-29 22:36:44 -04:00
|
|
|
stop_and_flush((), self, ctx);
|
2022-04-26 23:54:29 -04:00
|
|
|
}
|
2022-04-27 00:55:36 -04:00
|
|
|
_ => (),
|
2022-04-26 23:54:29 -04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Sends the literal text frame `ack` to the client, acknowledging a fully
/// received binary message.
fn ack(ctx: &mut <Uploader as Actor>::Context) {
    ctx.text("ack")
}
|
|
|
|
|
|
|
|
impl Uploader {
|
2022-04-30 01:38:26 -04:00
|
|
|
fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut <Self as Actor>::Context) {
|
|
|
|
error!("{}", e);
|
|
|
|
ctx.text(serde_json::to_string(&ServerMessage::from(&e)).unwrap());
|
|
|
|
ctx.close(Some(ws::CloseReason {
|
|
|
|
code: e.close_code(),
|
|
|
|
description: Some(e.to_string()),
|
|
|
|
}));
|
|
|
|
self.cleanup_after_error(ctx);
|
|
|
|
}
|
|
|
|
|
2022-04-27 00:55:36 -04:00
|
|
|
fn handle_message(
|
|
|
|
&mut self,
|
|
|
|
msg: ws::Message,
|
|
|
|
ctx: &mut <Self as Actor>::Context,
|
|
|
|
) -> Result<bool, Error> {
|
2022-04-26 23:54:29 -04:00
|
|
|
trace!("Websocket message: {:?}", msg);
|
|
|
|
match msg {
|
|
|
|
ws::Message::Text(text) => {
|
|
|
|
if self.writer.is_some() {
|
2022-04-27 00:55:36 -04:00
|
|
|
return Err(Error::UnexpectedMessageType);
|
2022-04-26 23:54:29 -04:00
|
|
|
}
|
2022-04-30 01:38:26 -04:00
|
|
|
let UploadManifest { files: raw_files, lifetime, } = serde_json::from_slice(text.as_bytes())?;
|
|
|
|
if lifetime > store::max_lifetime() {
|
|
|
|
return Err(Error::TooLong);
|
|
|
|
}
|
2022-04-26 23:54:29 -04:00
|
|
|
info!("Received file list: {} files", raw_files.len());
|
|
|
|
debug!("{:?}", raw_files);
|
2022-04-28 00:27:22 -04:00
|
|
|
if raw_files.len() > MAX_FILES {
|
|
|
|
return Err(Error::TooManyFiles);
|
|
|
|
}
|
2022-04-30 01:38:26 -04:00
|
|
|
if raw_files.is_empty() {
|
|
|
|
return Err(Error::NoFiles);
|
|
|
|
}
|
2022-04-26 23:54:29 -04:00
|
|
|
let mut filenames: HashSet<String> = HashSet::new();
|
|
|
|
let mut files = Vec::new();
|
|
|
|
for raw_file in raw_files.iter() {
|
|
|
|
let mut file = raw_file.process();
|
|
|
|
while filenames.contains(&file.name) {
|
|
|
|
info!("Duplicate file name: {}", file.name);
|
|
|
|
if file.name.len() >= sanitise_file_name::Options::DEFAULT.length_limit {
|
|
|
|
return Err(Error::DuplicateFilename);
|
|
|
|
}
|
|
|
|
file.name.insert(0, '_');
|
|
|
|
}
|
|
|
|
filenames.insert(file.name.clone());
|
|
|
|
self.bytes_remaining += file.size;
|
|
|
|
files.push(file);
|
|
|
|
}
|
2022-04-30 01:38:26 -04:00
|
|
|
let storage_path = storage_dir().join(self.storage_filename.clone());
|
2022-04-26 23:54:29 -04:00
|
|
|
info!("storing to: {:?}", storage_path);
|
2022-04-29 22:36:44 -04:00
|
|
|
let writer = File::options()
|
|
|
|
.write(true)
|
|
|
|
.create_new(true)
|
|
|
|
.open(&storage_path)?;
|
2022-04-30 01:38:26 -04:00
|
|
|
let (writer, name, size, modtime): (Box<dyn Write>,_,_,_) = if files.len() > 1 {
|
2022-04-26 23:54:29 -04:00
|
|
|
info!("Wrapping in zipfile generator");
|
|
|
|
let now = OffsetDateTime::now_utc();
|
2022-04-27 20:15:51 -04:00
|
|
|
let zip_writer = super::zip::ZipGenerator::new(files, writer);
|
|
|
|
let size = zip_writer.total_size();
|
2022-04-27 00:55:36 -04:00
|
|
|
let download_filename =
|
|
|
|
super::APP_NAME.to_owned() + &now.format(FILENAME_DATE_FORMAT)? + ".zip";
|
2022-04-28 05:18:35 -04:00
|
|
|
(
|
|
|
|
Box::new(zip_writer),
|
2022-04-30 01:38:26 -04:00
|
|
|
download_filename,
|
|
|
|
size,
|
|
|
|
now,
|
2022-04-28 05:18:35 -04:00
|
|
|
)
|
2022-04-26 23:54:29 -04:00
|
|
|
} else {
|
2022-04-28 05:18:35 -04:00
|
|
|
(
|
|
|
|
Box::new(writer),
|
2022-04-30 01:38:26 -04:00
|
|
|
files[0].name.clone(),
|
|
|
|
files[0].size,
|
|
|
|
files[0].modtime,
|
2022-04-28 05:18:35 -04:00
|
|
|
)
|
2022-04-27 20:15:51 -04:00
|
|
|
};
|
|
|
|
self.writer = Some(writer);
|
2022-04-30 01:38:26 -04:00
|
|
|
let stored_file = StoredFile {
|
|
|
|
name,
|
|
|
|
size,
|
|
|
|
modtime,
|
|
|
|
expiry: OffsetDateTime::now_utc() + lifetime*time::Duration::DAY,
|
|
|
|
};
|
2022-04-27 20:15:51 -04:00
|
|
|
let data = self.app_data.clone();
|
2022-04-30 01:38:26 -04:00
|
|
|
let storage_filename = self.storage_filename.clone();
|
2022-04-27 20:15:51 -04:00
|
|
|
ctx.spawn(actix::fut::wrap_future(async move {
|
2022-04-28 06:26:44 -04:00
|
|
|
debug!("Spawned future to add entry {} to state", storage_filename);
|
2022-04-28 05:18:35 -04:00
|
|
|
data.write()
|
|
|
|
.await
|
2022-04-30 01:38:26 -04:00
|
|
|
.add_file(storage_filename, stored_file)
|
2022-04-27 20:15:51 -04:00
|
|
|
.await
|
2022-04-30 01:38:26 -04:00
|
|
|
}).map(|res, u: &mut Self, ctx: &mut <Self as Actor>::Context| {
|
|
|
|
match res {
|
|
|
|
Ok(Ok(())) => ctx.text(serde_json::to_string(&ServerMessage::Ready { code: u.storage_filename.clone() }).unwrap()),
|
|
|
|
Ok(Err(size)) => u.notify_error_and_cleanup(Error::TooBig(size), ctx),
|
|
|
|
Err(e) => u.notify_error_and_cleanup(Error::from(e), ctx)
|
|
|
|
}
|
2022-04-27 20:15:51 -04:00
|
|
|
}));
|
2022-04-26 23:54:29 -04:00
|
|
|
}
|
2022-04-28 05:18:35 -04:00
|
|
|
ws::Message::Binary(data) | ws::Message::Continuation(Item::Last(data)) => {
|
2022-04-27 20:15:51 -04:00
|
|
|
let result = self.handle_data(data)?;
|
|
|
|
ack(ctx);
|
|
|
|
return Ok(result);
|
|
|
|
}
|
|
|
|
ws::Message::Continuation(Item::FirstBinary(data))
|
|
|
|
| ws::Message::Continuation(Item::Continue(data)) => {
|
|
|
|
return self.handle_data(data);
|
2022-04-26 23:54:29 -04:00
|
|
|
}
|
|
|
|
ws::Message::Close(reason) => {
|
|
|
|
if self.bytes_remaining > 0 {
|
|
|
|
return Err(Error::ClosedEarly(reason));
|
|
|
|
} else {
|
|
|
|
return Ok(true);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ws::Message::Ping(ping) => {
|
|
|
|
debug!("Ping received, ponging");
|
|
|
|
ctx.pong(&ping);
|
|
|
|
}
|
|
|
|
ws::Message::Nop | ws::Message::Pong(_) => (),
|
|
|
|
_ => {
|
|
|
|
return Err(Error::UnexpectedMessageType);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(false)
|
|
|
|
}
|
2022-04-27 20:15:51 -04:00
|
|
|
|
|
|
|
fn handle_data(&mut self, data: Bytes) -> Result<bool, Error> {
|
|
|
|
if let Some(ref mut writer) = self.writer {
|
|
|
|
if (data.len() as u64) > self.bytes_remaining {
|
2022-04-28 00:27:22 -04:00
|
|
|
return Err(Error::UnexpectedExtraData);
|
2022-04-27 20:15:51 -04:00
|
|
|
}
|
|
|
|
self.bytes_remaining -= data.len() as u64;
|
|
|
|
writer.write_all(&data)?;
|
|
|
|
Ok(self.bytes_remaining == 0)
|
|
|
|
} else {
|
|
|
|
Err(Error::UnexpectedMessageType)
|
|
|
|
}
|
|
|
|
}
|
2022-04-28 05:13:14 -04:00
|
|
|
|
|
|
|
fn cleanup_after_error(&mut self, ctx: &mut <Self as Actor>::Context) {
|
2022-04-29 22:36:44 -04:00
|
|
|
info!(
|
|
|
|
"Cleaning up after failed upload of {}",
|
|
|
|
self.storage_filename
|
|
|
|
);
|
2022-04-28 05:13:14 -04:00
|
|
|
let data = self.app_data.clone();
|
|
|
|
let filename = self.storage_filename.clone();
|
2022-04-29 22:36:44 -04:00
|
|
|
ctx.wait(
|
|
|
|
actix::fut::wrap_future(async move {
|
|
|
|
debug!("Spawned future to remove entry {} from state", filename);
|
|
|
|
data.write().await.remove_file(&filename).await.unwrap();
|
|
|
|
})
|
|
|
|
.map(stop_and_flush),
|
|
|
|
);
|
2022-04-28 06:26:44 -04:00
|
|
|
if let Err(e) = std::fs::remove_file(storage_dir().join(&self.storage_filename)) {
|
|
|
|
error!("Failed to remove file {}: {}", self.storage_filename, e);
|
|
|
|
}
|
2022-04-28 05:13:14 -04:00
|
|
|
}
|
2022-04-26 23:54:29 -04:00
|
|
|
}
|