WIP file drop server, no downloads yet
This commit is contained in:
commit
20da86132b
12 changed files with 3294 additions and 0 deletions
228
src/upload.rs
Normal file
228
src/upload.rs
Normal file
|
@ -0,0 +1,228 @@
|
|||
use std::{collections::HashSet, fmt::Display, io::Write};
|
||||
|
||||
use actix::{Actor, StreamHandler, ActorContext, AsyncContext};
|
||||
use actix_http::ws::{Item, CloseReason};
|
||||
use actix_web_actors::ws::{self, CloseCode};
|
||||
use log::{error, debug, info, trace};
|
||||
use rand::distributions::{Alphanumeric, DistString};
|
||||
use serde::Deserialize;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use crate::{UploadedFile, DownloadableFile, file::LiveWriter};
|
||||
|
||||
const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] =
|
||||
time::macros::format_description!("[year]-[month]-[day]-[hour][minute][second]");
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
enum Error {
|
||||
#[error("Failed to parse file metadata")]
|
||||
Parse(#[from] serde_json::Error),
|
||||
#[error("Error writing to stored file")]
|
||||
Storage(#[from] std::io::Error),
|
||||
#[error("Time formatting error")]
|
||||
TimeFormat(#[from] time::error::Format),
|
||||
#[error("Lock on app state is poisoned")]
|
||||
LockPoisoned,
|
||||
#[error("Duplicate filename could not be deduplicated")]
|
||||
DuplicateFilename,
|
||||
#[error("This message type was not expected at this stage")]
|
||||
UnexpectedMessageType,
|
||||
#[error("Metadata contained an empty list of files")]
|
||||
NoFiles,
|
||||
#[error("Websocket was closed by client before completing transfer")]
|
||||
ClosedEarly(Option<CloseReason>),
|
||||
#[error("Client sent more data than they were supposed to")]
|
||||
TooMuchData,
|
||||
}
|
||||
|
||||
impl Error {
|
||||
fn close_code(&self) -> CloseCode {
|
||||
match self {
|
||||
Self::Parse(_) => CloseCode::Invalid,
|
||||
Self::Storage(_) => CloseCode::Error,
|
||||
Self::TimeFormat(_) => CloseCode::Error,
|
||||
Self::LockPoisoned => CloseCode::Error,
|
||||
Self::DuplicateFilename => CloseCode::Policy,
|
||||
Self::UnexpectedMessageType => CloseCode::Invalid,
|
||||
Self::NoFiles => CloseCode::Policy,
|
||||
Self::ClosedEarly(_) => CloseCode::Invalid,
|
||||
Self::TooMuchData => CloseCode::Invalid,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Uploader {
|
||||
writer: Option<Box<dyn LiveWriter>>,
|
||||
app_data: super::AppData,
|
||||
bytes_remaining: usize,
|
||||
}
|
||||
|
||||
impl Uploader {
|
||||
pub fn new(app_data: super::AppData) -> Self {
|
||||
Self {
|
||||
writer: None,
|
||||
app_data,
|
||||
bytes_remaining: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for Uploader {
|
||||
type Context = ws::WebsocketContext<Self>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RawUploadedFile {
|
||||
name: String,
|
||||
size: usize,
|
||||
modtime: i64,
|
||||
}
|
||||
|
||||
impl RawUploadedFile {
|
||||
fn process(&self) -> UploadedFile {
|
||||
UploadedFile::new(
|
||||
&self.name,
|
||||
self.size,
|
||||
OffsetDateTime::from_unix_timestamp(self.modtime / 1000)
|
||||
.unwrap_or_else(|_| OffsetDateTime::now_utc()),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
|
||||
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
|
||||
let msg = match msg {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
error!("Websocket error: {:?}", e);
|
||||
ctx.stop();
|
||||
return;
|
||||
},
|
||||
};
|
||||
|
||||
match self.handle_message(msg, ctx) {
|
||||
Err(e) => {
|
||||
error!("{:?}", e);
|
||||
ctx.close(Some(ws::CloseReason {
|
||||
code: e.close_code(),
|
||||
description: Some(e.to_string()),
|
||||
}));
|
||||
ctx.stop();
|
||||
}
|
||||
Ok(true) => {
|
||||
info!("Finished uploading data");
|
||||
self.writer.as_mut().map(|w| w.flush());
|
||||
ctx.close(Some(ws::CloseReason {
|
||||
code: CloseCode::Normal,
|
||||
description: None,
|
||||
}));
|
||||
// self.app_data.write().unwrap().entry(
|
||||
ctx.stop();
|
||||
}
|
||||
_ => ()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn ack(ctx: &mut <Uploader as Actor>::Context) {
|
||||
ctx.text("ack");
|
||||
}
|
||||
|
||||
impl Uploader {
|
||||
fn handle_message(&mut self, msg: ws::Message, ctx: &mut <Self as Actor>::Context) -> Result<bool, Error>{
|
||||
trace!("Websocket message: {:?}", msg);
|
||||
match msg {
|
||||
ws::Message::Text(text) => {
|
||||
if self.writer.is_some() {
|
||||
return Err(Error::UnexpectedMessageType)
|
||||
}
|
||||
let raw_files: Vec<RawUploadedFile> = serde_json::from_slice(text.as_bytes())?;
|
||||
info!("Received file list: {} files", raw_files.len());
|
||||
debug!("{:?}", raw_files);
|
||||
let mut filenames: HashSet<String> = HashSet::new();
|
||||
let mut files = Vec::new();
|
||||
for raw_file in raw_files.iter() {
|
||||
let mut file = raw_file.process();
|
||||
while filenames.contains(&file.name) {
|
||||
info!("Duplicate file name: {}", file.name);
|
||||
if file.name.len() >= sanitise_file_name::Options::DEFAULT.length_limit {
|
||||
return Err(Error::DuplicateFilename);
|
||||
}
|
||||
file.name.insert(0, '_');
|
||||
}
|
||||
filenames.insert(file.name.clone());
|
||||
self.bytes_remaining += file.size;
|
||||
files.push(file);
|
||||
}
|
||||
if files.is_empty() {
|
||||
return Err(Error::NoFiles);
|
||||
}
|
||||
let storage_filename = Alphanumeric.sample_string(&mut rand::thread_rng(), 8);
|
||||
let storage_path = super::storage_dir().join(storage_filename.clone());
|
||||
info!("storing to: {:?}", storage_path);
|
||||
let writer = super::file::LiveFileWriter::new(&storage_path)?;
|
||||
if files.len() > 1 {
|
||||
info!("Wrapping in zipfile generator");
|
||||
let now = OffsetDateTime::now_utc();
|
||||
let writer = super::zip::ZipGenerator::new(files, Box::new(writer));
|
||||
let size = writer.total_size();
|
||||
self.writer = Some(Box::new(writer));
|
||||
let download_filename = super::app_name()
|
||||
+ &now.format(FILENAME_DATE_FORMAT)?
|
||||
+ ".zip";
|
||||
let modtime = now;
|
||||
self.app_data.write().map_err(|_| Error::LockPoisoned)?.insert(storage_filename, DownloadableFile {
|
||||
name: download_filename,
|
||||
size,
|
||||
modtime,
|
||||
uploader: Some(ctx.address()),
|
||||
});
|
||||
} else {
|
||||
self.writer = Some(Box::new(writer));
|
||||
self.app_data.write().map_err(|_| Error::LockPoisoned)?.insert(storage_filename, DownloadableFile {
|
||||
name: files[0].name.clone(),
|
||||
size: files[0].size,
|
||||
modtime: files[0].modtime,
|
||||
uploader: Some(ctx.address()),
|
||||
});
|
||||
}
|
||||
ack(ctx);
|
||||
}
|
||||
ws::Message::Binary(data)
|
||||
| ws::Message::Continuation(Item::FirstBinary(data))
|
||||
| ws::Message::Continuation(Item::Continue(data))
|
||||
| ws::Message::Continuation(Item::Last(data)) =>
|
||||
{
|
||||
if let Some(ref mut writer) = self.writer {
|
||||
if data.len() > self.bytes_remaining {
|
||||
return Err(Error::TooMuchData);
|
||||
}
|
||||
self.bytes_remaining -= data.len();
|
||||
writer.write_all(&data)?;
|
||||
ack(ctx);
|
||||
if self.bytes_remaining == 0 {
|
||||
return Ok(true);
|
||||
}
|
||||
} else {
|
||||
return Err(Error::UnexpectedMessageType);
|
||||
}
|
||||
}
|
||||
ws::Message::Close(reason) => {
|
||||
if self.bytes_remaining > 0 {
|
||||
return Err(Error::ClosedEarly(reason));
|
||||
} else {
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
ws::Message::Ping(ping) => {
|
||||
debug!("Ping received, ponging");
|
||||
ctx.pong(&ping);
|
||||
}
|
||||
ws::Message::Nop | ws::Message::Pong(_) => (),
|
||||
_ => {
|
||||
return Err(Error::UnexpectedMessageType);
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue