implement downloads, lots of misc tweaks/fixes

This commit is contained in:
xenofem 2022-04-27 20:15:51 -04:00
parent 18caa2baf1
commit 2ec295e606
7 changed files with 354 additions and 854 deletions

View file

@ -1,8 +1,9 @@
use std::{collections::HashSet, fmt::Display, io::Write, task::Waker};
use std::{collections::HashSet, io::Write, task::Waker};
use actix::{Actor, ActorContext, AsyncContext, StreamHandler, Message, Handler};
use actix_http::ws::{CloseReason, Item};
use actix_web_actors::ws::{self, CloseCode};
use bytes::Bytes;
use log::{debug, error, info, trace};
use rand::distributions::{Alphanumeric, DistString};
use serde::Deserialize;
@ -21,8 +22,6 @@ enum Error {
Storage(#[from] std::io::Error),
#[error("Time formatting error")]
TimeFormat(#[from] time::error::Format),
#[error("Lock on app state is poisoned")]
LockPoisoned,
#[error("Duplicate filename could not be deduplicated")]
DuplicateFilename,
#[error("This message type was not expected at this stage")]
@ -41,7 +40,6 @@ impl Error {
Self::Parse(_) => CloseCode::Invalid,
Self::Storage(_) => CloseCode::Error,
Self::TimeFormat(_) => CloseCode::Error,
Self::LockPoisoned => CloseCode::Error,
Self::DuplicateFilename => CloseCode::Policy,
Self::UnexpectedMessageType => CloseCode::Invalid,
Self::NoFiles => CloseCode::Policy,
@ -55,7 +53,7 @@ pub struct Uploader {
writer: Option<Box<dyn LiveWriter>>,
storage_filename: String,
app_data: super::AppData,
bytes_remaining: usize,
bytes_remaining: u64,
}
impl Uploader {
@ -87,7 +85,7 @@ impl Handler<WakerMessage> for Uploader {
#[derive(Debug, Deserialize)]
struct RawUploadedFile {
name: String,
size: usize,
size: u64,
modtime: i64,
}
@ -129,9 +127,13 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
code: CloseCode::Normal,
description: None,
}));
if let Some(f) = self.app_data.write().unwrap().get_mut(&self.storage_filename) {
f.uploader.take();
}
let data = self.app_data.clone();
let filename = self.storage_filename.clone();
ctx.wait(actix::fut::wrap_future(async move {
if let Some(f) = data.write().await.get_mut(&filename) {
f.uploader.take();
}
}));
ctx.stop();
}
_ => (),
@ -181,61 +183,51 @@ impl Uploader {
let storage_path = super::storage_dir().join(storage_filename.clone());
info!("storing to: {:?}", storage_path);
let writer = super::file::LiveFileWriter::new(&storage_path)?;
if files.len() > 1 {
let addr = Some(ctx.address());
let (writer, downloadable_file): (Box<dyn LiveWriter>, _) = if files.len() > 1 {
info!("Wrapping in zipfile generator");
let now = OffsetDateTime::now_utc();
let writer = super::zip::ZipGenerator::new(files, Box::new(writer));
let size = writer.total_size();
self.writer = Some(Box::new(writer));
let zip_writer = super::zip::ZipGenerator::new(files, writer);
let size = zip_writer.total_size();
let download_filename =
super::APP_NAME.to_owned() + &now.format(FILENAME_DATE_FORMAT)? + ".zip";
let modtime = now;
self.app_data
.write()
.map_err(|_| Error::LockPoisoned)?
.insert(
storage_filename,
DownloadableFile {
name: download_filename,
size,
modtime,
uploader: Some(ctx.address()),
},
);
(Box::new(zip_writer), DownloadableFile {
name: download_filename,
size,
modtime: now,
uploader: addr,
})
} else {
self.writer = Some(Box::new(writer));
self.app_data
(Box::new(writer), DownloadableFile {
name: files[0].name.clone(),
size: files[0].size,
modtime: files[0].modtime,
uploader: addr,
})
};
self.writer = Some(writer);
let data = self.app_data.clone();
let storage_filename_copy = storage_filename.clone();
ctx.spawn(actix::fut::wrap_future(async move {
data
.write()
.map_err(|_| Error::LockPoisoned)?
.await
.insert(
storage_filename,
DownloadableFile {
name: files[0].name.clone(),
size: files[0].size,
modtime: files[0].modtime,
uploader: Some(ctx.address()),
},
storage_filename_copy,
downloadable_file,
);
}
}));
ctx.text(self.storage_filename.as_str());
}
ws::Message::Binary(data)
| ws::Message::Continuation(Item::FirstBinary(data))
| ws::Message::Continuation(Item::Continue(data))
| ws::Message::Continuation(Item::Last(data)) => {
if let Some(ref mut writer) = self.writer {
if data.len() > self.bytes_remaining {
return Err(Error::TooMuchData);
}
self.bytes_remaining -= data.len();
writer.write_all(&data)?;
ack(ctx);
if self.bytes_remaining == 0 {
return Ok(true);
}
} else {
return Err(Error::UnexpectedMessageType);
}
let result = self.handle_data(data)?;
ack(ctx);
return Ok(result);
}
ws::Message::Continuation(Item::FirstBinary(data))
| ws::Message::Continuation(Item::Continue(data)) => {
return self.handle_data(data);
}
ws::Message::Close(reason) => {
if self.bytes_remaining > 0 {
@ -255,4 +247,17 @@ impl Uploader {
}
Ok(false)
}
fn handle_data(&mut self, data: Bytes) -> Result<bool, Error> {
if let Some(ref mut writer) = self.writer {
if (data.len() as u64) > self.bytes_remaining {
return Err(Error::TooMuchData);
}
self.bytes_remaining -= data.len() as u64;
writer.write_all(&data)?;
Ok(self.bytes_remaining == 0)
} else {
Err(Error::UnexpectedMessageType)
}
}
}