use std::{collections::HashSet, fs::File, io::Write}; use actix::{fut::future::ActorFutureExt, Actor, ActorContext, AsyncContext, StreamHandler}; use actix_http::ws::{CloseReason, Item}; use actix_web_actors::ws::{self, CloseCode}; use bytes::Bytes; use log::{debug, error, info, trace}; use rand::distributions::{Alphanumeric, DistString}; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use crate::store::{storage_dir, StoredFile, self}; const MAX_FILES: usize = 256; const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] = time::macros::format_description!("[year]-[month]-[day]-[hour][minute][second]"); #[derive(thiserror::Error, Debug)] enum Error { #[error("Failed to parse file metadata")] Parse(#[from] serde_json::Error), #[error("Error writing to stored file")] Storage(#[from] std::io::Error), #[error("Time formatting error")] TimeFormat(#[from] time::error::Format), #[error("Duplicate filename could not be deduplicated")] DuplicateFilename, #[error("This message type was not expected at this stage")] UnexpectedMessageType, #[error("Metadata contained an empty list of files")] NoFiles, #[error("Number of files submitted by client exceeded the maximum limit")] TooManyFiles, #[error("Requested lifetime was too long")] TooLong, #[error("Upload size was too large, can be at most {0} bytes")] TooBig(u64), #[error("Websocket was closed by client before completing transfer")] ClosedEarly(Option), #[error("Client sent more data than they were supposed to")] UnexpectedExtraData, } impl Error { fn close_code(&self) -> CloseCode { match self { Self::Storage(_) | Self::TimeFormat(_) => CloseCode::Error, Self::Parse(_) | Self::UnexpectedMessageType | Self::ClosedEarly(_) | Self::UnexpectedExtraData => CloseCode::Invalid, Self::DuplicateFilename | Self::NoFiles | Self::TooManyFiles | Self::TooLong | Self::TooBig(_) => CloseCode::Policy, } } } pub struct Uploader { writer: Option>, storage_filename: String, app_data: super::AppData, bytes_remaining: u64, } impl Uploader { pub(crate) fn new(app_data: super::AppData) -> Self { Self { writer: None, storage_filename: Alphanumeric.sample_string(&mut rand::thread_rng(), 8), app_data, bytes_remaining: 0, } } } impl Actor for Uploader { type Context = ws::WebsocketContext; } pub struct UploadedFile { pub name: String, pub size: u64, pub modtime: OffsetDateTime, } impl UploadedFile { fn new(name: &str, size: u64, modtime: OffsetDateTime) -> Self { Self { name: sanitise_file_name::sanitise(name), size, modtime, } } } #[derive(Debug, Deserialize)] struct RawUploadedFile { name: String, size: u64, modtime: i64, } impl RawUploadedFile { fn process(&self) -> UploadedFile { UploadedFile::new( &self.name, self.size, OffsetDateTime::from_unix_timestamp(self.modtime / 1000) .unwrap_or_else(|_| OffsetDateTime::now_utc()), ) } } #[derive(Deserialize)] struct UploadManifest { files: Vec, lifetime: u32, } #[derive(Serialize)] #[serde(rename_all = "snake_case", tag = "type")] enum ServerMessage { Ready { code: String }, TooBig { max_size: u64 }, TooLong { max_days: u32 }, Error { details: String }, } impl From<&Error> for ServerMessage { fn from(e: &Error) -> Self { match e { Error::TooBig(max_size) => ServerMessage::TooBig { max_size: *max_size }, Error::TooLong => ServerMessage::TooLong { max_days: store::max_lifetime() }, _ => ServerMessage::Error { details: e.to_string() }, } } } fn stop_and_flush(_: T, u: &mut Uploader, ctx: &mut ::Context) { ctx.stop(); if let Some(w) = u.writer.as_mut() { if let Err(e) = w.flush() { error!("Failed to flush writer: {}", e); } } } impl StreamHandler> for Uploader { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { let msg = match msg { Ok(m) => m, Err(e) => { error!("Websocket error: {}", e); self.cleanup_after_error(ctx); return; } }; match self.handle_message(msg, ctx) { Err(e) => { self.notify_error_and_cleanup(e, ctx); } Ok(true) => { info!("Finished uploading data"); ctx.close(Some(ws::CloseReason { code: CloseCode::Normal, description: None, })); stop_and_flush((), self, ctx); } _ => (), } } } fn ack(ctx: &mut ::Context) { ctx.text("ack"); } impl Uploader { fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut ::Context) { error!("{}", e); ctx.text(serde_json::to_string(&ServerMessage::from(&e)).unwrap()); ctx.close(Some(ws::CloseReason { code: e.close_code(), description: Some(e.to_string()), })); self.cleanup_after_error(ctx); } fn handle_message( &mut self, msg: ws::Message, ctx: &mut ::Context, ) -> Result { trace!("Websocket message: {:?}", msg); match msg { ws::Message::Text(text) => { if self.writer.is_some() { return Err(Error::UnexpectedMessageType); } let UploadManifest { files: raw_files, lifetime, } = serde_json::from_slice(text.as_bytes())?; if lifetime > store::max_lifetime() { return Err(Error::TooLong); } info!("Received file list: {} files", raw_files.len()); debug!("{:?}", raw_files); if raw_files.len() > MAX_FILES { return Err(Error::TooManyFiles); } if raw_files.is_empty() { return Err(Error::NoFiles); } let mut filenames: HashSet = HashSet::new(); let mut files = Vec::new(); for raw_file in raw_files.iter() { let mut file = raw_file.process(); while filenames.contains(&file.name) { info!("Duplicate file name: {}", file.name); if file.name.len() >= sanitise_file_name::Options::DEFAULT.length_limit { return Err(Error::DuplicateFilename); } file.name.insert(0, '_'); } filenames.insert(file.name.clone()); self.bytes_remaining += file.size; files.push(file); } let storage_path = storage_dir().join(self.storage_filename.clone()); info!("storing to: {:?}", storage_path); let writer = File::options() .write(true) .create_new(true) .open(&storage_path)?; let (writer, name, size, modtime): (Box,_,_,_) = if files.len() > 1 { info!("Wrapping in zipfile generator"); let now = OffsetDateTime::now_utc(); let zip_writer = super::zip::ZipGenerator::new(files, writer); let size = zip_writer.total_size(); let download_filename = super::APP_NAME.to_owned() + &now.format(FILENAME_DATE_FORMAT)? + ".zip"; ( Box::new(zip_writer), download_filename, size, now, ) } else { ( Box::new(writer), files[0].name.clone(), files[0].size, files[0].modtime, ) }; self.writer = Some(writer); let stored_file = StoredFile { name, size, modtime, expiry: OffsetDateTime::now_utc() + lifetime*time::Duration::DAY, }; let data = self.app_data.clone(); let storage_filename = self.storage_filename.clone(); ctx.spawn(actix::fut::wrap_future(async move { debug!("Spawned future to add entry {} to state", storage_filename); data.write() .await .add_file(storage_filename, stored_file) .await }).map(|res, u: &mut Self, ctx: &mut ::Context| { match res { Ok(Ok(())) => ctx.text(serde_json::to_string(&ServerMessage::Ready { code: u.storage_filename.clone() }).unwrap()), Ok(Err(size)) => u.notify_error_and_cleanup(Error::TooBig(size), ctx), Err(e) => u.notify_error_and_cleanup(Error::from(e), ctx) } })); } ws::Message::Binary(data) | ws::Message::Continuation(Item::Last(data)) => { let result = self.handle_data(data)?; ack(ctx); return Ok(result); } ws::Message::Continuation(Item::FirstBinary(data)) | ws::Message::Continuation(Item::Continue(data)) => { return self.handle_data(data); } ws::Message::Close(reason) => { if self.bytes_remaining > 0 { return Err(Error::ClosedEarly(reason)); } else { return Ok(true); } } ws::Message::Ping(ping) => { debug!("Ping received, ponging"); ctx.pong(&ping); } ws::Message::Nop | ws::Message::Pong(_) => (), _ => { return Err(Error::UnexpectedMessageType); } } Ok(false) } fn handle_data(&mut self, data: Bytes) -> Result { if let Some(ref mut writer) = self.writer { if (data.len() as u64) > self.bytes_remaining { return Err(Error::UnexpectedExtraData); } self.bytes_remaining -= data.len() as u64; writer.write_all(&data)?; Ok(self.bytes_remaining == 0) } else { Err(Error::UnexpectedMessageType) } } fn cleanup_after_error(&mut self, ctx: &mut ::Context) { info!( "Cleaning up after failed upload of {}", self.storage_filename ); let data = self.app_data.clone(); let filename = self.storage_filename.clone(); ctx.wait( actix::fut::wrap_future(async move { debug!("Spawned future to remove entry {} from state", filename); data.write().await.remove_file(&filename).await.unwrap(); }) .map(stop_and_flush), ); if let Err(e) = std::fs::remove_file(storage_dir().join(&self.storage_filename)) { error!("Failed to remove file {}: {}", self.storage_filename, e); } } }