use std::{collections::HashSet, fs::File, io::Write}; use actix::{fut::future::ActorFutureExt, Actor, ActorContext, AsyncContext, StreamHandler}; use actix_http::ws::{CloseReason, Item}; use actix_web::web; use actix_web_actors::ws::{self, CloseCode}; use bytes::Bytes; use log::{debug, error, info, trace}; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use unicode_normalization::UnicodeNormalization; use crate::{ log_auth_failure, store::{self, FileAddError, StoredFile}, AppState, }; const MAX_FILES: usize = 256; const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] = time::macros::format_description!("[year]-[month]-[day]-[hour][minute][second]"); #[derive(thiserror::Error, Debug)] enum Error { #[error("Failed to parse file metadata")] Parse(#[from] serde_json::Error), #[error("Error writing to stored file")] Storage(#[from] std::io::Error), #[error("Duplicate filename could not be deduplicated")] DuplicateFilename, #[error("This message type was not expected at this stage")] UnexpectedMessageType, #[error("Metadata contained an empty list of files")] NoFiles, #[error("Number of files submitted by client exceeded the maximum limit")] TooManyFiles, #[error("Requested lifetime was too long, can be at most {0} days")] TooLong(u16), #[error("Upload size was too large, can be at most {0} bytes")] TooBig(u64), #[error("File storage is full")] Full, #[error("Incorrect password")] IncorrectPassword, #[error("Websocket was closed by client before completing transfer")] ClosedEarly(Option), #[error("Client sent more data than they were supposed to")] UnexpectedExtraData, } impl Error { fn close_code(&self) -> CloseCode { match self { Self::Storage(_) => CloseCode::Error, Self::Parse(_) | Self::UnexpectedMessageType | Self::ClosedEarly(_) | Self::UnexpectedExtraData => CloseCode::Invalid, Self::DuplicateFilename | Self::NoFiles | Self::TooManyFiles | Self::TooLong(_) | Self::TooBig(_) | Self::Full | Self::IncorrectPassword => CloseCode::Policy, } } } pub struct Uploader { writer: Option>, storage_filename: String, app_state: web::Data, bytes_remaining: u64, ip_addr: String, } impl Uploader { pub(crate) fn new(app_state: web::Data, ip_addr: String) -> Self { Self { writer: None, storage_filename: store::gen_storage_code(app_state.config.mnemonic_codes), app_state, bytes_remaining: 0, ip_addr, } } } impl Actor for Uploader { type Context = ws::WebsocketContext; } type Context = ::Context; pub struct UploadedFile { pub name: String, pub size: u64, pub modtime: OffsetDateTime, } impl UploadedFile { fn new(name: &str, size: u64, modtime: OffsetDateTime) -> Self { Self { name: sanitise_file_name::sanitise(&name.nfd().collect::()), size, modtime, } } } #[derive(Debug, Deserialize)] struct RawUploadedFile { name: String, size: u64, modtime: i64, } impl RawUploadedFile { fn process(&self) -> UploadedFile { UploadedFile::new( &self.name, self.size, OffsetDateTime::from_unix_timestamp(self.modtime / 1000) .unwrap_or_else(|_| OffsetDateTime::now_utc()), ) } } #[derive(Deserialize)] struct UploadManifest { files: Vec, lifetime: u16, password: String, } #[derive(Serialize)] #[serde(rename_all = "snake_case", tag = "type")] enum ServerMessage { Ready { code: String }, TooBig { max_size: u64 }, TooLong { max_days: u16 }, IncorrectPassword, Error { details: String }, } impl From<&Error> for ServerMessage { fn from(e: &Error) -> Self { match e { Error::TooBig(max_size) => ServerMessage::TooBig { max_size: *max_size, }, Error::TooLong(max_days) => ServerMessage::TooLong { max_days: *max_days, }, Error::IncorrectPassword => ServerMessage::IncorrectPassword, _ => ServerMessage::Error { details: e.to_string(), }, } } } fn stop_and_flush(_: T, u: &mut Uploader, ctx: &mut Context) { ctx.stop(); if let Some(w) = u.writer.as_mut() { if let Err(e) = w.flush() { error!("Failed to flush writer: {}", e); } } } impl StreamHandler> for Uploader { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { let msg = match msg { Ok(m) => m, Err(e) => { error!("Websocket error: {}", e); self.cleanup_after_error(ctx); return; } }; match self.handle_message(msg, ctx) { Err(e) => { self.notify_error_and_cleanup(e, ctx); } Ok(true) => { info!("Finished uploading data"); ctx.close(Some(ws::CloseReason { code: CloseCode::Normal, description: None, })); stop_and_flush((), self, ctx); } _ => (), } } } fn ack(ctx: &mut Context) { ctx.text("ack"); } impl Uploader { fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut Context) { error!("{}", e); if let Error::IncorrectPassword = e { log_auth_failure(&self.ip_addr); } ctx.text(serde_json::to_string(&ServerMessage::from(&e)).unwrap()); ctx.close(Some(ws::CloseReason { code: e.close_code(), description: Some(e.to_string()), })); self.cleanup_after_error(ctx); } fn handle_message(&mut self, msg: ws::Message, ctx: &mut Context) -> Result { trace!("Websocket message: {:?}", msg); match msg { ws::Message::Text(text) => { if self.writer.is_some() { return Err(Error::UnexpectedMessageType); } let UploadManifest { files: raw_files, lifetime, password, } = serde_json::from_slice(text.as_bytes())?; if std::env::var("TRANSBEAM_UPLOAD_PASSWORD") != Ok(password) { return Err(Error::IncorrectPassword); } if lifetime > self.app_state.config.max_lifetime { return Err(Error::TooLong(self.app_state.config.max_lifetime)); } info!("Received file list: {} files", raw_files.len()); debug!("{:?}", raw_files); if raw_files.len() > MAX_FILES { return Err(Error::TooManyFiles); } if raw_files.is_empty() { return Err(Error::NoFiles); } let mut filenames: HashSet = HashSet::new(); let mut files = Vec::new(); for raw_file in raw_files.iter() { let mut file = raw_file.process(); while filenames.contains(&file.name) { info!("Duplicate file name: {}", file.name); if file.name.len() >= sanitise_file_name::Options::DEFAULT.length_limit { return Err(Error::DuplicateFilename); } file.name.insert(0, '_'); } filenames.insert(file.name.clone()); self.bytes_remaining += file.size; files.push(file); } if self.bytes_remaining > self.app_state.config.max_upload_size { return Err(Error::TooBig(self.app_state.config.max_upload_size)); } let storage_path = self .app_state .config .storage_dir .join(self.storage_filename.clone()); info!("storing to: {:?}", storage_path); let writer = File::options() .write(true) .create_new(true) .open(&storage_path)?; let (writer, name, size, modtime): (Box, _, _, _) = if files.len() > 1 { info!("Wrapping in zipfile generator"); let now = OffsetDateTime::now_utc(); let zip_writer = super::zip::ZipGenerator::new(files, writer); let size = zip_writer.total_size(); let download_filename = super::APP_NAME.to_owned() + &now.format(FILENAME_DATE_FORMAT).unwrap() + ".zip"; (Box::new(zip_writer), download_filename, size, now) } else { ( Box::new(writer), files[0].name.clone(), files[0].size, files[0].modtime, ) }; self.writer = Some(writer); let stored_file = StoredFile { name, size, modtime, expiry: OffsetDateTime::now_utc() + lifetime * time::Duration::DAY, }; let state = self.app_state.clone(); let storage_filename = self.storage_filename.clone(); ctx.spawn( actix::fut::wrap_future(async move { debug!("Spawned future to add entry {} to state", storage_filename); state .file_store .write() .await .add_file(storage_filename, stored_file) .await }) .map(|res, u: &mut Self, ctx: &mut Context| match res { Ok(()) => ctx.text( serde_json::to_string(&ServerMessage::Ready { code: u.storage_filename.clone(), }) .unwrap(), ), Err(FileAddError::TooBig(size)) => { u.notify_error_and_cleanup(Error::TooBig(size), ctx) } Err(FileAddError::Full) => u.notify_error_and_cleanup(Error::Full, ctx), Err(FileAddError::FileSystem(e)) => { u.notify_error_and_cleanup(Error::from(e), ctx) } }), ); } ws::Message::Binary(data) | ws::Message::Continuation(Item::Last(data)) => { let result = self.handle_data(data)?; ack(ctx); return Ok(result); } ws::Message::Continuation(Item::FirstBinary(data)) | ws::Message::Continuation(Item::Continue(data)) => { return self.handle_data(data); } ws::Message::Close(reason) => { if self.bytes_remaining > 0 { return Err(Error::ClosedEarly(reason)); } else { return Ok(true); } } ws::Message::Ping(ping) => { debug!("Ping received, ponging"); ctx.pong(&ping); } ws::Message::Nop | ws::Message::Pong(_) => (), _ => { return Err(Error::UnexpectedMessageType); } } Ok(false) } fn handle_data(&mut self, data: Bytes) -> Result { if let Some(ref mut writer) = self.writer { if (data.len() as u64) > self.bytes_remaining { return Err(Error::UnexpectedExtraData); } self.bytes_remaining -= data.len() as u64; writer.write_all(&data)?; Ok(self.bytes_remaining == 0) } else { Err(Error::UnexpectedMessageType) } } fn cleanup_after_error(&mut self, ctx: &mut Context) { info!( "Cleaning up after failed upload of {}", self.storage_filename ); let state = self.app_state.clone(); let filename = self.storage_filename.clone(); ctx.wait( actix::fut::wrap_future(async move { debug!("Spawned future to remove entry {} from state", filename); state .file_store .write() .await .remove_file(&filename) .await .unwrap(); }) .map(stop_and_flush), ); } }