use std::{collections::HashSet, fmt::Display, io::Write}; use actix::{Actor, StreamHandler, ActorContext, AsyncContext}; use actix_http::ws::{Item, CloseReason}; use actix_web_actors::ws::{self, CloseCode}; use log::{error, debug, info, trace}; use rand::distributions::{Alphanumeric, DistString}; use serde::Deserialize; use time::OffsetDateTime; use crate::{UploadedFile, DownloadableFile, file::LiveWriter}; const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] = time::macros::format_description!("[year]-[month]-[day]-[hour][minute][second]"); #[derive(thiserror::Error, Debug)] enum Error { #[error("Failed to parse file metadata")] Parse(#[from] serde_json::Error), #[error("Error writing to stored file")] Storage(#[from] std::io::Error), #[error("Time formatting error")] TimeFormat(#[from] time::error::Format), #[error("Lock on app state is poisoned")] LockPoisoned, #[error("Duplicate filename could not be deduplicated")] DuplicateFilename, #[error("This message type was not expected at this stage")] UnexpectedMessageType, #[error("Metadata contained an empty list of files")] NoFiles, #[error("Websocket was closed by client before completing transfer")] ClosedEarly(Option), #[error("Client sent more data than they were supposed to")] TooMuchData, } impl Error { fn close_code(&self) -> CloseCode { match self { Self::Parse(_) => CloseCode::Invalid, Self::Storage(_) => CloseCode::Error, Self::TimeFormat(_) => CloseCode::Error, Self::LockPoisoned => CloseCode::Error, Self::DuplicateFilename => CloseCode::Policy, Self::UnexpectedMessageType => CloseCode::Invalid, Self::NoFiles => CloseCode::Policy, Self::ClosedEarly(_) => CloseCode::Invalid, Self::TooMuchData => CloseCode::Invalid, } } } pub struct Uploader { writer: Option>, app_data: super::AppData, bytes_remaining: usize, } impl Uploader { pub fn new(app_data: super::AppData) -> Self { Self { writer: None, app_data, bytes_remaining: 0, } } } impl Actor for Uploader { type Context = ws::WebsocketContext; } #[derive(Debug, Deserialize)] struct RawUploadedFile { name: String, size: usize, modtime: i64, } impl RawUploadedFile { fn process(&self) -> UploadedFile { UploadedFile::new( &self.name, self.size, OffsetDateTime::from_unix_timestamp(self.modtime / 1000) .unwrap_or_else(|_| OffsetDateTime::now_utc()), ) } } impl StreamHandler> for Uploader { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { let msg = match msg { Ok(m) => m, Err(e) => { error!("Websocket error: {:?}", e); ctx.stop(); return; }, }; match self.handle_message(msg, ctx) { Err(e) => { error!("{:?}", e); ctx.close(Some(ws::CloseReason { code: e.close_code(), description: Some(e.to_string()), })); ctx.stop(); } Ok(true) => { info!("Finished uploading data"); self.writer.as_mut().map(|w| w.flush()); ctx.close(Some(ws::CloseReason { code: CloseCode::Normal, description: None, })); // self.app_data.write().unwrap().entry( ctx.stop(); } _ => () } } } fn ack(ctx: &mut ::Context) { ctx.text("ack"); } impl Uploader { fn handle_message(&mut self, msg: ws::Message, ctx: &mut ::Context) -> Result{ trace!("Websocket message: {:?}", msg); match msg { ws::Message::Text(text) => { if self.writer.is_some() { return Err(Error::UnexpectedMessageType) } let raw_files: Vec = serde_json::from_slice(text.as_bytes())?; info!("Received file list: {} files", raw_files.len()); debug!("{:?}", raw_files); let mut filenames: HashSet = HashSet::new(); let mut files = Vec::new(); for raw_file in raw_files.iter() { let mut file = raw_file.process(); while filenames.contains(&file.name) { info!("Duplicate file name: {}", file.name); if file.name.len() >= sanitise_file_name::Options::DEFAULT.length_limit { return Err(Error::DuplicateFilename); } file.name.insert(0, '_'); } filenames.insert(file.name.clone()); self.bytes_remaining += file.size; files.push(file); } if files.is_empty() { return Err(Error::NoFiles); } let storage_filename = Alphanumeric.sample_string(&mut rand::thread_rng(), 8); let storage_path = super::storage_dir().join(storage_filename.clone()); info!("storing to: {:?}", storage_path); let writer = super::file::LiveFileWriter::new(&storage_path)?; if files.len() > 1 { info!("Wrapping in zipfile generator"); let now = OffsetDateTime::now_utc(); let writer = super::zip::ZipGenerator::new(files, Box::new(writer)); let size = writer.total_size(); self.writer = Some(Box::new(writer)); let download_filename = super::app_name() + &now.format(FILENAME_DATE_FORMAT)? + ".zip"; let modtime = now; self.app_data.write().map_err(|_| Error::LockPoisoned)?.insert(storage_filename, DownloadableFile { name: download_filename, size, modtime, uploader: Some(ctx.address()), }); } else { self.writer = Some(Box::new(writer)); self.app_data.write().map_err(|_| Error::LockPoisoned)?.insert(storage_filename, DownloadableFile { name: files[0].name.clone(), size: files[0].size, modtime: files[0].modtime, uploader: Some(ctx.address()), }); } ack(ctx); } ws::Message::Binary(data) | ws::Message::Continuation(Item::FirstBinary(data)) | ws::Message::Continuation(Item::Continue(data)) | ws::Message::Continuation(Item::Last(data)) => { if let Some(ref mut writer) = self.writer { if data.len() > self.bytes_remaining { return Err(Error::TooMuchData); } self.bytes_remaining -= data.len(); writer.write_all(&data)?; ack(ctx); if self.bytes_remaining == 0 { return Ok(true); } } else { return Err(Error::UnexpectedMessageType); } } ws::Message::Close(reason) => { if self.bytes_remaining > 0 { return Err(Error::ClosedEarly(reason)); } else { return Ok(true); } } ws::Message::Ping(ping) => { debug!("Ping received, ponging"); ctx.pong(&ping); } ws::Message::Nop | ws::Message::Pong(_) => (), _ => { return Err(Error::UnexpectedMessageType); } } Ok(false) } }