transbeam/src/upload.rs

403 lines
14 KiB
Rust

use std::{collections::HashSet, fs::File, io::Write};
use actix::{fut::future::ActorFutureExt, Actor, ActorContext, AsyncContext, StreamHandler};
use actix_http::ws::{CloseReason, Item};
use actix_web::web;
use actix_web_actors::ws::{self, CloseCode};
use bytes::Bytes;
use log::{debug, error, info, trace};
use sanitise_file_name::{sanitise_with_options, Options as SanOptions};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use unicode_normalization::UnicodeNormalization;
use crate::{
log_auth_failure,
store::{self, FileAddError, StoredFile, StoredFileWithPassword},
zip::FileSet,
AppData,
};
/// Upper bound on how many entries a single upload manifest may list.
const MAX_FILES: usize = 256;

/// Timestamp layout appended to the application name to build a default
/// collection name when the client does not supply one
/// (renders like `2024-01-31-235959`).
const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] =
    time::macros::format_description!("[year]-[month]-[day]-[hour][minute][second]");
/// Converts `name` to Unicode NFD form and then sanitises it for use as a
/// filename.
///
/// The default length limit is reduced by `extension_length` (in the same
/// units as `SanOptions::length_limit`), so that an extension of that length
/// can be appended to the result afterwards without exceeding the limit.
fn sanitise(name: &str, extension_length: usize) -> String {
    let decomposed: String = name.nfd().collect();
    let options = SanOptions {
        length_limit: SanOptions::DEFAULT.length_limit - extension_length,
        ..SanOptions::DEFAULT
    };
    sanitise_with_options(&decomposed, &options)
}
/// Everything that can abort an upload.
///
/// The `Display` text (from `#[error]`) is used as the websocket close
/// description. For variants without a dedicated `ServerMessage`, it is also
/// the `details` sent to the client.
#[derive(thiserror::Error, Debug)]
enum Error {
    /// The upload manifest could not be deserialized.
    #[error("Failed to parse file metadata")]
    Parse(#[from] serde_json::Error),
    /// An I/O failure while storing the upload.
    #[error("Error writing to stored file")]
    Storage(#[from] std::io::Error),
    /// A name stayed taken even after prefixing underscores up to the length limit.
    #[error("Duplicate filename could not be deduplicated")]
    DuplicateFilename,
    /// A websocket message arrived that is not valid at this point in the protocol.
    #[error("This message type was not expected at this stage")]
    UnexpectedMessageType,
    /// The manifest listed zero files.
    #[error("Metadata contained an empty list of files")]
    NoFiles,
    /// The manifest listed more than `MAX_FILES` files.
    #[error("Number of files submitted by client exceeded the maximum limit")]
    TooManyFiles,
    /// Requested lifetime exceeds the configured maximum (carried, in days).
    #[error("Requested lifetime was too long, can be at most {0} days")]
    TooLong(u16),
    /// Upload exceeds the size limit (carried, in bytes).
    #[error("Upload size was too large, can be at most {0} bytes")]
    TooBig(u64),
    /// The file store reported it has no room left.
    #[error("File storage is full")]
    Full,
    /// The manifest password did not match.
    #[error("Incorrect password")]
    IncorrectPassword,
    /// The client closed the socket while bytes were still outstanding.
    #[error("Websocket was closed by client before completing transfer")]
    ClosedEarly(Option<CloseReason>),
    /// The client sent more bytes than its manifest declared.
    #[error("Client sent more data than they were supposed to")]
    UnexpectedExtraData,
}

impl Error {
    /// Chooses the websocket close code reported alongside this error.
    fn close_code(&self) -> CloseCode {
        match self {
            // Rejected by server policy, limits, or authentication.
            Self::IncorrectPassword
            | Self::Full
            | Self::TooBig(_)
            | Self::TooLong(_)
            | Self::TooManyFiles
            | Self::NoFiles
            | Self::DuplicateFilename => CloseCode::Policy,
            // The client broke the upload protocol.
            Self::UnexpectedExtraData
            | Self::ClosedEarly(_)
            | Self::UnexpectedMessageType
            | Self::Parse(_) => CloseCode::Invalid,
            // Server-side I/O failure.
            Self::Storage(_) => CloseCode::Error,
        }
    }
}
/// Websocket actor that receives a single upload.
///
/// The client first sends a JSON manifest as a text message, then the raw
/// file bytes as binary messages.
pub struct Uploader {
    /// Destination for incoming bytes; `None` until a manifest is accepted.
    writer: Option<Box<dyn Write>>,
    /// Storage code used both as the on-disk file name and as the state key.
    storage_filename: String,
    /// Shared application data (configuration and file state).
    app_data: web::Data<AppData>,
    /// Number of payload bytes still expected from the client.
    bytes_remaining: u64,
    /// Client address, reported when authentication fails.
    ip_addr: String,
}

impl Uploader {
    /// Creates an idle uploader with a freshly generated storage code.
    pub(crate) fn new(app_data: web::Data<AppData>, ip_addr: String) -> Self {
        // Generate the code before `app_data` is moved into the struct.
        let storage_filename = store::gen_storage_code(app_data.config.mnemonic_codes);
        Uploader {
            writer: None,
            storage_filename,
            app_data,
            bytes_remaining: 0,
            ip_addr,
        }
    }
}
/// The uploader runs inside an actix websocket context.
impl Actor for Uploader {
    type Context = ws::WebsocketContext<Uploader>;
}

/// Shorthand for the uploader's actor context type.
type Context = ws::WebsocketContext<Uploader>;
pub use crate::state::v1::UploadedFile;
impl UploadedFile {
    /// Builds an entry for one uploaded file.
    ///
    /// The name is sanitised with no length reserved for an extension.
    fn new(name: &str, size: u64, modtime: OffsetDateTime) -> Self {
        let name = sanitise(name, 0);
        Self { name, size, modtime }
    }
}
/// A file entry exactly as the client describes it in the upload manifest.
#[derive(Debug, Deserialize)]
struct RawUploadedFile {
    /// Client-supplied file name; sanitised by [`RawUploadedFile::process`].
    name: String,
    /// Declared size of the file's contents.
    size: u64,
    /// Modification time as a Unix timestamp in milliseconds
    /// (converted to whole seconds in [`RawUploadedFile::process`]).
    modtime: i64,
}

impl RawUploadedFile {
    /// Converts the client's description into a sanitised [`UploadedFile`].
    ///
    /// If the timestamp is out of range for `OffsetDateTime`, the current
    /// time is used instead.
    fn process(&self) -> UploadedFile {
        // `div_euclid` floors, so negative (pre-1970) millisecond timestamps
        // round down to their containing second, just like positive ones.
        // Plain `/` truncates toward zero and would round them up instead.
        let modtime = OffsetDateTime::from_unix_timestamp(self.modtime.div_euclid(1000))
            .unwrap_or_else(|_| OffsetDateTime::now_utc());
        UploadedFile::new(&self.name, self.size, modtime)
    }
}
/// The JSON manifest a client sends, as a text message, before any file data.
#[derive(Deserialize)]
struct UploadManifest {
    /// Descriptions of every file whose bytes will follow.
    files: Vec<RawUploadedFile>,
    /// Requested retention period, in days.
    lifetime: u16,
    /// Compared against the `TRANSBEAM_UPLOAD_PASSWORD` environment variable.
    password: String,
    /// Optional name for the zip archive built when several files are uploaded.
    collection_name: Option<String>,
}
#[derive(Serialize)]
#[serde(rename_all = "snake_case", tag = "type")]
enum ServerMessage {
Ready { code: String },
TooBig { max_size: u64 },
TooLong { max_lifetime: u16 },
IncorrectPassword,
Error { details: String },
}
impl From<&Error> for ServerMessage {
fn from(e: &Error) -> Self {
match e {
Error::TooBig(max_size) => ServerMessage::TooBig {
max_size: *max_size,
},
Error::TooLong(max_lifetime) => ServerMessage::TooLong {
max_lifetime: *max_lifetime,
},
Error::IncorrectPassword => ServerMessage::IncorrectPassword,
_ => ServerMessage::Error {
details: e.to_string(),
},
}
}
}
/// Stops the actor, then flushes the active writer, if any.
///
/// A flush failure is logged rather than propagated. The first argument is
/// ignored so that this function can be passed directly to `map` on an actor
/// future.
fn stop_and_flush<T>(_: T, u: &mut Uploader, ctx: &mut Context) {
    ctx.stop();
    let flush_result = match u.writer.as_mut() {
        Some(writer) => writer.flush(),
        None => Ok(()),
    };
    if let Err(e) = flush_result {
        error!("Failed to flush writer: {}", e);
    }
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
    /// Entry point for every incoming websocket frame.
    ///
    /// - A protocol error triggers cleanup without notifying the client.
    /// - An error from `handle_message` is reported to the client before cleanup.
    /// - A completed upload closes the socket normally and stops the actor.
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        let message = match msg {
            Ok(message) => message,
            Err(e) => {
                error!("Websocket error: {}", e);
                self.cleanup_after_error(ctx);
                return;
            }
        };
        match self.handle_message(message, ctx) {
            Ok(false) => {}
            Ok(true) => {
                info!("Finished uploading data");
                let reason = ws::CloseReason {
                    code: CloseCode::Normal,
                    description: None,
                };
                ctx.close(Some(reason));
                stop_and_flush((), self, ctx);
            }
            Err(e) => self.notify_error_and_cleanup(e, ctx),
        }
    }
}
impl Uploader {
fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut Context) {
error!("{}", e);
if let Error::IncorrectPassword = e {
log_auth_failure(&self.ip_addr);
}
ctx.text(serde_json::to_string(&ServerMessage::from(&e)).unwrap());
ctx.close(Some(ws::CloseReason {
code: e.close_code(),
description: Some(e.to_string()),
}));
self.cleanup_after_error(ctx);
}
fn handle_message(&mut self, msg: ws::Message, ctx: &mut Context) -> Result<bool, Error> {
trace!("Websocket message: {:?}", msg);
match msg {
ws::Message::Text(text) => {
if self.writer.is_some() {
return Err(Error::UnexpectedMessageType);
}
let UploadManifest {
files: raw_files,
lifetime,
password,
collection_name,
} = serde_json::from_slice(text.as_bytes())?;
if std::env::var("TRANSBEAM_UPLOAD_PASSWORD") != Ok(password) {
return Err(Error::IncorrectPassword);
}
if lifetime > self.app_data.config.max_lifetime {
return Err(Error::TooLong(self.app_data.config.max_lifetime));
}
info!("Received file list: {} files", raw_files.len());
debug!("{:?}", raw_files);
if raw_files.len() > MAX_FILES {
return Err(Error::TooManyFiles);
}
if raw_files.is_empty() {
return Err(Error::NoFiles);
}
let mut filenames: HashSet<String> = HashSet::new();
let mut files = Vec::new();
for raw_file in raw_files.iter() {
let mut file = raw_file.process();
while filenames.contains(&file.name) {
info!("Duplicate file name: {}", file.name);
if file.name.len() >= SanOptions::DEFAULT.length_limit {
return Err(Error::DuplicateFilename);
}
file.name.insert(0, '_');
}
filenames.insert(file.name.clone());
self.bytes_remaining += file.size;
files.push(file);
}
if self.bytes_remaining > self.app_data.config.max_upload_size {
return Err(Error::TooBig(self.app_data.config.max_upload_size));
}
let storage_path = self
.app_data
.config
.storage_dir
.join(self.storage_filename.clone());
info!("storing to: {:?}", storage_path);
let writer = File::options()
.write(true)
.create_new(true)
.open(&storage_path)?;
let (writer, name, size, modtime, contents): (Box<dyn Write>, _, _, _, _) =
if files.len() > 1 {
info!("Wrapping in zipfile generator");
let now = OffsetDateTime::now_utc();
let collection_name =
collection_name.map(|f| sanitise(&f, 4)).unwrap_or_else(|| {
super::APP_NAME.to_owned()
+ &now.format(FILENAME_DATE_FORMAT).unwrap()
});
let file_set = FileSet {
files,
directory_name: Some(collection_name.clone()),
};
let zip_writer = super::zip::ZipGenerator::new(file_set.clone(), writer);
let size = zip_writer.total_size();
(
Box::new(zip_writer),
collection_name + ".zip",
size,
now,
Some(file_set),
)
} else {
(
Box::new(writer),
files[0].name.clone(),
files[0].size,
files[0].modtime,
None,
)
};
self.writer = Some(writer);
let stored_file = StoredFileWithPassword {
file: StoredFile {
name,
size,
modtime,
expiry: OffsetDateTime::now_utc() + lifetime * time::Duration::DAY,
contents,
},
password: None,
};
let app_data = self.app_data.clone();
let storage_filename = self.storage_filename.clone();
ctx.spawn(
actix::fut::wrap_future(async move {
debug!("Spawned future to add entry {} to state", storage_filename);
app_data.add_file(storage_filename, stored_file).await
})
.map(|res, u: &mut Self, ctx: &mut Context| match res {
Ok(()) => ctx.text(
serde_json::to_string(&ServerMessage::Ready {
code: u.storage_filename.clone(),
})
.unwrap(),
),
Err(FileAddError::TooBig(size)) => {
u.notify_error_and_cleanup(Error::TooBig(size), ctx)
}
Err(FileAddError::Full) => u.notify_error_and_cleanup(Error::Full, ctx),
Err(FileAddError::FileSystem(e)) => {
u.notify_error_and_cleanup(Error::from(e), ctx)
}
}),
);
}
ws::Message::Binary(data)
| ws::Message::Continuation(Item::FirstBinary(data))
| ws::Message::Continuation(Item::Continue(data))
| ws::Message::Continuation(Item::Last(data)) => {
return self.handle_data(data);
}
ws::Message::Close(reason) => {
if self.bytes_remaining > 0 {
return Err(Error::ClosedEarly(reason));
} else {
return Ok(true);
}
}
ws::Message::Ping(ping) => {
debug!("Ping received, ponging");
ctx.pong(&ping);
}
ws::Message::Nop | ws::Message::Pong(_) => (),
_ => {
return Err(Error::UnexpectedMessageType);
}
}
Ok(false)
}
fn handle_data(&mut self, data: Bytes) -> Result<bool, Error> {
if let Some(ref mut writer) = self.writer {
if (data.len() as u64) > self.bytes_remaining {
return Err(Error::UnexpectedExtraData);
}
self.bytes_remaining -= data.len() as u64;
writer.write_all(&data)?;
Ok(self.bytes_remaining == 0)
} else {
Err(Error::UnexpectedMessageType)
}
}
fn cleanup_after_error(&mut self, ctx: &mut Context) {
info!(
"Cleaning up after failed upload of {}",
self.storage_filename
);
let app_data = self.app_data.clone();
let filename = self.storage_filename.clone();
ctx.wait(
actix::fut::wrap_future(async move {
debug!("Spawned future to remove entry {} from state", filename);
app_data.remove_file(&filename).await.unwrap();
})
.map(stop_and_flush),
);
}
}