diff --git a/Cargo.lock b/Cargo.lock index 86251a0..fcc24fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -747,6 +747,28 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "inotify" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abf888f9575c290197b2c948dc9e9ff10bd1a39ad1ea8585f734585fa6b9d3f9" +dependencies = [ + "bitflags", + "futures-core", + "inotify-sys", + "libc", + "tokio", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "itoa" version = "1.0.1" @@ -1383,6 +1405,7 @@ dependencies = [ "crc32fast", "env_logger", "futures-core", + "inotify", "log", "mime", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index 0269d3e..bbdd0ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ bytes = "1.1.0" crc32fast = "1.3.2" env_logger = "0.9" futures-core = "0.3" +inotify = "0.10" log = "0.4" mime = "0.3.16" pin-project-lite = "0.2.9" diff --git a/src/download.rs b/src/download.rs index 9183b91..3bde8de 100644 --- a/src/download.rs +++ b/src/download.rs @@ -1,4 +1,20 @@ -use std::{fs::File, os::unix::fs::MetadataExt, time::SystemTime}; +use std::{ + cmp, + fs::File, + future::Future, + io, + path::PathBuf, + pin::Pin, + task::{Context, Poll}, +}; + +use actix_web::error::{Error, ErrorInternalServerError}; +use bytes::Bytes; +use futures_core::{ready, Stream}; +use inotify::{Inotify, WatchMask}; +use log::trace; +use pin_project_lite::pin_project; +use std::{os::unix::fs::MetadataExt, time::SystemTime}; use actix_web::{ body::{self, BoxBody, SizedStream}, @@ -20,6 +36,7 @@ use crate::DownloadableFile; pub(crate) struct DownloadingFile { pub(crate) file: File, + pub(crate) storage_path: PathBuf, pub(crate) info: DownloadableFile, } @@ -101,7 +118,7 @@ impl DownloadingFile { .map_into_boxed_body(); } - let reader = crate::file::new_live_reader(length, offset, self.file, self.info.uploader); + let reader = new_live_reader(length, offset, self.file, self.storage_path); if offset != 0 || length != self.info.size { res.status(StatusCode::PARTIAL_CONTENT); @@ -150,3 +167,170 @@ impl Responder for DownloadingFile { self.into_response(req) } } + +pin_project! { + pub struct LiveFileReader { + size: u64, + offset: u64, + #[pin] + state: LiveFileReaderState, + counter: u64, + available_file_size: u64, + callback: F, + #[pin] + events: inotify::EventStream<[u8; 1024]>, + } +} + +pin_project! { + #[project = LiveFileReaderStateProj] + #[project_replace = LiveFileReaderStateProjReplace] + enum LiveFileReaderState { + File { file: Option, }, + Future { #[pin] fut: Fut }, + } +} + +pub(crate) fn new_live_reader( + size: u64, + offset: u64, + file: File, + storage_path: PathBuf, +) -> impl Stream> { + let mut inotify = Inotify::init().expect("failed to init inotify"); + inotify + .add_watch(storage_path, WatchMask::MODIFY | WatchMask::CLOSE_WRITE) + .expect("Failed to add inotify watch"); + let events = inotify + .event_stream([0; 1024]) + .expect("failed to set up event stream"); + LiveFileReader { + size, + offset, + state: LiveFileReaderState::File { file: Some(file) }, + counter: 0, + available_file_size: 0, + callback: live_file_reader_callback, + events, + } +} + +async fn live_file_reader_callback( + mut file: File, + offset: u64, + max_bytes: usize, +) -> Result<(File, Bytes), Error> { + use io::{Read as _, Seek as _}; + + let res = actix_web::web::block(move || { + trace!( + "reading up to {} bytes of file starting at {}", + max_bytes, + offset + ); + + let mut buf = Vec::with_capacity(max_bytes); + + file.seek(io::SeekFrom::Start(offset))?; + + let n_bytes = std::io::Read::by_ref(&mut file) + .take(max_bytes as u64) + .read_to_end(&mut buf)?; + trace!("got {} bytes from file", n_bytes); + if n_bytes == 0 { + Err(io::Error::from(io::ErrorKind::UnexpectedEof)) + } else { + Ok((file, Bytes::from(buf))) + } + }) + .await??; + + Ok(res) +} + +impl Stream for LiveFileReader +where + F: Fn(File, u64, usize) -> Fut, + Fut: Future>, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + match this.state.as_mut().project() { + LiveFileReaderStateProj::File { file } => { + let size = *this.size; + let offset = *this.offset; + let counter = *this.counter; + + if size == counter { + Poll::Ready(None) + } else { + let inner_file = file.take().expect("LiveFileReader polled after completion"); + + if offset >= *this.available_file_size { + trace!( + "offset {} has reached available file size {}, updating metadata", + offset, + this.available_file_size + ); + // If we've hit the end of what was available + // last time we checked, check again + *this.available_file_size = match inner_file.metadata() { + Ok(md) => md.len(), + Err(e) => { + return Poll::Ready(Some(Err(e.into()))); + } + }; + trace!("new available file size: {}", this.available_file_size); + + // If we're still at the end, inotify time + if offset >= *this.available_file_size { + trace!("waiting for inotify events"); + file.get_or_insert(inner_file); + match this.events.poll_next(cx) { + Poll::Pending => { + return Poll::Pending; + } + Poll::Ready(Some(_)) => { + return self.poll_next(cx); + } + _ => { + return Poll::Ready(Some(Err(ErrorInternalServerError( + "inotify stream empty", + )))); + } + } + } + } + + let max_bytes = cmp::min( + 65_536, + cmp::min( + size.saturating_sub(counter), + this.available_file_size.saturating_sub(offset), + ), + ) as usize; + + let fut = (this.callback)(inner_file, offset, max_bytes); + + this.state + .project_replace(LiveFileReaderState::Future { fut }); + + self.poll_next(cx) + } + } + LiveFileReaderStateProj::Future { fut } => { + let (file, bytes) = ready!(fut.poll(cx))?; + + this.state + .project_replace(LiveFileReaderState::File { file: Some(file) }); + + *this.offset += bytes.len() as u64; + *this.counter += bytes.len() as u64; + + Poll::Ready(Some(Ok(bytes))) + } + } + } +} diff --git a/src/file.rs b/src/file.rs index 0ba6bf8..e69de29 100644 --- a/src/file.rs +++ b/src/file.rs @@ -1,224 +0,0 @@ -use std::{ - cmp, - fs::File, - future::Future, - io::{self, Write}, - path::PathBuf, - pin::Pin, - task::{Context, Poll, Waker}, -}; - -use actix::Addr; -use actix_web::error::{Error, ErrorInternalServerError}; -use bytes::Bytes; -use futures_core::{ready, Stream}; -use log::trace; -use pin_project_lite::pin_project; - -use crate::upload::WakerMessage; - -pub trait LiveWriter: Write { - fn add_waker(&mut self, waker: Waker); -} - -/// A simple wrapper for a file that can be read while we're still appending data -pub struct LiveFileWriter { - file: File, - /// Wake handles for contexts that are waiting for us to write more - wakers: Vec, -} - -impl LiveFileWriter { - pub fn new(path: &PathBuf) -> std::io::Result { - Ok(Self { - file: File::options().write(true).create_new(true).open(path)?, - wakers: Vec::new(), - }) - } -} - -impl LiveWriter for LiveFileWriter { - fn add_waker(&mut self, waker: Waker) { - self.wakers.push(waker); - } -} - -impl Write for LiveFileWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let result = self.file.write(buf); - if let Ok(n) = result { - if n > 0 { - for waker in self.wakers.drain(..) { - waker.wake(); - } - } - } - result - } - - fn flush(&mut self) -> std::io::Result<()> { - self.file.flush() - } -} - -// This implementation of a file responder is copied pretty directly -// from actix-files with some tweaks - -pin_project! { - pub struct LiveFileReader { - size: u64, - offset: u64, - #[pin] - state: LiveFileReaderState, - counter: u64, - available_file_size: u64, - callback: F, - uploader: Option>, - } -} - -pin_project! { - #[project = LiveFileReaderStateProj] - #[project_replace = LiveFileReaderStateProjReplace] - enum LiveFileReaderState { - File { file: Option, }, - Future { #[pin] fut: Fut }, - } -} - -pub(crate) fn new_live_reader( - size: u64, - offset: u64, - file: File, - uploader: Option>, -) -> impl Stream> { - LiveFileReader { - size, - offset, - state: LiveFileReaderState::File { file: Some(file) }, - counter: 0, - available_file_size: 0, - callback: live_file_reader_callback, - uploader, - } -} - -async fn live_file_reader_callback( - mut file: File, - offset: u64, - max_bytes: usize, -) -> Result<(File, Bytes), Error> { - use io::{Read as _, Seek as _}; - - let res = actix_web::web::block(move || { - trace!( - "reading up to {} bytes of file starting at {}", - max_bytes, - offset - ); - - let mut buf = Vec::with_capacity(max_bytes); - - file.seek(io::SeekFrom::Start(offset))?; - - let n_bytes = std::io::Read::by_ref(&mut file) - .take(max_bytes as u64) - .read_to_end(&mut buf)?; - trace!("got {} bytes from file", n_bytes); - if n_bytes == 0 { - Err(io::Error::from(io::ErrorKind::UnexpectedEof)) - } else { - Ok((file, Bytes::from(buf))) - } - }) - .await??; - - Ok(res) -} - -impl Stream for LiveFileReader -where - F: Fn(File, u64, usize) -> Fut, - Fut: Future>, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.as_mut().project(); - match this.state.as_mut().project() { - LiveFileReaderStateProj::File { file } => { - let size = *this.size; - let offset = *this.offset; - let counter = *this.counter; - - if size == counter { - Poll::Ready(None) - } else { - let inner_file = file.take().expect("LiveFileReader polled after completion"); - - if offset >= *this.available_file_size { - trace!( - "offset {} has reached available file size {}, updating metadata", - offset, - this.available_file_size - ); - // If we've hit the end of what was available - // last time we checked, check again - *this.available_file_size = match inner_file.metadata() { - Ok(md) => md.len(), - Err(e) => { - return Poll::Ready(Some(Err(e.into()))); - } - }; - trace!("new available file size: {}", this.available_file_size); - - // If we're still at the end, wait for a wakeup from the uploader - if offset >= *this.available_file_size { - trace!("requesting wakeup from uploader"); - file.get_or_insert(inner_file); - if let Some(addr) = this.uploader { - if let Ok(()) = addr.try_send(WakerMessage(cx.waker().clone())) { - return Poll::Pending; - } else { - return Poll::Ready(Some(Err(ErrorInternalServerError( - "Failed to contact file upload actor", - )))); - } - } else { - return Poll::Ready(Some(Err(ErrorInternalServerError( - "File upload was not completed", - )))); - } - } - } - - let max_bytes = cmp::min( - 65_536, - cmp::min( - size.saturating_sub(counter), - this.available_file_size.saturating_sub(offset), - ), - ) as usize; - - let fut = (this.callback)(inner_file, offset, max_bytes); - - this.state - .project_replace(LiveFileReaderState::Future { fut }); - - self.poll_next(cx) - } - } - LiveFileReaderStateProj::Future { fut } => { - let (file, bytes) = ready!(fut.poll(cx))?; - - this.state - .project_replace(LiveFileReaderState::File { file: Some(file) }); - - *this.offset += bytes.len() as u64; - *this.counter += bytes.len() as u64; - - Poll::Ready(Some(Ok(bytes))) - } - } - } -} diff --git a/src/main.rs b/src/main.rs index 87dcdca..ca48349 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,4 @@ mod download; -mod file; mod state; mod upload; mod util; @@ -7,7 +6,6 @@ mod zip; use std::{fs::File, path::PathBuf}; -use actix::Addr; use actix_web::{ get, middleware::Logger, web, App, HttpRequest, HttpResponse, HttpServer, Responder, }; @@ -41,8 +39,6 @@ pub struct DownloadableFile { size: u64, #[serde(with = "state::timestamp")] modtime: OffsetDateTime, - #[serde(skip)] - uploader: Option>, } type AppData = web::Data>; @@ -64,8 +60,11 @@ async fn handle_download( let data = data.read().await; let info = data.lookup_file(&file_code); if let Some(info) = info { + let storage_path = storage_dir().join(file_code); + let file = File::open(&storage_path)?; Ok(download::DownloadingFile { - file: File::open(storage_dir().join(file_code))?, + file, + storage_path, info, } .into_response(&req)) diff --git a/src/state.rs b/src/state.rs index a71a050..4bcecb3 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, io::ErrorKind}; -use log::{error, info, warn, debug}; +use log::{debug, error, info, warn}; use tokio::{ fs::File, io::{AsyncReadExt, AsyncWriteExt}, @@ -52,7 +52,7 @@ pub(crate) mod timestamp { } async fn is_valid_entry(key: &str, info: &DownloadableFile) -> bool { - if !crate::util::is_ascii_alphanumeric(&key) { + if !crate::util::is_ascii_alphanumeric(key) { error!("Invalid key in persistent storage: {}", key); return false; } @@ -142,10 +142,4 @@ impl PersistentState { self.0.remove(key); self.save().await } - - pub(crate) fn remove_uploader(&mut self, key: &str) { - if let Some(f) = self.0.get_mut(key) { - f.uploader.take(); - } - } } diff --git a/src/upload.rs b/src/upload.rs index d8f3003..ad7bb1f 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -1,6 +1,6 @@ -use std::{collections::HashSet, io::Write, task::Waker}; +use std::{collections::HashSet, fs::File, io::Write}; -use actix::{fut::future::ActorFutureExt, Actor, ActorContext, AsyncContext, Handler, Message, StreamHandler}; +use actix::{fut::future::ActorFutureExt, Actor, ActorContext, AsyncContext, StreamHandler}; use actix_http::ws::{CloseReason, Item}; use actix_web_actors::ws::{self, CloseCode}; use bytes::Bytes; @@ -9,7 +9,7 @@ use rand::distributions::{Alphanumeric, DistString}; use serde::Deserialize; use time::OffsetDateTime; -use crate::{file::LiveWriter, DownloadableFile, UploadedFile, storage_dir}; +use crate::{storage_dir, DownloadableFile, UploadedFile}; const MAX_FILES: usize = 256; const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] = @@ -54,7 +54,7 @@ impl Error { } pub struct Uploader { - writer: Option>, + writer: Option>, storage_filename: String, app_data: super::AppData, bytes_remaining: u64, @@ -75,21 +75,6 @@ impl Actor for Uploader { type Context = ws::WebsocketContext; } -#[derive(Message)] -#[rtype(result = "()")] -pub(crate) struct WakerMessage(pub Waker); - -impl Handler for Uploader { - type Result = (); - fn handle(&mut self, msg: WakerMessage, _: &mut Self::Context) { - if let Some(w) = self.writer.as_mut() { - w.add_waker(msg.0); - } else { - error!("Got a wakeup request before creating a file"); - } - } -} - #[derive(Debug, Deserialize)] struct RawUploadedFile { name: String, @@ -108,6 +93,15 @@ impl RawUploadedFile { } } +fn stop_and_flush(_: T, u: &mut Uploader, ctx: &mut ::Context) { + ctx.stop(); + if let Some(w) = u.writer.as_mut() { + if let Err(e) = w.flush() { + error!("Failed to flush writer: {}", e); + } + } +} + impl StreamHandler> for Uploader { fn handle(&mut self, msg: Result, ctx: &mut Self::Context) { let msg = match msg { @@ -130,17 +124,11 @@ impl StreamHandler> for Uploader { } Ok(true) => { info!("Finished uploading data"); - self.writer.as_mut().map(|w| w.flush()); ctx.close(Some(ws::CloseReason { code: CloseCode::Normal, description: None, })); - let data = self.app_data.clone(); - let filename = self.storage_filename.clone(); - ctx.wait(actix::fut::wrap_future(async move { - debug!("Spawned future to remove uploader from entry {} before stopping", filename); - data.write().await.remove_uploader(&filename); - }).map(|_, _, ctx: &mut Self::Context| ctx.stop())); + stop_and_flush((), self, ctx); } _ => (), } @@ -191,9 +179,11 @@ impl Uploader { self.storage_filename = storage_filename.clone(); let storage_path = storage_dir().join(storage_filename.clone()); info!("storing to: {:?}", storage_path); - let writer = super::file::LiveFileWriter::new(&storage_path)?; - let addr = Some(ctx.address()); - let (writer, downloadable_file): (Box, _) = if files.len() > 1 { + let writer = File::options() + .write(true) + .create_new(true) + .open(&storage_path)?; + let (writer, downloadable_file): (Box, _) = if files.len() > 1 { info!("Wrapping in zipfile generator"); let now = OffsetDateTime::now_utc(); let zip_writer = super::zip::ZipGenerator::new(files, writer); @@ -206,7 +196,6 @@ impl Uploader { name: download_filename, size, modtime: now, - uploader: addr, }, ) } else { @@ -216,7 +205,6 @@ impl Uploader { name: files[0].name.clone(), size: files[0].size, modtime: files[0].modtime, - uploader: addr, }, ) }; @@ -274,13 +262,19 @@ impl Uploader { } fn cleanup_after_error(&mut self, ctx: &mut ::Context) { - info!("Cleaning up after failed upload of {}", self.storage_filename); + info!( + "Cleaning up after failed upload of {}", + self.storage_filename + ); let data = self.app_data.clone(); let filename = self.storage_filename.clone(); - ctx.wait(actix::fut::wrap_future(async move { - debug!("Spawned future to remove entry {} from state", filename); - data.write().await.remove_file(&filename).await.unwrap(); - }).map(|_, _, ctx: &mut ::Context| ctx.stop())); + ctx.wait( + actix::fut::wrap_future(async move { + debug!("Spawned future to remove entry {} from state", filename); + data.write().await.remove_file(&filename).await.unwrap(); + }) + .map(stop_and_flush), + ); if let Err(e) = std::fs::remove_file(storage_dir().join(&self.storage_filename)) { error!("Failed to remove file {}: {}", self.storage_filename, e); } diff --git a/src/zip.rs b/src/zip.rs index 2ff9ecf..de8b1a9 100644 --- a/src/zip.rs +++ b/src/zip.rs @@ -1,11 +1,9 @@ use std::io::Write; -use std::task::Waker; use crc32fast::Hasher; use log::debug; use time::OffsetDateTime; -use crate::file::LiveWriter; use crate::UploadedFile; const SIGNATURE_SIZE: u64 = 4; @@ -280,12 +278,6 @@ impl ZipGenerator { } } -impl LiveWriter for ZipGenerator { - fn add_waker(&mut self, waker: Waker) { - self.output.add_waker(waker); - } -} - impl Write for ZipGenerator { fn write(&mut self, mut buf: &[u8]) -> std::io::Result { while !self.pending_metadata.is_empty() {