use std::{collections::HashSet, fs::File, io::Write}; use actix::{fut::future::ActorFutureExt, Actor, ActorContext, AsyncContext, StreamHandler}; use actix_http::ws::{CloseReason, Item}; use actix_web::web; use actix_web_actors::ws::{self, CloseCode}; use bytes::Bytes; use log::{debug, error, info, trace}; use sanitise_file_name::{sanitise_with_options, Options as SanOptions}; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use unicode_normalization::UnicodeNormalization; use crate::{ log_auth_failure, store::{self, FileAddError, StoredFile, StoredFileWithPassword}, zip::FileSet, AppData, }; const MAX_FILES: usize = 256; const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] = time::macros::format_description!("[year]-[month]-[day]-[hour][minute][second]"); /// Sanitises a filename after performing unicode normalization, /// optionally reducing the length limit to leave space for an /// extension yet to be added. fn sanitise(name: &str, extension_length: usize) -> String { let name = name.nfd().collect::(); sanitise_with_options( &name, &SanOptions { length_limit: SanOptions::DEFAULT.length_limit - extension_length, ..SanOptions::DEFAULT }, ) } #[derive(thiserror::Error, Debug)] enum Error { #[error("Failed to parse file metadata")] Parse(#[from] serde_json::Error), #[error("Error writing to stored file")] Storage(#[from] std::io::Error), #[error("Duplicate filename could not be deduplicated")] DuplicateFilename, #[error("This message type was not expected at this stage")] UnexpectedMessageType, #[error("Metadata contained an empty list of files")] NoFiles, #[error("Number of files submitted by client exceeded the maximum limit")] TooManyFiles, #[error("Requested lifetime was too long, can be at most {0} days")] TooLong(u16), #[error("Upload size was too large, can be at most {0} bytes")] TooBig(u64), #[error("File storage is full")] Full, #[error("Incorrect password")] IncorrectPassword, #[error("Websocket was closed by client before completing transfer")] ClosedEarly(Option), #[error("Client sent more data than they were supposed to")] UnexpectedExtraData, } impl Error { fn close_code(&self) -> CloseCode { match self { Self::Storage(_) => CloseCode::Error, Self::Parse(_) | Self::UnexpectedMessageType | Self::ClosedEarly(_) | Self::UnexpectedExtraData => CloseCode::Invalid, Self::DuplicateFilename | Self::NoFiles | Self::TooManyFiles | Self::TooLong(_) | Self::TooBig(_) | Self::Full | Self::IncorrectPassword => CloseCode::Policy, } } } pub struct Uploader { writer: Option>, storage_filename: String, app_data: web::Data, bytes_remaining: u64, ip_addr: String, } impl Uploader { pub(crate) fn new(app_data: web::Data, ip_addr: String) -> Self { Self { writer: None, storage_filename: store::gen_storage_code(app_data.config.mnemonic_codes), app_data, bytes_remaining: 0, ip_addr, } } } impl Actor for Uploader { type Context = ws::WebsocketContext; } type Context = ::Context; pub use crate::state::v1::UploadedFile; impl UploadedFile { fn new(name: &str, size: u64, modtime: OffsetDateTime) -> Self { Self { name: sanitise(name, 0), size, modtime, } } } #[derive(Debug, Deserialize)] struct RawUploadedFile { name: String, size: u64, modtime: i64, } impl RawUploadedFile { fn process(&self) -> UploadedFile { UploadedFile::new( &self.name, self.size, OffsetDateTime::from_unix_timestamp(self.modtime / 1000) .unwrap_or_else(|_| OffsetDateTime::now_utc()), ) } } #[derive(Deserialize)] struct UploadManifest { files: Vec, lifetime: u16, password: String, collection_name: Option, } #[derive(Serialize)] #[serde(rename_all = "snake_case", tag = "type")] enum ServerMessage { Ready { code: String }, TooBig { max_size: u64 }, TooLong { max_lifetime: u16 }, IncorrectPassword, Error { details: String }, } impl From<&Error> for ServerMessage { fn from(e: &Error) -> Self { match e { Error::TooBig(max_size) => ServerMessage::TooBig { max_size: *max_size, }, Error::TooLong(max_lifetime) => ServerMessage::TooLong { max_lifetime: *max_lifetime, }, Error::IncorrectPassword => ServerMessage::IncorrectPassword, _ => ServerMessage::Error { details: e.to_string(), }, } } } fn stop_and_flush(_: T, u: &mut Uploader, ctx: &mut Context) { ctx.stop(); if let Some(w) = u.writer.as_mut() { if let Err(e) = w.flush() { error!("Failed to flush writer: {}", e); } } } impl StreamHandler> for Uploader { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { let msg = match msg { Ok(m) => m, Err(e) => { error!("Websocket error: {}", e); self.cleanup_after_error(ctx); return; } }; match self.handle_message(msg, ctx) { Err(e) => { self.notify_error_and_cleanup(e, ctx); } Ok(true) => { info!("Finished uploading data"); ctx.close(Some(ws::CloseReason { code: CloseCode::Normal, description: None, })); stop_and_flush((), self, ctx); } _ => (), } } } impl Uploader { fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut Context) { error!("{}", e); if let Error::IncorrectPassword = e { log_auth_failure(&self.ip_addr); } ctx.text(serde_json::to_string(&ServerMessage::from(&e)).unwrap()); ctx.close(Some(ws::CloseReason { code: e.close_code(), description: Some(e.to_string()), })); self.cleanup_after_error(ctx); } fn handle_message(&mut self, msg: ws::Message, ctx: &mut Context) -> Result { trace!("Websocket message: {:?}", msg); match msg { ws::Message::Text(text) => { if self.writer.is_some() { return Err(Error::UnexpectedMessageType); } let UploadManifest { files: raw_files, lifetime, password, collection_name, } = serde_json::from_slice(text.as_bytes())?; if std::env::var("TRANSBEAM_UPLOAD_PASSWORD") != Ok(password) { return Err(Error::IncorrectPassword); } if lifetime > self.app_data.config.max_lifetime { return Err(Error::TooLong(self.app_data.config.max_lifetime)); } info!("Received file list: {} files", raw_files.len()); debug!("{:?}", raw_files); if raw_files.len() > MAX_FILES { return Err(Error::TooManyFiles); } if raw_files.is_empty() { return Err(Error::NoFiles); } let mut filenames: HashSet = HashSet::new(); let mut files = Vec::new(); for raw_file in raw_files.iter() { let mut file = raw_file.process(); while filenames.contains(&file.name) { info!("Duplicate file name: {}", file.name); if file.name.len() >= SanOptions::DEFAULT.length_limit { return Err(Error::DuplicateFilename); } file.name.insert(0, '_'); } filenames.insert(file.name.clone()); self.bytes_remaining += file.size; files.push(file); } if self.bytes_remaining > self.app_data.config.max_upload_size { return Err(Error::TooBig(self.app_data.config.max_upload_size)); } let storage_path = self .app_data .config .storage_dir .join(self.storage_filename.clone()); info!("storing to: {:?}", storage_path); let writer = File::options() .write(true) .create_new(true) .open(&storage_path)?; let (writer, name, size, modtime, contents): (Box, _, _, _, _) = if files.len() > 1 { info!("Wrapping in zipfile generator"); let now = OffsetDateTime::now_utc(); let collection_name = collection_name.map(|f| sanitise(&f, 4)).unwrap_or_else(|| { super::APP_NAME.to_owned() + &now.format(FILENAME_DATE_FORMAT).unwrap() }); let file_set = FileSet { files, directory_name: Some(collection_name.clone()), }; let zip_writer = super::zip::ZipGenerator::new(file_set.clone(), writer); let size = zip_writer.total_size(); ( Box::new(zip_writer), collection_name + ".zip", size, now, Some(file_set), ) } else { ( Box::new(writer), files[0].name.clone(), files[0].size, files[0].modtime, None, ) }; self.writer = Some(writer); let stored_file = StoredFileWithPassword { file: StoredFile { name, size, modtime, expiry: OffsetDateTime::now_utc() + lifetime * time::Duration::DAY, contents, }, password: None, }; let app_data = self.app_data.clone(); let storage_filename = self.storage_filename.clone(); ctx.spawn( actix::fut::wrap_future(async move { debug!("Spawned future to add entry {} to state", storage_filename); app_data.add_file(storage_filename, stored_file).await }) .map(|res, u: &mut Self, ctx: &mut Context| match res { Ok(()) => ctx.text( serde_json::to_string(&ServerMessage::Ready { code: u.storage_filename.clone(), }) .unwrap(), ), Err(FileAddError::TooBig(size)) => { u.notify_error_and_cleanup(Error::TooBig(size), ctx) } Err(FileAddError::Full) => u.notify_error_and_cleanup(Error::Full, ctx), Err(FileAddError::FileSystem(e)) => { u.notify_error_and_cleanup(Error::from(e), ctx) } }), ); } ws::Message::Binary(data) | ws::Message::Continuation(Item::FirstBinary(data)) | ws::Message::Continuation(Item::Continue(data)) | ws::Message::Continuation(Item::Last(data)) => { return self.handle_data(data); } ws::Message::Close(reason) => { if self.bytes_remaining > 0 { return Err(Error::ClosedEarly(reason)); } else { return Ok(true); } } ws::Message::Ping(ping) => { debug!("Ping received, ponging"); ctx.pong(&ping); } ws::Message::Nop | ws::Message::Pong(_) => (), _ => { return Err(Error::UnexpectedMessageType); } } Ok(false) } fn handle_data(&mut self, data: Bytes) -> Result { if let Some(ref mut writer) = self.writer { if (data.len() as u64) > self.bytes_remaining { return Err(Error::UnexpectedExtraData); } self.bytes_remaining -= data.len() as u64; writer.write_all(&data)?; Ok(self.bytes_remaining == 0) } else { Err(Error::UnexpectedMessageType) } } fn cleanup_after_error(&mut self, ctx: &mut Context) { info!( "Cleaning up after failed upload of {}", self.storage_filename ); let app_data = self.app_data.clone(); let filename = self.storage_filename.clone(); ctx.wait( actix::fut::wrap_future(async move { debug!("Spawned future to remove entry {} from state", filename); app_data.remove_file(&filename).await.unwrap(); }) .map(stop_and_flush), ); } }