Compare commits
No commits in common. "bda6da33e8494f8d5c024c46b6f1c8b8e0e47d1f" and "70384b04c3824c431f467b94893035997b7f5dbc" have entirely different histories.
bda6da33e8
...
70384b04c3
|
@ -4,8 +4,8 @@ use actix_web::{
|
||||||
body::{self, BoxBody, SizedStream},
|
body::{self, BoxBody, SizedStream},
|
||||||
http::{
|
http::{
|
||||||
header::{
|
header::{
|
||||||
self, ContentDisposition, DispositionParam, DispositionType, EntityTag, HeaderValue,
|
self, ContentDisposition, DispositionParam,
|
||||||
HttpDate, IfMatch, IfModifiedSince, IfNoneMatch, IfUnmodifiedSince,
|
DispositionType, HeaderValue, HttpDate, EntityTag, IfUnmodifiedSince, IfMatch, IfNoneMatch, IfModifiedSince,
|
||||||
},
|
},
|
||||||
StatusCode,
|
StatusCode,
|
||||||
},
|
},
|
||||||
|
@ -52,7 +52,7 @@ impl DownloadingFile {
|
||||||
ContentDisposition {
|
ContentDisposition {
|
||||||
disposition: DispositionType::Attachment,
|
disposition: DispositionType::Attachment,
|
||||||
parameters: vec![DispositionParam::Filename(self.info.name)],
|
parameters: vec![DispositionParam::Filename(self.info.name)],
|
||||||
},
|
}
|
||||||
));
|
));
|
||||||
res.insert_header((header::LAST_MODIFIED, last_modified));
|
res.insert_header((header::LAST_MODIFIED, last_modified));
|
||||||
res.insert_header((header::ETAG, etag));
|
res.insert_header((header::ETAG, etag));
|
||||||
|
@ -76,12 +76,7 @@ impl DownloadingFile {
|
||||||
|
|
||||||
res.insert_header((
|
res.insert_header((
|
||||||
header::CONTENT_RANGE,
|
header::CONTENT_RANGE,
|
||||||
format!(
|
format!("bytes {}-{}/{}", offset, offset + length - 1, self.info.size),
|
||||||
"bytes {}-{}/{}",
|
|
||||||
offset,
|
|
||||||
offset + length - 1,
|
|
||||||
self.info.size
|
|
||||||
),
|
|
||||||
));
|
));
|
||||||
} else {
|
} else {
|
||||||
res.insert_header((header::CONTENT_RANGE, format!("bytes */{}", length)));
|
res.insert_header((header::CONTENT_RANGE, format!("bytes */{}", length)));
|
||||||
|
@ -127,9 +122,7 @@ fn precondition_failed(req: &HttpRequest, etag: &EntityTag, last_modified: &Http
|
||||||
|
|
||||||
fn not_modified(req: &HttpRequest, etag: &EntityTag, last_modified: &HttpDate) -> bool {
|
fn not_modified(req: &HttpRequest, etag: &EntityTag, last_modified: &HttpDate) -> bool {
|
||||||
match req.get_header::<IfNoneMatch>() {
|
match req.get_header::<IfNoneMatch>() {
|
||||||
Some(IfNoneMatch::Any) => {
|
Some(IfNoneMatch::Any) => { return true; }
|
||||||
return true;
|
|
||||||
}
|
|
||||||
Some(IfNoneMatch::Items(ref items)) => {
|
Some(IfNoneMatch::Items(ref items)) => {
|
||||||
return items.iter().any(|item| item.weak_eq(etag));
|
return items.iter().any(|item| item.weak_eq(etag));
|
||||||
}
|
}
|
||||||
|
|
47
src/file.rs
47
src/file.rs
|
@ -1,12 +1,4 @@
|
||||||
use std::{
|
use std::{cmp, fs::File, future::Future, io::{self, Write}, path::PathBuf, pin::Pin, task::{Context, Poll, Waker}};
|
||||||
cmp,
|
|
||||||
fs::File,
|
|
||||||
future::Future,
|
|
||||||
io::{self, Write},
|
|
||||||
path::PathBuf,
|
|
||||||
pin::Pin,
|
|
||||||
task::{Context, Poll, Waker},
|
|
||||||
};
|
|
||||||
|
|
||||||
use actix::Addr;
|
use actix::Addr;
|
||||||
use actix_web::error::{Error, ErrorInternalServerError};
|
use actix_web::error::{Error, ErrorInternalServerError};
|
||||||
|
@ -61,6 +53,7 @@ impl Write for LiveFileWriter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// This implementation of a file responder is copied pretty directly
|
// This implementation of a file responder is copied pretty directly
|
||||||
// from actix-files with some tweaks
|
// from actix-files with some tweaks
|
||||||
|
|
||||||
|
@ -111,19 +104,13 @@ async fn live_file_reader_callback(
|
||||||
use io::{Read as _, Seek as _};
|
use io::{Read as _, Seek as _};
|
||||||
|
|
||||||
let res = actix_web::web::block(move || {
|
let res = actix_web::web::block(move || {
|
||||||
trace!(
|
trace!("reading up to {} bytes of file starting at {}", max_bytes, offset);
|
||||||
"reading up to {} bytes of file starting at {}",
|
|
||||||
max_bytes,
|
|
||||||
offset
|
|
||||||
);
|
|
||||||
|
|
||||||
let mut buf = Vec::with_capacity(max_bytes);
|
let mut buf = Vec::with_capacity(max_bytes);
|
||||||
|
|
||||||
file.seek(io::SeekFrom::Start(offset))?;
|
file.seek(io::SeekFrom::Start(offset))?;
|
||||||
|
|
||||||
let n_bytes = std::io::Read::by_ref(&mut file)
|
let n_bytes = std::io::Read::by_ref(&mut file).take(max_bytes as u64).read_to_end(&mut buf)?;
|
||||||
.take(max_bytes as u64)
|
|
||||||
.read_to_end(&mut buf)?;
|
|
||||||
trace!("got {} bytes from file", n_bytes);
|
trace!("got {} bytes from file", n_bytes);
|
||||||
if n_bytes == 0 {
|
if n_bytes == 0 {
|
||||||
Err(io::Error::from(io::ErrorKind::UnexpectedEof))
|
Err(io::Error::from(io::ErrorKind::UnexpectedEof))
|
||||||
|
@ -154,14 +141,12 @@ where
|
||||||
if size == counter {
|
if size == counter {
|
||||||
Poll::Ready(None)
|
Poll::Ready(None)
|
||||||
} else {
|
} else {
|
||||||
let inner_file = file.take().expect("LiveFileReader polled after completion");
|
let inner_file = file
|
||||||
|
.take()
|
||||||
|
.expect("LiveFileReader polled after completion");
|
||||||
|
|
||||||
if offset >= *this.available_file_size {
|
if offset >= *this.available_file_size {
|
||||||
trace!(
|
trace!("offset {} has reached available file size {}, updating metadata", offset, this.available_file_size);
|
||||||
"offset {} has reached available file size {}, updating metadata",
|
|
||||||
offset,
|
|
||||||
this.available_file_size
|
|
||||||
);
|
|
||||||
// If we've hit the end of what was available
|
// If we've hit the end of what was available
|
||||||
// last time we checked, check again
|
// last time we checked, check again
|
||||||
*this.available_file_size = match inner_file.metadata() {
|
*this.available_file_size = match inner_file.metadata() {
|
||||||
|
@ -180,25 +165,15 @@ where
|
||||||
if let Ok(()) = addr.try_send(WakerMessage(cx.waker().clone())) {
|
if let Ok(()) = addr.try_send(WakerMessage(cx.waker().clone())) {
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
} else {
|
} else {
|
||||||
return Poll::Ready(Some(Err(ErrorInternalServerError(
|
return Poll::Ready(Some(Err(ErrorInternalServerError("Failed to contact file upload actor"))));
|
||||||
"Failed to contact file upload actor",
|
|
||||||
))));
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Poll::Ready(Some(Err(ErrorInternalServerError(
|
return Poll::Ready(Some(Err(ErrorInternalServerError("File upload was not completed"))));
|
||||||
"File upload was not completed",
|
|
||||||
))));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let max_bytes = cmp::min(
|
let max_bytes = cmp::min(65_536, cmp::min(size.saturating_sub(counter), this.available_file_size.saturating_sub(offset))) as usize;
|
||||||
65_536,
|
|
||||||
cmp::min(
|
|
||||||
size.saturating_sub(counter),
|
|
||||||
this.available_file_size.saturating_sub(offset),
|
|
||||||
),
|
|
||||||
) as usize;
|
|
||||||
|
|
||||||
let fut = (this.callback)(inner_file, offset, max_bytes);
|
let fut = (this.callback)(inner_file, offset, max_bytes);
|
||||||
|
|
||||||
|
|
37
src/main.rs
37
src/main.rs
|
@ -1,23 +1,23 @@
|
||||||
mod download;
|
mod download;
|
||||||
mod file;
|
mod file;
|
||||||
mod state;
|
|
||||||
mod upload;
|
mod upload;
|
||||||
mod util;
|
|
||||||
mod zip;
|
mod zip;
|
||||||
|
|
||||||
use std::{fs::File, path::PathBuf};
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
path::PathBuf,
|
||||||
|
fs::File,
|
||||||
|
};
|
||||||
|
|
||||||
use actix::Addr;
|
use actix::Addr;
|
||||||
use actix_web::{
|
use actix_web::{
|
||||||
get, middleware::Logger, web, App, HttpRequest, HttpResponse, HttpServer, Responder,
|
get, middleware::Logger, web, App, HttpRequest, HttpServer, Responder, HttpResponse,
|
||||||
};
|
};
|
||||||
use actix_web_actors::ws;
|
use actix_web_actors::ws;
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use state::PersistentState;
|
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
|
|
||||||
const APP_NAME: &str = "transbeam";
|
const APP_NAME: &'static str = "transbeam";
|
||||||
|
|
||||||
pub struct UploadedFile {
|
pub struct UploadedFile {
|
||||||
name: String,
|
name: String,
|
||||||
|
@ -35,40 +35,33 @@ impl UploadedFile {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Deserialize, Serialize)]
|
#[derive(Clone)]
|
||||||
pub struct DownloadableFile {
|
pub struct DownloadableFile {
|
||||||
name: String,
|
name: String,
|
||||||
size: u64,
|
size: u64,
|
||||||
#[serde(with = "state::timestamp")]
|
|
||||||
modtime: OffsetDateTime,
|
modtime: OffsetDateTime,
|
||||||
#[serde(skip)]
|
|
||||||
uploader: Option<Addr<upload::Uploader>>,
|
uploader: Option<Addr<upload::Uploader>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
type AppData = web::Data<RwLock<PersistentState>>;
|
type AppData = web::Data<RwLock<HashMap<String, DownloadableFile>>>;
|
||||||
|
|
||||||
fn storage_dir() -> PathBuf {
|
fn storage_dir() -> PathBuf {
|
||||||
PathBuf::from(std::env::var("STORAGE_DIR").unwrap_or_else(|_| String::from("storage")))
|
PathBuf::from(std::env::var("STORAGE_DIR").unwrap_or_else(|_| String::from("storage")))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[get("/download/{file_code}")]
|
#[get("/download/{file_code}")]
|
||||||
async fn handle_download(
|
async fn handle_download(req: HttpRequest, path: web::Path<String>, data: AppData) -> actix_web::Result<HttpResponse> {
|
||||||
req: HttpRequest,
|
|
||||||
path: web::Path<String>,
|
|
||||||
data: AppData,
|
|
||||||
) -> actix_web::Result<HttpResponse> {
|
|
||||||
let file_code = path.into_inner();
|
let file_code = path.into_inner();
|
||||||
if !util::is_ascii_alphanumeric(&file_code) {
|
if !file_code.as_bytes().iter().all(|c| c.is_ascii_alphanumeric()) {
|
||||||
return Ok(HttpResponse::NotFound().finish());
|
return Ok(HttpResponse::NotFound().finish());
|
||||||
}
|
}
|
||||||
let data = data.read().await;
|
let data = data.read().await;
|
||||||
let info = data.lookup_file(&file_code);
|
let info = data.get(&file_code);
|
||||||
if let Some(info) = info {
|
if let Some(info) = info {
|
||||||
Ok(download::DownloadingFile {
|
Ok(download::DownloadingFile {
|
||||||
file: File::open(storage_dir().join(file_code))?,
|
file: File::open(storage_dir().join(file_code))?,
|
||||||
info,
|
info: info.clone(),
|
||||||
}
|
}.into_response(&req))
|
||||||
.into_response(&req))
|
|
||||||
} else {
|
} else {
|
||||||
Ok(HttpResponse::NotFound().finish())
|
Ok(HttpResponse::NotFound().finish())
|
||||||
}
|
}
|
||||||
|
@ -83,7 +76,7 @@ async fn handle_upload(req: HttpRequest, stream: web::Payload, data: AppData) ->
|
||||||
async fn main() -> std::io::Result<()> {
|
async fn main() -> std::io::Result<()> {
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
|
|
||||||
let data: AppData = web::Data::new(RwLock::new(PersistentState::load().await?));
|
let data: AppData = web::Data::new(RwLock::new(HashMap::new()));
|
||||||
|
|
||||||
let static_dir =
|
let static_dir =
|
||||||
PathBuf::from(std::env::var("STATIC_DIR").unwrap_or_else(|_| String::from("static")));
|
PathBuf::from(std::env::var("STATIC_DIR").unwrap_or_else(|_| String::from("static")));
|
||||||
|
|
132
src/state.rs
132
src/state.rs
|
@ -1,132 +0,0 @@
|
||||||
use std::{collections::HashMap, io::ErrorKind};
|
|
||||||
|
|
||||||
use log::{error, info};
|
|
||||||
use tokio::{
|
|
||||||
fs::File,
|
|
||||||
io::{AsyncReadExt, AsyncWriteExt},
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::{storage_dir, DownloadableFile};
|
|
||||||
|
|
||||||
const STATE_FILE_NAME: &str = "files.json";
|
|
||||||
|
|
||||||
pub(crate) mod timestamp {
|
|
||||||
use core::fmt;
|
|
||||||
|
|
||||||
use serde::{de::Visitor, Deserializer, Serializer};
|
|
||||||
use time::OffsetDateTime;
|
|
||||||
|
|
||||||
pub(crate) fn serialize<S: Serializer>(
|
|
||||||
time: &OffsetDateTime,
|
|
||||||
ser: S,
|
|
||||||
) -> Result<S::Ok, S::Error> {
|
|
||||||
ser.serialize_i64(time.unix_timestamp())
|
|
||||||
}
|
|
||||||
|
|
||||||
struct I64Visitor;
|
|
||||||
|
|
||||||
impl<'de> Visitor<'de> for I64Visitor {
|
|
||||||
type Value = i64;
|
|
||||||
|
|
||||||
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(formatter, "a signed integer")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> {
|
|
||||||
Ok(v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn deserialize<'de, D: Deserializer<'de>>(
|
|
||||||
de: D,
|
|
||||||
) -> Result<OffsetDateTime, D::Error> {
|
|
||||||
Ok(
|
|
||||||
OffsetDateTime::from_unix_timestamp(de.deserialize_i64(I64Visitor)?)
|
|
||||||
.unwrap_or_else(|_| OffsetDateTime::now_utc()),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct PersistentState(HashMap<String, DownloadableFile>);
|
|
||||||
impl PersistentState {
|
|
||||||
pub(crate) async fn load() -> std::io::Result<Self> {
|
|
||||||
let open_result = File::open(storage_dir().join(STATE_FILE_NAME)).await;
|
|
||||||
match open_result {
|
|
||||||
Ok(mut f) => {
|
|
||||||
let mut buf = String::new();
|
|
||||||
f.read_to_string(&mut buf).await?;
|
|
||||||
let map: HashMap<String, DownloadableFile> = serde_json::from_str(&buf)?;
|
|
||||||
info!("Loaded {} file entries from persistent storage", map.len());
|
|
||||||
let mut filtered: HashMap<String, DownloadableFile> = HashMap::new();
|
|
||||||
for (key, info) in map.into_iter() {
|
|
||||||
if !crate::util::is_ascii_alphanumeric(&key) {
|
|
||||||
error!("Invalid key in persistent storage: {}", key);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
let file = if let Ok(f) = File::open(storage_dir().join(&key)).await {
|
|
||||||
f
|
|
||||||
} else {
|
|
||||||
error!(
|
|
||||||
"Unable to open file {} referenced in persistent storage",
|
|
||||||
key
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
let metadata = if let Ok(md) = file.metadata().await {
|
|
||||||
md
|
|
||||||
} else {
|
|
||||||
error!(
|
|
||||||
"Unable to get metadata for file {} referenced in persistent storage",
|
|
||||||
key
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
};
|
|
||||||
if metadata.len() != info.size {
|
|
||||||
error!("Mismatched file size for file {} referenced in persistent storage: expected {}, found {}", key, info.size, metadata.len());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
filtered.insert(key, info);
|
|
||||||
}
|
|
||||||
Ok(Self(filtered))
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
if let ErrorKind::NotFound = e.kind() {
|
|
||||||
Ok(Self(HashMap::new()))
|
|
||||||
} else {
|
|
||||||
Err(e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn save(&mut self) -> std::io::Result<()> {
|
|
||||||
File::create(storage_dir().join(STATE_FILE_NAME))
|
|
||||||
.await?
|
|
||||||
.write_all(&serde_json::to_vec_pretty(&self.0)?)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn add_file(
|
|
||||||
&mut self,
|
|
||||||
key: String,
|
|
||||||
file: DownloadableFile,
|
|
||||||
) -> std::io::Result<()> {
|
|
||||||
self.0.insert(key, file);
|
|
||||||
self.save().await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn lookup_file(&self, key: &str) -> Option<DownloadableFile> {
|
|
||||||
self.0.get(key).cloned()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn remove_file(&mut self, key: &str) -> std::io::Result<()> {
|
|
||||||
self.0.remove(key);
|
|
||||||
self.save().await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn remove_uploader(&mut self, key: &str) {
|
|
||||||
if let Some(f) = self.0.get_mut(key) {
|
|
||||||
f.uploader.take();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,6 +1,6 @@
|
||||||
use std::{collections::HashSet, io::Write, task::Waker};
|
use std::{collections::HashSet, io::Write, task::Waker};
|
||||||
|
|
||||||
use actix::{Actor, ActorContext, AsyncContext, Handler, Message, StreamHandler};
|
use actix::{Actor, ActorContext, AsyncContext, StreamHandler, Message, Handler};
|
||||||
use actix_http::ws::{CloseReason, Item};
|
use actix_http::ws::{CloseReason, Item};
|
||||||
use actix_web_actors::ws::{self, CloseCode};
|
use actix_web_actors::ws::{self, CloseCode};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
@ -61,7 +61,7 @@ pub struct Uploader {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Uploader {
|
impl Uploader {
|
||||||
pub(crate) fn new(app_data: super::AppData) -> Self {
|
pub fn new(app_data: super::AppData) -> Self {
|
||||||
Self {
|
Self {
|
||||||
writer: None,
|
writer: None,
|
||||||
storage_filename: String::new(),
|
storage_filename: String::new(),
|
||||||
|
@ -82,11 +82,7 @@ pub(crate) struct WakerMessage(pub Waker);
|
||||||
impl Handler<WakerMessage> for Uploader {
|
impl Handler<WakerMessage> for Uploader {
|
||||||
type Result = ();
|
type Result = ();
|
||||||
fn handle(&mut self, msg: WakerMessage, _: &mut Self::Context) {
|
fn handle(&mut self, msg: WakerMessage, _: &mut Self::Context) {
|
||||||
if let Some(w) = self.writer.as_mut() {
|
self.writer.as_mut().map(|w| w.add_waker(msg.0));
|
||||||
w.add_waker(msg.0);
|
|
||||||
} else {
|
|
||||||
error!("Got a wakeup request before creating a file");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,7 +110,6 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
|
||||||
Ok(m) => m,
|
Ok(m) => m,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Websocket error: {}", e);
|
error!("Websocket error: {}", e);
|
||||||
self.cleanup_after_error(ctx);
|
|
||||||
ctx.stop();
|
ctx.stop();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -127,7 +122,6 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
|
||||||
code: e.close_code(),
|
code: e.close_code(),
|
||||||
description: Some(e.to_string()),
|
description: Some(e.to_string()),
|
||||||
}));
|
}));
|
||||||
self.cleanup_after_error(ctx);
|
|
||||||
ctx.stop();
|
ctx.stop();
|
||||||
}
|
}
|
||||||
Ok(true) => {
|
Ok(true) => {
|
||||||
|
@ -140,7 +134,9 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
|
||||||
let data = self.app_data.clone();
|
let data = self.app_data.clone();
|
||||||
let filename = self.storage_filename.clone();
|
let filename = self.storage_filename.clone();
|
||||||
ctx.wait(actix::fut::wrap_future(async move {
|
ctx.wait(actix::fut::wrap_future(async move {
|
||||||
data.write().await.remove_uploader(&filename);
|
if let Some(f) = data.write().await.get_mut(&filename) {
|
||||||
|
f.uploader.take();
|
||||||
|
}
|
||||||
}));
|
}));
|
||||||
ctx.stop();
|
ctx.stop();
|
||||||
}
|
}
|
||||||
|
@ -202,38 +198,36 @@ impl Uploader {
|
||||||
let size = zip_writer.total_size();
|
let size = zip_writer.total_size();
|
||||||
let download_filename =
|
let download_filename =
|
||||||
super::APP_NAME.to_owned() + &now.format(FILENAME_DATE_FORMAT)? + ".zip";
|
super::APP_NAME.to_owned() + &now.format(FILENAME_DATE_FORMAT)? + ".zip";
|
||||||
(
|
(Box::new(zip_writer), DownloadableFile {
|
||||||
Box::new(zip_writer),
|
|
||||||
DownloadableFile {
|
|
||||||
name: download_filename,
|
name: download_filename,
|
||||||
size,
|
size,
|
||||||
modtime: now,
|
modtime: now,
|
||||||
uploader: addr,
|
uploader: addr,
|
||||||
},
|
})
|
||||||
)
|
|
||||||
} else {
|
} else {
|
||||||
(
|
(Box::new(writer), DownloadableFile {
|
||||||
Box::new(writer),
|
|
||||||
DownloadableFile {
|
|
||||||
name: files[0].name.clone(),
|
name: files[0].name.clone(),
|
||||||
size: files[0].size,
|
size: files[0].size,
|
||||||
modtime: files[0].modtime,
|
modtime: files[0].modtime,
|
||||||
uploader: addr,
|
uploader: addr,
|
||||||
},
|
})
|
||||||
)
|
|
||||||
};
|
};
|
||||||
self.writer = Some(writer);
|
self.writer = Some(writer);
|
||||||
let data = self.app_data.clone();
|
let data = self.app_data.clone();
|
||||||
|
let storage_filename_copy = storage_filename.clone();
|
||||||
ctx.spawn(actix::fut::wrap_future(async move {
|
ctx.spawn(actix::fut::wrap_future(async move {
|
||||||
data.write()
|
data
|
||||||
|
.write()
|
||||||
.await
|
.await
|
||||||
.add_file(storage_filename, downloadable_file)
|
.insert(
|
||||||
.await
|
storage_filename_copy,
|
||||||
.unwrap();
|
downloadable_file,
|
||||||
|
);
|
||||||
}));
|
}));
|
||||||
ctx.text(self.storage_filename.as_str());
|
ctx.text(self.storage_filename.as_str());
|
||||||
}
|
}
|
||||||
ws::Message::Binary(data) | ws::Message::Continuation(Item::Last(data)) => {
|
ws::Message::Binary(data)
|
||||||
|
| ws::Message::Continuation(Item::Last(data)) => {
|
||||||
let result = self.handle_data(data)?;
|
let result = self.handle_data(data)?;
|
||||||
ack(ctx);
|
ack(ctx);
|
||||||
return Ok(result);
|
return Ok(result);
|
||||||
|
@ -273,12 +267,4 @@ impl Uploader {
|
||||||
Err(Error::UnexpectedMessageType)
|
Err(Error::UnexpectedMessageType)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn cleanup_after_error(&mut self, ctx: &mut <Self as Actor>::Context) {
|
|
||||||
let data = self.app_data.clone();
|
|
||||||
let filename = self.storage_filename.clone();
|
|
||||||
ctx.spawn(actix::fut::wrap_future(async move {
|
|
||||||
data.write().await.remove_file(&filename).await.unwrap();
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +0,0 @@
|
||||||
pub(crate) fn is_ascii_alphanumeric(s: &str) -> bool {
|
|
||||||
s.as_bytes().iter().all(|c| c.is_ascii_alphanumeric())
|
|
||||||
}
|
|
Loading…
Reference in a new issue