transbeam/src/upload.rs

243 lines
8.6 KiB
Rust
Raw Normal View History

2022-04-26 23:54:29 -04:00
use std::{collections::HashSet, fmt::Display, io::Write};
2022-04-27 00:55:36 -04:00
use actix::{Actor, ActorContext, AsyncContext, StreamHandler};
use actix_http::ws::{CloseReason, Item};
2022-04-26 23:54:29 -04:00
use actix_web_actors::ws::{self, CloseCode};
2022-04-27 00:55:36 -04:00
use log::{debug, error, info, trace};
2022-04-26 23:54:29 -04:00
use rand::distributions::{Alphanumeric, DistString};
use serde::Deserialize;
use time::OffsetDateTime;
2022-04-27 00:55:36 -04:00
use crate::{file::LiveWriter, DownloadableFile, UploadedFile};
2022-04-26 23:54:29 -04:00
const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] =
time::macros::format_description!("[year]-[month]-[day]-[hour][minute][second]");
#[derive(thiserror::Error, Debug)]
enum Error {
#[error("Failed to parse file metadata")]
Parse(#[from] serde_json::Error),
#[error("Error writing to stored file")]
Storage(#[from] std::io::Error),
#[error("Time formatting error")]
TimeFormat(#[from] time::error::Format),
#[error("Lock on app state is poisoned")]
LockPoisoned,
#[error("Duplicate filename could not be deduplicated")]
DuplicateFilename,
#[error("This message type was not expected at this stage")]
UnexpectedMessageType,
#[error("Metadata contained an empty list of files")]
NoFiles,
#[error("Websocket was closed by client before completing transfer")]
ClosedEarly(Option<CloseReason>),
#[error("Client sent more data than they were supposed to")]
TooMuchData,
}
impl Error {
fn close_code(&self) -> CloseCode {
match self {
Self::Parse(_) => CloseCode::Invalid,
Self::Storage(_) => CloseCode::Error,
Self::TimeFormat(_) => CloseCode::Error,
Self::LockPoisoned => CloseCode::Error,
Self::DuplicateFilename => CloseCode::Policy,
Self::UnexpectedMessageType => CloseCode::Invalid,
Self::NoFiles => CloseCode::Policy,
Self::ClosedEarly(_) => CloseCode::Invalid,
Self::TooMuchData => CloseCode::Invalid,
}
}
}
pub struct Uploader {
writer: Option<Box<dyn LiveWriter>>,
app_data: super::AppData,
bytes_remaining: usize,
}
impl Uploader {
pub fn new(app_data: super::AppData) -> Self {
Self {
writer: None,
app_data,
bytes_remaining: 0,
}
}
}
impl Actor for Uploader {
type Context = ws::WebsocketContext<Self>;
}
#[derive(Debug, Deserialize)]
struct RawUploadedFile {
name: String,
size: usize,
modtime: i64,
}
impl RawUploadedFile {
fn process(&self) -> UploadedFile {
UploadedFile::new(
&self.name,
self.size,
OffsetDateTime::from_unix_timestamp(self.modtime / 1000)
.unwrap_or_else(|_| OffsetDateTime::now_utc()),
)
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
let msg = match msg {
Ok(m) => m,
Err(e) => {
error!("Websocket error: {:?}", e);
ctx.stop();
return;
2022-04-27 00:55:36 -04:00
}
2022-04-26 23:54:29 -04:00
};
match self.handle_message(msg, ctx) {
Err(e) => {
error!("{:?}", e);
ctx.close(Some(ws::CloseReason {
code: e.close_code(),
description: Some(e.to_string()),
}));
ctx.stop();
}
Ok(true) => {
info!("Finished uploading data");
self.writer.as_mut().map(|w| w.flush());
ctx.close(Some(ws::CloseReason {
code: CloseCode::Normal,
description: None,
}));
2022-04-27 00:55:36 -04:00
// self.app_data.write().unwrap().entry(
2022-04-26 23:54:29 -04:00
ctx.stop();
}
2022-04-27 00:55:36 -04:00
_ => (),
2022-04-26 23:54:29 -04:00
}
}
}
fn ack(ctx: &mut <Uploader as Actor>::Context) {
ctx.text("ack");
}
impl Uploader {
2022-04-27 00:55:36 -04:00
fn handle_message(
&mut self,
msg: ws::Message,
ctx: &mut <Self as Actor>::Context,
) -> Result<bool, Error> {
2022-04-26 23:54:29 -04:00
trace!("Websocket message: {:?}", msg);
match msg {
ws::Message::Text(text) => {
if self.writer.is_some() {
2022-04-27 00:55:36 -04:00
return Err(Error::UnexpectedMessageType);
2022-04-26 23:54:29 -04:00
}
let raw_files: Vec<RawUploadedFile> = serde_json::from_slice(text.as_bytes())?;
info!("Received file list: {} files", raw_files.len());
debug!("{:?}", raw_files);
let mut filenames: HashSet<String> = HashSet::new();
let mut files = Vec::new();
for raw_file in raw_files.iter() {
let mut file = raw_file.process();
while filenames.contains(&file.name) {
info!("Duplicate file name: {}", file.name);
if file.name.len() >= sanitise_file_name::Options::DEFAULT.length_limit {
return Err(Error::DuplicateFilename);
}
file.name.insert(0, '_');
}
filenames.insert(file.name.clone());
self.bytes_remaining += file.size;
files.push(file);
}
if files.is_empty() {
return Err(Error::NoFiles);
}
let storage_filename = Alphanumeric.sample_string(&mut rand::thread_rng(), 8);
let storage_path = super::storage_dir().join(storage_filename.clone());
info!("storing to: {:?}", storage_path);
let writer = super::file::LiveFileWriter::new(&storage_path)?;
if files.len() > 1 {
info!("Wrapping in zipfile generator");
let now = OffsetDateTime::now_utc();
let writer = super::zip::ZipGenerator::new(files, Box::new(writer));
let size = writer.total_size();
self.writer = Some(Box::new(writer));
2022-04-27 00:55:36 -04:00
let download_filename =
super::APP_NAME.to_owned() + &now.format(FILENAME_DATE_FORMAT)? + ".zip";
2022-04-26 23:54:29 -04:00
let modtime = now;
2022-04-27 00:55:36 -04:00
self.app_data
.write()
.map_err(|_| Error::LockPoisoned)?
.insert(
storage_filename,
DownloadableFile {
name: download_filename,
size,
modtime,
uploader: Some(ctx.address()),
},
);
2022-04-26 23:54:29 -04:00
} else {
self.writer = Some(Box::new(writer));
2022-04-27 00:55:36 -04:00
self.app_data
.write()
.map_err(|_| Error::LockPoisoned)?
.insert(
storage_filename,
DownloadableFile {
name: files[0].name.clone(),
size: files[0].size,
modtime: files[0].modtime,
uploader: Some(ctx.address()),
},
);
2022-04-26 23:54:29 -04:00
}
ack(ctx);
}
ws::Message::Binary(data)
2022-04-27 00:55:36 -04:00
| ws::Message::Continuation(Item::FirstBinary(data))
| ws::Message::Continuation(Item::Continue(data))
| ws::Message::Continuation(Item::Last(data)) => {
2022-04-26 23:54:29 -04:00
if let Some(ref mut writer) = self.writer {
if data.len() > self.bytes_remaining {
return Err(Error::TooMuchData);
}
self.bytes_remaining -= data.len();
writer.write_all(&data)?;
ack(ctx);
if self.bytes_remaining == 0 {
return Ok(true);
}
} else {
return Err(Error::UnexpectedMessageType);
}
}
ws::Message::Close(reason) => {
if self.bytes_remaining > 0 {
return Err(Error::ClosedEarly(reason));
} else {
return Ok(true);
}
}
ws::Message::Ping(ping) => {
debug!("Ping received, ponging");
ctx.pong(&ping);
}
ws::Message::Nop | ws::Message::Pong(_) => (),
_ => {
return Err(Error::UnexpectedMessageType);
}
}
Ok(false)
}
}