use std::{collections::HashSet, io::Write, task::Waker}; use actix::{Actor, ActorContext, AsyncContext, Handler, Message, StreamHandler}; use actix_http::ws::{CloseReason, Item}; use actix_web_actors::ws::{self, CloseCode}; use bytes::Bytes; use log::{debug, error, info, trace}; use rand::distributions::{Alphanumeric, DistString}; use serde::Deserialize; use time::OffsetDateTime; use crate::{file::LiveWriter, DownloadableFile, UploadedFile}; const MAX_FILES: usize = 256; const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] = time::macros::format_description!("[year]-[month]-[day]-[hour][minute][second]"); #[derive(thiserror::Error, Debug)] enum Error { #[error("Failed to parse file metadata")] Parse(#[from] serde_json::Error), #[error("Error writing to stored file")] Storage(#[from] std::io::Error), #[error("Time formatting error")] TimeFormat(#[from] time::error::Format), #[error("Duplicate filename could not be deduplicated")] DuplicateFilename, #[error("This message type was not expected at this stage")] UnexpectedMessageType, #[error("Metadata contained an empty list of files")] NoFiles, #[error("Number of files submitted by client exceeded the maximum limit")] TooManyFiles, #[error("Websocket was closed by client before completing transfer")] ClosedEarly(Option), #[error("Client sent more data than they were supposed to")] UnexpectedExtraData, } impl Error { fn close_code(&self) -> CloseCode { match self { Self::Parse(_) => CloseCode::Invalid, Self::Storage(_) => CloseCode::Error, Self::TimeFormat(_) => CloseCode::Error, Self::DuplicateFilename => CloseCode::Policy, Self::UnexpectedMessageType => CloseCode::Invalid, Self::NoFiles => CloseCode::Policy, Self::TooManyFiles => CloseCode::Policy, Self::ClosedEarly(_) => CloseCode::Invalid, Self::UnexpectedExtraData => CloseCode::Invalid, } } } pub struct Uploader { writer: Option>, storage_filename: String, app_data: super::AppData, bytes_remaining: u64, } impl Uploader { pub(crate) fn new(app_data: super::AppData) -> Self { Self { writer: None, storage_filename: String::new(), app_data, bytes_remaining: 0, } } } impl Actor for Uploader { type Context = ws::WebsocketContext; } #[derive(Message)] #[rtype(result = "()")] pub(crate) struct WakerMessage(pub Waker); impl Handler for Uploader { type Result = (); fn handle(&mut self, msg: WakerMessage, _: &mut Self::Context) { if let Some(w) = self.writer.as_mut() { w.add_waker(msg.0); } else { error!("Got a wakeup request before creating a file"); } } } #[derive(Debug, Deserialize)] struct RawUploadedFile { name: String, size: u64, modtime: i64, } impl RawUploadedFile { fn process(&self) -> UploadedFile { UploadedFile::new( &self.name, self.size, OffsetDateTime::from_unix_timestamp(self.modtime / 1000) .unwrap_or_else(|_| OffsetDateTime::now_utc()), ) } } impl StreamHandler> for Uploader { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { let msg = match msg { Ok(m) => m, Err(e) => { error!("Websocket error: {}", e); self.cleanup_after_error(ctx); ctx.stop(); return; } }; match self.handle_message(msg, ctx) { Err(e) => { error!("{}", e); ctx.close(Some(ws::CloseReason { code: e.close_code(), description: Some(e.to_string()), })); self.cleanup_after_error(ctx); ctx.stop(); } Ok(true) => { info!("Finished uploading data"); self.writer.as_mut().map(|w| w.flush()); ctx.close(Some(ws::CloseReason { code: CloseCode::Normal, description: None, })); let data = self.app_data.clone(); let filename = self.storage_filename.clone(); ctx.wait(actix::fut::wrap_future(async move { data.write().await.remove_uploader(&filename); })); ctx.stop(); } _ => (), } } } fn ack(ctx: &mut ::Context) { ctx.text("ack"); } impl Uploader { fn handle_message( &mut self, msg: ws::Message, ctx: &mut ::Context, ) -> Result { trace!("Websocket message: {:?}", msg); match msg { ws::Message::Text(text) => { if self.writer.is_some() { return Err(Error::UnexpectedMessageType); } let raw_files: Vec = serde_json::from_slice(text.as_bytes())?; info!("Received file list: {} files", raw_files.len()); debug!("{:?}", raw_files); if raw_files.len() > MAX_FILES { return Err(Error::TooManyFiles); } let mut filenames: HashSet = HashSet::new(); let mut files = Vec::new(); for raw_file in raw_files.iter() { let mut file = raw_file.process(); while filenames.contains(&file.name) { info!("Duplicate file name: {}", file.name); if file.name.len() >= sanitise_file_name::Options::DEFAULT.length_limit { return Err(Error::DuplicateFilename); } file.name.insert(0, '_'); } filenames.insert(file.name.clone()); self.bytes_remaining += file.size; files.push(file); } if files.is_empty() { return Err(Error::NoFiles); } let storage_filename = Alphanumeric.sample_string(&mut rand::thread_rng(), 8); self.storage_filename = storage_filename.clone(); let storage_path = super::storage_dir().join(storage_filename.clone()); info!("storing to: {:?}", storage_path); let writer = super::file::LiveFileWriter::new(&storage_path)?; let addr = Some(ctx.address()); let (writer, downloadable_file): (Box, _) = if files.len() > 1 { info!("Wrapping in zipfile generator"); let now = OffsetDateTime::now_utc(); let zip_writer = super::zip::ZipGenerator::new(files, writer); let size = zip_writer.total_size(); let download_filename = super::APP_NAME.to_owned() + &now.format(FILENAME_DATE_FORMAT)? + ".zip"; ( Box::new(zip_writer), DownloadableFile { name: download_filename, size, modtime: now, uploader: addr, }, ) } else { ( Box::new(writer), DownloadableFile { name: files[0].name.clone(), size: files[0].size, modtime: files[0].modtime, uploader: addr, }, ) }; self.writer = Some(writer); let data = self.app_data.clone(); ctx.spawn(actix::fut::wrap_future(async move { data.write() .await .add_file(storage_filename, downloadable_file) .await .unwrap(); })); ctx.text(self.storage_filename.as_str()); } ws::Message::Binary(data) | ws::Message::Continuation(Item::Last(data)) => { let result = self.handle_data(data)?; ack(ctx); return Ok(result); } ws::Message::Continuation(Item::FirstBinary(data)) | ws::Message::Continuation(Item::Continue(data)) => { return self.handle_data(data); } ws::Message::Close(reason) => { if self.bytes_remaining > 0 { return Err(Error::ClosedEarly(reason)); } else { return Ok(true); } } ws::Message::Ping(ping) => { debug!("Ping received, ponging"); ctx.pong(&ping); } ws::Message::Nop | ws::Message::Pong(_) => (), _ => { return Err(Error::UnexpectedMessageType); } } Ok(false) } fn handle_data(&mut self, data: Bytes) -> Result { if let Some(ref mut writer) = self.writer { if (data.len() as u64) > self.bytes_remaining { return Err(Error::UnexpectedExtraData); } self.bytes_remaining -= data.len() as u64; writer.write_all(&data)?; Ok(self.bytes_remaining == 0) } else { Err(Error::UnexpectedMessageType) } } fn cleanup_after_error(&mut self, ctx: &mut ::Context) { let data = self.app_data.clone(); let filename = self.storage_filename.clone(); ctx.spawn(actix::fut::wrap_future(async move { data.write().await.remove_file(&filename).await.unwrap(); })); } }