fix weird end-of-file bug by having downloaders use inotify to directly track changes

This commit is contained in:
xenofem 2022-04-29 22:36:44 -04:00
parent ba4c7bfcbe
commit cc0aaaab94
8 changed files with 246 additions and 283 deletions

23
Cargo.lock generated
View file

@ -747,6 +747,28 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "inotify"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abf888f9575c290197b2c948dc9e9ff10bd1a39ad1ea8585f734585fa6b9d3f9"
dependencies = [
"bitflags",
"futures-core",
"inotify-sys",
"libc",
"tokio",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "itoa"
version = "1.0.1"
@ -1383,6 +1405,7 @@ dependencies = [
"crc32fast",
"env_logger",
"futures-core",
"inotify",
"log",
"mime",
"pin-project-lite",

View file

@ -17,6 +17,7 @@ bytes = "1.1.0"
crc32fast = "1.3.2"
env_logger = "0.9"
futures-core = "0.3"
inotify = "0.10"
log = "0.4"
mime = "0.3.16"
pin-project-lite = "0.2.9"

View file

@ -1,4 +1,20 @@
use std::{fs::File, os::unix::fs::MetadataExt, time::SystemTime};
use std::{
cmp,
fs::File,
future::Future,
io,
path::PathBuf,
pin::Pin,
task::{Context, Poll},
};
use actix_web::error::{Error, ErrorInternalServerError};
use bytes::Bytes;
use futures_core::{ready, Stream};
use inotify::{Inotify, WatchMask};
use log::trace;
use pin_project_lite::pin_project;
use std::{os::unix::fs::MetadataExt, time::SystemTime};
use actix_web::{
body::{self, BoxBody, SizedStream},
@ -20,6 +36,7 @@ use crate::DownloadableFile;
pub(crate) struct DownloadingFile {
pub(crate) file: File,
pub(crate) storage_path: PathBuf,
pub(crate) info: DownloadableFile,
}
@ -101,7 +118,7 @@ impl DownloadingFile {
.map_into_boxed_body();
}
let reader = crate::file::new_live_reader(length, offset, self.file, self.info.uploader);
let reader = new_live_reader(length, offset, self.file, self.storage_path);
if offset != 0 || length != self.info.size {
res.status(StatusCode::PARTIAL_CONTENT);
@ -150,3 +167,170 @@ impl Responder for DownloadingFile {
self.into_response(req)
}
}
pin_project! {
pub struct LiveFileReader<F, Fut> {
size: u64,
offset: u64,
#[pin]
state: LiveFileReaderState<Fut>,
counter: u64,
available_file_size: u64,
callback: F,
#[pin]
events: inotify::EventStream<[u8; 1024]>,
}
}
pin_project! {
#[project = LiveFileReaderStateProj]
#[project_replace = LiveFileReaderStateProjReplace]
enum LiveFileReaderState<Fut> {
File { file: Option<File>, },
Future { #[pin] fut: Fut },
}
}
pub(crate) fn new_live_reader(
size: u64,
offset: u64,
file: File,
storage_path: PathBuf,
) -> impl Stream<Item = Result<Bytes, Error>> {
let mut inotify = Inotify::init().expect("failed to init inotify");
inotify
.add_watch(storage_path, WatchMask::MODIFY | WatchMask::CLOSE_WRITE)
.expect("Failed to add inotify watch");
let events = inotify
.event_stream([0; 1024])
.expect("failed to set up event stream");
LiveFileReader {
size,
offset,
state: LiveFileReaderState::File { file: Some(file) },
counter: 0,
available_file_size: 0,
callback: live_file_reader_callback,
events,
}
}
async fn live_file_reader_callback(
mut file: File,
offset: u64,
max_bytes: usize,
) -> Result<(File, Bytes), Error> {
use io::{Read as _, Seek as _};
let res = actix_web::web::block(move || {
trace!(
"reading up to {} bytes of file starting at {}",
max_bytes,
offset
);
let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?;
let n_bytes = std::io::Read::by_ref(&mut file)
.take(max_bytes as u64)
.read_to_end(&mut buf)?;
trace!("got {} bytes from file", n_bytes);
if n_bytes == 0 {
Err(io::Error::from(io::ErrorKind::UnexpectedEof))
} else {
Ok((file, Bytes::from(buf)))
}
})
.await??;
Ok(res)
}
impl<F, Fut> Stream for LiveFileReader<F, Fut>
where
F: Fn(File, u64, usize) -> Fut,
Fut: Future<Output = Result<(File, Bytes), Error>>,
{
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
LiveFileReaderStateProj::File { file } => {
let size = *this.size;
let offset = *this.offset;
let counter = *this.counter;
if size == counter {
Poll::Ready(None)
} else {
let inner_file = file.take().expect("LiveFileReader polled after completion");
if offset >= *this.available_file_size {
trace!(
"offset {} has reached available file size {}, updating metadata",
offset,
this.available_file_size
);
// If we've hit the end of what was available
// last time we checked, check again
*this.available_file_size = match inner_file.metadata() {
Ok(md) => md.len(),
Err(e) => {
return Poll::Ready(Some(Err(e.into())));
}
};
trace!("new available file size: {}", this.available_file_size);
// If we're still at the end, inotify time
if offset >= *this.available_file_size {
trace!("waiting for inotify events");
file.get_or_insert(inner_file);
match this.events.poll_next(cx) {
Poll::Pending => {
return Poll::Pending;
}
Poll::Ready(Some(_)) => {
return self.poll_next(cx);
}
_ => {
return Poll::Ready(Some(Err(ErrorInternalServerError(
"inotify stream empty",
))));
}
}
}
}
let max_bytes = cmp::min(
65_536,
cmp::min(
size.saturating_sub(counter),
this.available_file_size.saturating_sub(offset),
),
) as usize;
let fut = (this.callback)(inner_file, offset, max_bytes);
this.state
.project_replace(LiveFileReaderState::Future { fut });
self.poll_next(cx)
}
}
LiveFileReaderStateProj::Future { fut } => {
let (file, bytes) = ready!(fut.poll(cx))?;
this.state
.project_replace(LiveFileReaderState::File { file: Some(file) });
*this.offset += bytes.len() as u64;
*this.counter += bytes.len() as u64;
Poll::Ready(Some(Ok(bytes)))
}
}
}
}

View file

@ -1,224 +0,0 @@
use std::{
cmp,
fs::File,
future::Future,
io::{self, Write},
path::PathBuf,
pin::Pin,
task::{Context, Poll, Waker},
};
use actix::Addr;
use actix_web::error::{Error, ErrorInternalServerError};
use bytes::Bytes;
use futures_core::{ready, Stream};
use log::trace;
use pin_project_lite::pin_project;
use crate::upload::WakerMessage;
pub trait LiveWriter: Write {
fn add_waker(&mut self, waker: Waker);
}
/// A simple wrapper for a file that can be read while we're still appending data
pub struct LiveFileWriter {
file: File,
/// Wake handles for contexts that are waiting for us to write more
wakers: Vec<Waker>,
}
impl LiveFileWriter {
pub fn new(path: &PathBuf) -> std::io::Result<Self> {
Ok(Self {
file: File::options().write(true).create_new(true).open(path)?,
wakers: Vec::new(),
})
}
}
impl LiveWriter for LiveFileWriter {
fn add_waker(&mut self, waker: Waker) {
self.wakers.push(waker);
}
}
impl Write for LiveFileWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let result = self.file.write(buf);
if let Ok(n) = result {
if n > 0 {
for waker in self.wakers.drain(..) {
waker.wake();
}
}
}
result
}
fn flush(&mut self) -> std::io::Result<()> {
self.file.flush()
}
}
// This implementation of a file responder is copied pretty directly
// from actix-files with some tweaks
pin_project! {
pub struct LiveFileReader<F, Fut> {
size: u64,
offset: u64,
#[pin]
state: LiveFileReaderState<Fut>,
counter: u64,
available_file_size: u64,
callback: F,
uploader: Option<Addr<crate::upload::Uploader>>,
}
}
pin_project! {
#[project = LiveFileReaderStateProj]
#[project_replace = LiveFileReaderStateProjReplace]
enum LiveFileReaderState<Fut> {
File { file: Option<File>, },
Future { #[pin] fut: Fut },
}
}
pub(crate) fn new_live_reader(
size: u64,
offset: u64,
file: File,
uploader: Option<Addr<crate::upload::Uploader>>,
) -> impl Stream<Item = Result<Bytes, Error>> {
LiveFileReader {
size,
offset,
state: LiveFileReaderState::File { file: Some(file) },
counter: 0,
available_file_size: 0,
callback: live_file_reader_callback,
uploader,
}
}
async fn live_file_reader_callback(
mut file: File,
offset: u64,
max_bytes: usize,
) -> Result<(File, Bytes), Error> {
use io::{Read as _, Seek as _};
let res = actix_web::web::block(move || {
trace!(
"reading up to {} bytes of file starting at {}",
max_bytes,
offset
);
let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?;
let n_bytes = std::io::Read::by_ref(&mut file)
.take(max_bytes as u64)
.read_to_end(&mut buf)?;
trace!("got {} bytes from file", n_bytes);
if n_bytes == 0 {
Err(io::Error::from(io::ErrorKind::UnexpectedEof))
} else {
Ok((file, Bytes::from(buf)))
}
})
.await??;
Ok(res)
}
impl<F, Fut> Stream for LiveFileReader<F, Fut>
where
F: Fn(File, u64, usize) -> Fut,
Fut: Future<Output = Result<(File, Bytes), Error>>,
{
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
LiveFileReaderStateProj::File { file } => {
let size = *this.size;
let offset = *this.offset;
let counter = *this.counter;
if size == counter {
Poll::Ready(None)
} else {
let inner_file = file.take().expect("LiveFileReader polled after completion");
if offset >= *this.available_file_size {
trace!(
"offset {} has reached available file size {}, updating metadata",
offset,
this.available_file_size
);
// If we've hit the end of what was available
// last time we checked, check again
*this.available_file_size = match inner_file.metadata() {
Ok(md) => md.len(),
Err(e) => {
return Poll::Ready(Some(Err(e.into())));
}
};
trace!("new available file size: {}", this.available_file_size);
// If we're still at the end, wait for a wakeup from the uploader
if offset >= *this.available_file_size {
trace!("requesting wakeup from uploader");
file.get_or_insert(inner_file);
if let Some(addr) = this.uploader {
if let Ok(()) = addr.try_send(WakerMessage(cx.waker().clone())) {
return Poll::Pending;
} else {
return Poll::Ready(Some(Err(ErrorInternalServerError(
"Failed to contact file upload actor",
))));
}
} else {
return Poll::Ready(Some(Err(ErrorInternalServerError(
"File upload was not completed",
))));
}
}
}
let max_bytes = cmp::min(
65_536,
cmp::min(
size.saturating_sub(counter),
this.available_file_size.saturating_sub(offset),
),
) as usize;
let fut = (this.callback)(inner_file, offset, max_bytes);
this.state
.project_replace(LiveFileReaderState::Future { fut });
self.poll_next(cx)
}
}
LiveFileReaderStateProj::Future { fut } => {
let (file, bytes) = ready!(fut.poll(cx))?;
this.state
.project_replace(LiveFileReaderState::File { file: Some(file) });
*this.offset += bytes.len() as u64;
*this.counter += bytes.len() as u64;
Poll::Ready(Some(Ok(bytes)))
}
}
}
}

View file

@ -1,5 +1,4 @@
mod download;
mod file;
mod state;
mod upload;
mod util;
@ -7,7 +6,6 @@ mod zip;
use std::{fs::File, path::PathBuf};
use actix::Addr;
use actix_web::{
get, middleware::Logger, web, App, HttpRequest, HttpResponse, HttpServer, Responder,
};
@ -41,8 +39,6 @@ pub struct DownloadableFile {
size: u64,
#[serde(with = "state::timestamp")]
modtime: OffsetDateTime,
#[serde(skip)]
uploader: Option<Addr<upload::Uploader>>,
}
type AppData = web::Data<RwLock<PersistentState>>;
@ -64,8 +60,11 @@ async fn handle_download(
let data = data.read().await;
let info = data.lookup_file(&file_code);
if let Some(info) = info {
let storage_path = storage_dir().join(file_code);
let file = File::open(&storage_path)?;
Ok(download::DownloadingFile {
file: File::open(storage_dir().join(file_code))?,
file,
storage_path,
info,
}
.into_response(&req))

View file

@ -1,6 +1,6 @@
use std::{collections::HashMap, io::ErrorKind};
use log::{error, info, warn, debug};
use log::{debug, error, info, warn};
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWriteExt},
@ -52,7 +52,7 @@ pub(crate) mod timestamp {
}
async fn is_valid_entry(key: &str, info: &DownloadableFile) -> bool {
if !crate::util::is_ascii_alphanumeric(&key) {
if !crate::util::is_ascii_alphanumeric(key) {
error!("Invalid key in persistent storage: {}", key);
return false;
}
@ -142,10 +142,4 @@ impl PersistentState {
self.0.remove(key);
self.save().await
}
pub(crate) fn remove_uploader(&mut self, key: &str) {
if let Some(f) = self.0.get_mut(key) {
f.uploader.take();
}
}
}

View file

@ -1,6 +1,6 @@
use std::{collections::HashSet, io::Write, task::Waker};
use std::{collections::HashSet, fs::File, io::Write};
use actix::{fut::future::ActorFutureExt, Actor, ActorContext, AsyncContext, Handler, Message, StreamHandler};
use actix::{fut::future::ActorFutureExt, Actor, ActorContext, AsyncContext, StreamHandler};
use actix_http::ws::{CloseReason, Item};
use actix_web_actors::ws::{self, CloseCode};
use bytes::Bytes;
@ -9,7 +9,7 @@ use rand::distributions::{Alphanumeric, DistString};
use serde::Deserialize;
use time::OffsetDateTime;
use crate::{file::LiveWriter, DownloadableFile, UploadedFile, storage_dir};
use crate::{storage_dir, DownloadableFile, UploadedFile};
const MAX_FILES: usize = 256;
const FILENAME_DATE_FORMAT: &[time::format_description::FormatItem] =
@ -54,7 +54,7 @@ impl Error {
}
pub struct Uploader {
writer: Option<Box<dyn LiveWriter>>,
writer: Option<Box<dyn Write>>,
storage_filename: String,
app_data: super::AppData,
bytes_remaining: u64,
@ -75,21 +75,6 @@ impl Actor for Uploader {
type Context = ws::WebsocketContext<Self>;
}
#[derive(Message)]
#[rtype(result = "()")]
pub(crate) struct WakerMessage(pub Waker);
impl Handler<WakerMessage> for Uploader {
type Result = ();
fn handle(&mut self, msg: WakerMessage, _: &mut Self::Context) {
if let Some(w) = self.writer.as_mut() {
w.add_waker(msg.0);
} else {
error!("Got a wakeup request before creating a file");
}
}
}
#[derive(Debug, Deserialize)]
struct RawUploadedFile {
name: String,
@ -108,6 +93,15 @@ impl RawUploadedFile {
}
}
fn stop_and_flush<T>(_: T, u: &mut Uploader, ctx: &mut <Uploader as Actor>::Context) {
ctx.stop();
if let Some(w) = u.writer.as_mut() {
if let Err(e) = w.flush() {
error!("Failed to flush writer: {}", e);
}
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
let msg = match msg {
@ -130,17 +124,11 @@ impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Uploader {
}
Ok(true) => {
info!("Finished uploading data");
self.writer.as_mut().map(|w| w.flush());
ctx.close(Some(ws::CloseReason {
code: CloseCode::Normal,
description: None,
}));
let data = self.app_data.clone();
let filename = self.storage_filename.clone();
ctx.wait(actix::fut::wrap_future(async move {
debug!("Spawned future to remove uploader from entry {} before stopping", filename);
data.write().await.remove_uploader(&filename);
}).map(|_, _, ctx: &mut Self::Context| ctx.stop()));
stop_and_flush((), self, ctx);
}
_ => (),
}
@ -191,9 +179,11 @@ impl Uploader {
self.storage_filename = storage_filename.clone();
let storage_path = storage_dir().join(storage_filename.clone());
info!("storing to: {:?}", storage_path);
let writer = super::file::LiveFileWriter::new(&storage_path)?;
let addr = Some(ctx.address());
let (writer, downloadable_file): (Box<dyn LiveWriter>, _) = if files.len() > 1 {
let writer = File::options()
.write(true)
.create_new(true)
.open(&storage_path)?;
let (writer, downloadable_file): (Box<dyn Write>, _) = if files.len() > 1 {
info!("Wrapping in zipfile generator");
let now = OffsetDateTime::now_utc();
let zip_writer = super::zip::ZipGenerator::new(files, writer);
@ -206,7 +196,6 @@ impl Uploader {
name: download_filename,
size,
modtime: now,
uploader: addr,
},
)
} else {
@ -216,7 +205,6 @@ impl Uploader {
name: files[0].name.clone(),
size: files[0].size,
modtime: files[0].modtime,
uploader: addr,
},
)
};
@ -274,13 +262,19 @@ impl Uploader {
}
fn cleanup_after_error(&mut self, ctx: &mut <Self as Actor>::Context) {
info!("Cleaning up after failed upload of {}", self.storage_filename);
info!(
"Cleaning up after failed upload of {}",
self.storage_filename
);
let data = self.app_data.clone();
let filename = self.storage_filename.clone();
ctx.wait(actix::fut::wrap_future(async move {
debug!("Spawned future to remove entry {} from state", filename);
data.write().await.remove_file(&filename).await.unwrap();
}).map(|_, _, ctx: &mut <Self as Actor>::Context| ctx.stop()));
ctx.wait(
actix::fut::wrap_future(async move {
debug!("Spawned future to remove entry {} from state", filename);
data.write().await.remove_file(&filename).await.unwrap();
})
.map(stop_and_flush),
);
if let Err(e) = std::fs::remove_file(storage_dir().join(&self.storage_filename)) {
error!("Failed to remove file {}: {}", self.storage_filename, e);
}

View file

@ -1,11 +1,9 @@
use std::io::Write;
use std::task::Waker;
use crc32fast::Hasher;
use log::debug;
use time::OffsetDateTime;
use crate::file::LiveWriter;
use crate::UploadedFile;
const SIGNATURE_SIZE: u64 = 4;
@ -280,12 +278,6 @@ impl<W: Write> ZipGenerator<W> {
}
}
impl<W: LiveWriter> LiveWriter for ZipGenerator<W> {
fn add_waker(&mut self, waker: Waker) {
self.output.add_waker(waker);
}
}
impl<W: Write> Write for ZipGenerator<W> {
fn write(&mut self, mut buf: &[u8]) -> std::io::Result<usize> {
while !self.pending_metadata.is_empty() {