added file expiration and fleshed out the API a bit

This commit is contained in:
xenofem 2022-04-30 01:38:26 -04:00
parent cc0aaaab94
commit f52aa0f08b
9 changed files with 296 additions and 103 deletions

53
API.md Normal file
View file

@ -0,0 +1,53 @@
# transbeam websocket api
- After opening the connection, the client sends an upload manifest to
the server. This is a JSON object containing the following keys:
- `files`: a list of metadata objects for all the files to be
uploaded, in the exact order they will be sent. This list must
contain at least 1 file and at most 256 files. Each file metadata
object has the following keys, all required:
- `name`: The name of the file. This will be sanitised on the
server side, but the sanitisation library isn't especially
restrictive; most Unicode code points will be allowed through
as-is.
- `size`: The exact size of the file, in bytes.
- `modtime`: The modification time of the file, as milliseconds
since the Unix epoch.
- `lifetime`: an integer number of days the files should be kept
for.
- Once the server receives the manifest, it will respond with a
JSON-encoded object containing at least the field `type`, and
possibly other fields as well. The types of message, and their
associated extra fields if any, are as follows:
- `ready`: The server will accept the upload and is ready to receive
data.
- `code`: A code string that can be used to download the files,
starting now.
- `too_big`: The upload is rejected because the total size of the
files is bigger than the server is willing to accept.
- `max_size`: The maximum total upload size the server will
accept. This is subject to change if the admin changes the
config, or if the server's storage space is filling up.
- `too_long`: The upload is rejected because the requested lifetime
is longer than the server will allow.
- `max_days`: The maximum number of days the client can request
files be kept for.
- `error`: A miscellaneous error has occurred.
- `details`: A string with more information about the error.
If the message type is anything other than `ready`, the connection
will be closed by the server.
- If the server is ready to receive files, the client begins sending
chunks of data from the files, as raw binary blobs. The client must
transmit each file's data in order from start to finish, and must
transmit the files in the same order they were listed in the
manifest. The size of the chunks isn't currently specified, and
it's fine for a chunk to span the end of one file and the start of
the next. After sending each chunk (that is, each complete
websocket message), the client must wait for the server to
acknowledge the chunk by sending back the string "ack", and then
send the next chunk if there is one. Once all chunks have been sent
and acknowledged, or once the server has sent a message other than
"ack" to indicate an error, the connection will be closed.

View file

@ -30,14 +30,14 @@ use actix_web::{
use actix_files::HttpRange;
use crate::DownloadableFile;
use crate::store::StoredFile;
// This is copied substantially from actix-files, with some tweaks
pub(crate) struct DownloadingFile {
pub(crate) file: File,
pub(crate) storage_path: PathBuf,
pub(crate) info: DownloadableFile,
pub(crate) info: StoredFile,
}
impl DownloadingFile {

View file

@ -1,5 +1,5 @@
mod download;
mod state;
mod store;
mod upload;
mod util;
mod zip;
@ -10,42 +10,13 @@ use actix_web::{
get, middleware::Logger, web, App, HttpRequest, HttpResponse, HttpServer, Responder,
};
use actix_web_actors::ws;
use serde::{Deserialize, Serialize};
use state::PersistentState;
use time::OffsetDateTime;
use log::error;
use store::FileStore;
use tokio::sync::RwLock;
const APP_NAME: &str = "transbeam";
pub struct UploadedFile {
name: String,
size: u64,
modtime: OffsetDateTime,
}
impl UploadedFile {
fn new(name: &str, size: u64, modtime: OffsetDateTime) -> Self {
Self {
name: sanitise_file_name::sanitise(name),
size,
modtime,
}
}
}
#[derive(Clone, Deserialize, Serialize)]
pub struct DownloadableFile {
name: String,
size: u64,
#[serde(with = "state::timestamp")]
modtime: OffsetDateTime,
}
type AppData = web::Data<RwLock<PersistentState>>;
fn storage_dir() -> PathBuf {
PathBuf::from(std::env::var("STORAGE_DIR").unwrap_or_else(|_| String::from("storage")))
}
type AppData = web::Data<RwLock<FileStore>>;
#[get("/download/{file_code}")]
async fn handle_download(
@ -60,7 +31,7 @@ async fn handle_download(
let data = data.read().await;
let info = data.lookup_file(&file_code);
if let Some(info) = info {
let storage_path = storage_dir().join(file_code);
let storage_path = store::storage_dir().join(file_code);
let file = File::open(&storage_path)?;
Ok(download::DownloadingFile {
file,
@ -82,7 +53,8 @@ async fn handle_upload(req: HttpRequest, stream: web::Payload, data: AppData) ->
async fn main() -> std::io::Result<()> {
env_logger::init();
let data: AppData = web::Data::new(RwLock::new(PersistentState::load().await?));
let data: AppData = web::Data::new(RwLock::new(FileStore::load().await?));
start_reaper(data.clone());
let static_dir =
PathBuf::from(std::env::var("STATIC_DIR").unwrap_or_else(|_| String::from("static")));
@ -104,3 +76,16 @@ async fn main() -> std::io::Result<()> {
.await?;
Ok(())
}
/// Starts the background task that periodically purges expired files.
///
/// A dedicated OS thread is spawned and runs its own actix `System`. The
/// task sleeps for 86400 seconds, then takes the write lock on the shared
/// store and calls `remove_expired_files`, and repeats forever. Because the
/// sleep comes first, the first pass runs one full interval after startup.
/// Failures are logged and do not stop the loop.
fn start_reaper(data: AppData) {
    std::thread::spawn(move || {
        let reaper = async move {
            let interval = core::time::Duration::from_secs(86400);
            loop {
                actix_web::rt::time::sleep(interval).await;
                if let Err(e) = data.write().await.remove_expired_files().await {
                    error!("Error reaping expired files: {}", e);
                }
            }
        };
        actix_web::rt::System::new().block_on(reaper);
    });
}

View file

@ -1,14 +1,50 @@
use std::{collections::HashMap, io::ErrorKind};
use std::{collections::HashMap, io::ErrorKind, path::PathBuf, str::FromStr};
use log::{debug, error, info, warn};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWriteExt},
};
use crate::{storage_dir, DownloadableFile};
/// Name of the JSON state file, stored inside the storage directory.
const STATE_FILE_NAME: &str = "files.json";
/// Storage directory used when `STORAGE_DIR` is not set.
const DEFAULT_STORAGE_DIR: &str = "storage";
/// Fallback maximum upload lifetime, in days, used when
/// `TRANSBEAM_MAX_LIFETIME` is unset or cannot be parsed.
const DEFAULT_MAX_LIFETIME: u32 = 30;
/// Multiplier for sizes expressed in binary gigabytes (1024^3 bytes).
const GIGA: u64 = 1024*1024*1024;
/// Fallback per-file size limit, in bytes, used when
/// `TRANSBEAM_MAX_SINGLE_FILE_SIZE` is unset or cannot be parsed.
const DEFAULT_MAX_SINGLE_SIZE: u64 = 16*GIGA;
/// Fallback limit on the combined size of all stored files, in bytes, used
/// when `TRANSBEAM_MAX_TOTAL_FILE_SIZE` is unset or cannot be parsed.
const DEFAULT_MAX_TOTAL_SIZE: u64 = 64*GIGA;
/// Returns the directory in which uploaded files and the state file live.
///
/// The path is taken from the `STORAGE_DIR` environment variable; if that
/// cannot be read, `DEFAULT_STORAGE_DIR` is used instead.
pub(crate) fn storage_dir() -> PathBuf {
    match std::env::var("STORAGE_DIR") {
        Ok(dir) => PathBuf::from(dir),
        Err(_) => PathBuf::from(DEFAULT_STORAGE_DIR),
    }
}
/// Reads the environment variable `var` and parses it as a `T`.
///
/// Returns `default` when the variable cannot be read (unset or not valid
/// Unicode) or when its value does not parse as `T`.
fn parse_env_var<T: FromStr>(var: &str, default: T) -> T {
    match std::env::var(var) {
        Ok(raw) => raw.parse::<T>().unwrap_or(default),
        Err(_) => default,
    }
}
/// Returns the longest lifetime, in days, that a client may request.
///
/// Read from `TRANSBEAM_MAX_LIFETIME`, falling back to
/// `DEFAULT_MAX_LIFETIME` when the variable is unset or unparseable.
pub(crate) fn max_lifetime() -> u32 {
    parse_env_var::<u32>("TRANSBEAM_MAX_LIFETIME", DEFAULT_MAX_LIFETIME)
}
/// Returns the largest size, in bytes, allowed for a single stored file.
///
/// Read from `TRANSBEAM_MAX_SINGLE_FILE_SIZE`, falling back to
/// `DEFAULT_MAX_SINGLE_SIZE` when the variable is unset or unparseable.
pub(crate) fn max_single_size() -> u64 {
    parse_env_var::<u64>("TRANSBEAM_MAX_SINGLE_FILE_SIZE", DEFAULT_MAX_SINGLE_SIZE)
}
/// Returns the largest combined size, in bytes, of all stored files.
///
/// Read from `TRANSBEAM_MAX_TOTAL_FILE_SIZE`, falling back to
/// `DEFAULT_MAX_TOTAL_SIZE` when the variable is unset or unparseable.
pub(crate) fn max_total_size() -> u64 {
    parse_env_var::<u64>("TRANSBEAM_MAX_TOTAL_FILE_SIZE", DEFAULT_MAX_TOTAL_SIZE)
}
/// Metadata recorded for one file held in the store.
///
/// Values of this type are what the store's map holds per storage key and
/// what gets (de)serialized to the JSON state file.
#[derive(Clone, Deserialize, Serialize)]
pub struct StoredFile {
/// File name recorded for the stored file.
pub name: String,
/// Size of the stored file, in bytes.
pub size: u64,
/// Modification time; (de)serialized through the `timestamp` module.
#[serde(with = "timestamp")]
pub modtime: OffsetDateTime,
/// Moment at which the file expires; (de)serialized through the
/// `timestamp` module. Entries whose expiry is not after the current time
/// are deleted by `remove_expired_files`.
#[serde(with = "timestamp")]
pub expiry: OffsetDateTime,
}
pub(crate) mod timestamp {
use core::fmt;
@ -51,11 +87,12 @@ pub(crate) mod timestamp {
}
}
async fn is_valid_entry(key: &str, info: &DownloadableFile) -> bool {
if !crate::util::is_ascii_alphanumeric(key) {
error!("Invalid key in persistent storage: {}", key);
async fn is_valid_entry(key: &str, info: &StoredFile) -> bool {
if info.expiry < OffsetDateTime::now_utc() {
info!("File {} has expired", key);
return false;
}
let file = if let Ok(f) = File::open(storage_dir().join(&key)).await {
f
} else {
@ -81,24 +118,31 @@ async fn is_valid_entry(key: &str, info: &DownloadableFile) -> bool {
true
}
pub(crate) struct PersistentState(HashMap<String, DownloadableFile>);
impl PersistentState {
pub(crate) struct FileStore(HashMap<String, StoredFile>);
impl FileStore {
pub(crate) async fn load() -> std::io::Result<Self> {
let open_result = File::open(storage_dir().join(STATE_FILE_NAME)).await;
match open_result {
Ok(mut f) => {
let mut buf = String::new();
f.read_to_string(&mut buf).await?;
let map: HashMap<String, DownloadableFile> = serde_json::from_str(&buf)?;
let map: HashMap<String, StoredFile> = serde_json::from_str(&buf)?;
info!("Loaded {} file entries from persistent storage", map.len());
let mut filtered: HashMap<String, DownloadableFile> = HashMap::new();
let mut filtered: HashMap<String, StoredFile> = HashMap::new();
for (key, info) in map.into_iter() {
// Handle this case separately, because we don't
// want to try to delete it if it's not the sort
// of path we're expecting
if !crate::util::is_ascii_alphanumeric(&key) {
error!("Invalid key in persistent storage: {}", key);
continue;
}
if is_valid_entry(&key, &info).await {
filtered.insert(key, info);
} else {
info!("Deleting invalid file {}", key);
info!("Deleting file {}", key);
if let Err(e) = tokio::fs::remove_file(storage_dir().join(&key)).await {
warn!("Failed to delete invalid file {}: {}", key, e);
warn!("Failed to delete file {}: {}", key, e);
}
}
}
@ -116,6 +160,10 @@ impl PersistentState {
}
}
/// Sums the `size` of every file currently recorded in the store.
fn total_size(&self) -> u64 {
    self.0.values().map(|stored| stored.size).sum()
}
async fn save(&mut self) -> std::io::Result<()> {
info!("saving updated state: {} entries", self.0.len());
File::create(storage_dir().join(STATE_FILE_NAME))
@ -124,16 +172,22 @@ impl PersistentState {
.await
}
/// Attempts to add a file to the store. Returns an I/O error if
/// something's broken, or a u64 of the maximum allowed file size
/// if the file was too big, or a unit if everything worked.
pub(crate) async fn add_file(
&mut self,
key: String,
file: DownloadableFile,
) -> std::io::Result<()> {
file: StoredFile,
) -> std::io::Result<Result<(), u64>> {
let remaining_size = max_total_size().saturating_sub(self.total_size());
let allowed_size = std::cmp::min(remaining_size, max_single_size());
if file.size > allowed_size { return Ok(Err(allowed_size)); }
self.0.insert(key, file);
self.save().await
self.save().await.map(Ok)
}
pub(crate) fn lookup_file(&self, key: &str) -> Option<DownloadableFile> {
pub(crate) fn lookup_file(&self, key: &str) -> Option<StoredFile> {
self.0.get(key).cloned()
}
@ -142,4 +196,20 @@ impl PersistentState {
self.0.remove(key);
self.save().await
}
pub(crate) async fn remove_expired_files(&mut self) -> std::io::Result<()> {
info!("Checking for expired files");
let now = OffsetDateTime::now_utc();
for (key, file) in std::mem::replace(&mut self.0, HashMap::new()).into_iter() {
if file.expiry > now {
self.0.insert(key, file);
} else {
info!("Deleting expired file {}", key);
if let Err(e) = tokio::fs::remove_file(storage_dir().join(&key)).await {
warn!("Failed to delete expired file {}: {}", key, e);
}
}
}
self.save().await
}
}

View file

@ -6,10 +6,10 @@ use actix_web_actors::ws::{self, CloseCode};
use bytes::Bytes;
use log::{debug, error, info, trace};
use rand::distributions::{Alphanumeric, DistString};
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use crate::{storage_dir, DownloadableFile, UploadedFile};
use crate::store::{storage_dir, StoredFile, self};
const MAX_FILES: usize = 256;
const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] =
@ -31,6 +31,10 @@ enum Error {
NoFiles,
#[error("Number of files submitted by client exceeded the maximum limit")]
TooManyFiles,
#[error("Requested lifetime was too long")]
TooLong,
#[error("Upload size was too large, can be at most {0} bytes")]
TooBig(u64),
#[error("Websocket was closed by client before completing transfer")]
ClosedEarly(Option<CloseReason>),
#[error("Client sent more data than they were supposed to")]
@ -40,15 +44,17 @@ enum Error {
impl Error {
fn close_code(&self) -> CloseCode {
match self {
Self::Parse(_) => CloseCode::Invalid,
Self::Storage(_) => CloseCode::Error,
Self::TimeFormat(_) => CloseCode::Error,
Self::DuplicateFilename => CloseCode::Policy,
Self::UnexpectedMessageType => CloseCode::Invalid,
Self::NoFiles => CloseCode::Policy,
Self::TooManyFiles => CloseCode::Policy,
Self::ClosedEarly(_) => CloseCode::Invalid,
Self::UnexpectedExtraData => CloseCode::Invalid,
Self::Storage(_)
| Self::TimeFormat(_) => CloseCode::Error,
Self::Parse(_)
| Self::UnexpectedMessageType
| Self::ClosedEarly(_)
| Self::UnexpectedExtraData => CloseCode::Invalid,
Self::DuplicateFilename
| Self::NoFiles
| Self::TooManyFiles
| Self::TooLong
| Self::TooBig(_) => CloseCode::Policy,
}
}
}
@ -64,7 +70,7 @@ impl Uploader {
pub(crate) fn new(app_data: super::AppData) -> Self {
Self {
writer: None,
storage_filename: String::new(),
storage_filename: Alphanumeric.sample_string(&mut rand::thread_rng(), 8),
app_data,
bytes_remaining: 0,
}
@ -75,6 +81,22 @@ impl Actor for Uploader {
type Context = ws::WebsocketContext<Self>;
}
/// Metadata for one file of an upload, holding the name produced by
/// `UploadedFile::new` (which sanitises the name it is given).
pub struct UploadedFile {
/// File name; `UploadedFile::new` stores the sanitised form here.
pub name: String,
/// Size of the file, in bytes.
pub size: u64,
/// Modification time of the file.
pub modtime: OffsetDateTime,
}
impl UploadedFile {
    /// Builds an `UploadedFile`, passing `name` through
    /// `sanitise_file_name::sanitise` before storing it. `size` and
    /// `modtime` are stored unchanged.
    fn new(name: &str, size: u64, modtime: OffsetDateTime) -> Self {
        let name = sanitise_file_name::sanitise(name);
        Self { name, size, modtime }
    }
}
#[derive(Debug, Deserialize)]
struct RawUploadedFile {
name: String,
@ -93,6 +115,31 @@ impl RawUploadedFile {
}
}
/// The first message a client sends on the upload websocket, deserialized
/// from JSON text.
#[derive(Deserialize)]
struct UploadManifest {
/// Metadata for each file the client intends to upload.
files: Vec<RawUploadedFile>,
/// Requested retention period in days; rejected when it exceeds
/// `store::max_lifetime()`.
lifetime: u32,
}
/// JSON messages sent from the server to the uploading client.
///
/// Serialized as an object whose `type` field holds the snake_case variant
/// name (`ready`, `too_big`, `too_long`, `error`), with the variant's own
/// fields alongside it.
#[derive(Serialize)]
#[serde(rename_all = "snake_case", tag = "type")]
enum ServerMessage {
/// The upload was accepted; `code` is the storage filename, which is also
/// the download code.
Ready { code: String },
/// The upload was rejected for size; `max_size` is the limit, in bytes.
TooBig { max_size: u64 },
/// The requested lifetime was rejected; `max_days` is the limit, in days.
TooLong { max_days: u32 },
/// Any other failure; `details` is the error rendered as a string.
Error { details: String },
}
impl From<&Error> for ServerMessage {
fn from(e: &Error) -> Self {
match e {
Error::TooBig(max_size) => ServerMessage::TooBig { max_size: *max_size },
Error::TooLong => ServerMessage::TooLong { max_days: store::max_lifetime() },
_ => ServerMessage::Error { details: e.to_string() },
}
}
}
fn stop_and_flush<T>(_: T, u: &mut Uploader, ctx: &mut <Uploader as Actor>::Context) {
ctx.stop();
if let Some(w) = u.writer.as_mut() {
@ -115,12 +162,7 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
match self.handle_message(msg, ctx) {
Err(e) => {
error!("{}", e);
ctx.close(Some(ws::CloseReason {
code: e.close_code(),
description: Some(e.to_string()),
}));
self.cleanup_after_error(ctx);
self.notify_error_and_cleanup(e, ctx);
}
Ok(true) => {
info!("Finished uploading data");
@ -140,6 +182,16 @@ fn ack(ctx: &mut <Uploader as Actor>::Context) {
}
impl Uploader {
/// Reports `e` to the client and tears the upload down.
///
/// In order: logs the error, sends it to the client as a JSON
/// `ServerMessage`, closes the websocket with the error's close code and
/// display text as the reason, then runs `cleanup_after_error`.
fn notify_error_and_cleanup(&mut self, e: Error, ctx: &mut <Self as Actor>::Context) {
    error!("{}", e);
    let message = ServerMessage::from(&e);
    ctx.text(serde_json::to_string(&message).unwrap());
    let reason = ws::CloseReason {
        code: e.close_code(),
        description: Some(e.to_string()),
    };
    ctx.close(Some(reason));
    self.cleanup_after_error(ctx);
}
fn handle_message(
&mut self,
msg: ws::Message,
@ -151,12 +203,18 @@ impl Uploader {
if self.writer.is_some() {
return Err(Error::UnexpectedMessageType);
}
let raw_files: Vec<RawUploadedFile> = serde_json::from_slice(text.as_bytes())?;
let UploadManifest { files: raw_files, lifetime, } = serde_json::from_slice(text.as_bytes())?;
if lifetime > store::max_lifetime() {
return Err(Error::TooLong);
}
info!("Received file list: {} files", raw_files.len());
debug!("{:?}", raw_files);
if raw_files.len() > MAX_FILES {
return Err(Error::TooManyFiles);
}
if raw_files.is_empty() {
return Err(Error::NoFiles);
}
let mut filenames: HashSet<String> = HashSet::new();
let mut files = Vec::new();
for raw_file in raw_files.iter() {
@ -172,18 +230,13 @@ impl Uploader {
self.bytes_remaining += file.size;
files.push(file);
}
if files.is_empty() {
return Err(Error::NoFiles);
}
let storage_filename = Alphanumeric.sample_string(&mut rand::thread_rng(), 8);
self.storage_filename = storage_filename.clone();
let storage_path = storage_dir().join(storage_filename.clone());
let storage_path = storage_dir().join(self.storage_filename.clone());
info!("storing to: {:?}", storage_path);
let writer = File::options()
.write(true)
.create_new(true)
.open(&storage_path)?;
let (writer, downloadable_file): (Box<dyn Write>, _) = if files.len() > 1 {
let (writer, name, size, modtime): (Box<dyn Write>,_,_,_) = if files.len() > 1 {
info!("Wrapping in zipfile generator");
let now = OffsetDateTime::now_utc();
let zip_writer = super::zip::ZipGenerator::new(files, writer);
@ -192,33 +245,40 @@ impl Uploader {
super::APP_NAME.to_owned() + &now.format(FILENAME_DATE_FORMAT)? + ".zip";
(
Box::new(zip_writer),
DownloadableFile {
name: download_filename,
size,
modtime: now,
},
download_filename,
size,
now,
)
} else {
(
Box::new(writer),
DownloadableFile {
name: files[0].name.clone(),
size: files[0].size,
modtime: files[0].modtime,
},
files[0].name.clone(),
files[0].size,
files[0].modtime,
)
};
self.writer = Some(writer);
let stored_file = StoredFile {
name,
size,
modtime,
expiry: OffsetDateTime::now_utc() + lifetime*time::Duration::DAY,
};
let data = self.app_data.clone();
let storage_filename = self.storage_filename.clone();
ctx.spawn(actix::fut::wrap_future(async move {
debug!("Spawned future to add entry {} to state", storage_filename);
data.write()
.await
.add_file(storage_filename, downloadable_file)
.add_file(storage_filename, stored_file)
.await
.unwrap();
}).map(|res, u: &mut Self, ctx: &mut <Self as Actor>::Context| {
match res {
Ok(Ok(())) => ctx.text(serde_json::to_string(&ServerMessage::Ready { code: u.storage_filename.clone() }).unwrap()),
Ok(Err(size)) => u.notify_error_and_cleanup(Error::TooBig(size), ctx),
Err(e) => u.notify_error_and_cleanup(Error::from(e), ctx)
}
}));
ctx.text(self.storage_filename.as_str());
}
ws::Message::Binary(data) | ws::Message::Continuation(Item::Last(data)) => {
let result = self.handle_data(data)?;

View file

@ -4,7 +4,7 @@ use crc32fast::Hasher;
use log::debug;
use time::OffsetDateTime;
use crate::UploadedFile;
use crate::upload::UploadedFile;
const SIGNATURE_SIZE: u64 = 4;
const SHARED_FIELDS_SIZE: u64 = 26;

View file

@ -18,6 +18,17 @@
<noscript>This page requires Javascript :(</noscript>
<button id="upload">Upload</button>
<div id="lifetime_container" style="display: none;">
<label>
Keep files for:
<select id="lifetime">
<option value="1">1 day</option>
<option value="7">1 week</option>
<option value="14" selected>2 weeks</option>
<option value="30">1 month</option>
</select>
</label>
</div>
<div id="download_link_container" style="display: none;">
<div id="download_link_main">
<div>Download link: <span id="download_link"></span></div><div class="copy_button"></div>

View file

@ -133,6 +133,10 @@ button:disabled, input:disabled + .fake_button {
cursor: not-allowed;
}
#lifetime_container {
margin-top: 10px;
}
#footer {
margin-top: 30px;
}

View file

@ -8,6 +8,9 @@ const fileInputMessage = document.getElementById('file_input_message');
const fileListContainer = document.getElementById('file_list_container');
const fileList = document.getElementById('file_list');
const lifetimeContainer = document.getElementById('lifetime_container');
const lifetimeInput = document.getElementById('lifetime');
const uploadButton = document.getElementById('upload');
const downloadLinkContainer = document.getElementById('download_link_container');
@ -27,13 +30,13 @@ let byteIndex = 0;
let bytesSent = 0;
let totalBytes = 0;
function sendMetadata() {
const metadata = files.map((file) => ({
function sendManifest(lifetime) {
const fileMetadata = files.map((file) => ({
name: file.name,
size: file.size,
modtime: file.lastModified,
}));
socket.send(JSON.stringify(metadata));
socket.send(JSON.stringify({ lifetime, files: fileMetadata }));
}
function finishSending() {
@ -95,11 +98,13 @@ function updateFiles() {
fileInputMessage.textContent = 'Select files to upload...';
fileListContainer.style.display = 'none';
uploadButton.style.display = 'none';
lifetimeContainer.style.display = 'none';
} else {
fileInputMessage.textContent = 'Select more files to upload...';
fileListContainer.style.display = '';
uploadButton.textContent = `Upload ${files.length} file${files.length > 1 ? 's' : ''} (${displaySize(totalBytes)})`;
uploadButton.style.display = '';
lifetimeContainer.style.display = '';
}
fileInput.disabled = (files.length >= MAX_FILES);
}
@ -156,20 +161,25 @@ fileInput.addEventListener('input', (e) => {
uploadButton.addEventListener('click', (e) => {
if (files.length === 0) { return; }
const lifetime = parseInt(lifetimeInput.value);
lifetimeContainer.remove();
fileInputContainer.remove();
for (const button of Array.from(document.getElementsByTagName('button')).concat(...document.getElementsByClassName('file_delete'))) {
button.remove();
}
socket = new WebSocket(`${window.location.protocol === 'http:' ? 'ws' : 'wss'}://${window.location.host}/upload`);
socket.addEventListener('open', sendMetadata);
socket.addEventListener('open', () => sendManifest(lifetime));
socket.addEventListener('message', (msg) => {
if (bytesSent === 0 && msg.data.match(/^[A-Za-z0-9]+$/)) {
downloadLink.textContent = `${window.location.origin}/download/${msg.data}`;
downloadLinkContainer.style.display = '';
updateProgress();
progressContainer.style.display = '';
sendData();
if (bytesSent === 0) {
const reply = JSON.parse(msg.data);
if (reply.type === 'ready' && reply.code.match(/^[A-Za-z0-9]+$/)) {
downloadLink.textContent = `${window.location.origin}/download/${reply.code}`;
downloadLinkContainer.style.display = '';
updateProgress();
progressContainer.style.display = '';
sendData();
}
} else if (msg.data === 'ack') {
sendData();
}