use std::{cmp, fs::File, future::Future, io::{self, Write}, path::PathBuf, pin::Pin, task::{Context, Poll, Waker}}; use actix::Addr; use actix_web::error::{Error, ErrorInternalServerError}; use bytes::Bytes; use futures_core::{ready, Stream}; use log::trace; use pin_project_lite::pin_project; use crate::upload::WakerMessage; pub trait LiveWriter: Write { fn add_waker(&mut self, waker: Waker); } /// A simple wrapper for a file that can be read while we're still appending data pub struct LiveFileWriter { file: File, /// Wake handles for contexts that are waiting for us to write more wakers: Vec, } impl LiveFileWriter { pub fn new(path: &PathBuf) -> std::io::Result { Ok(Self { file: File::options().write(true).create_new(true).open(path)?, wakers: Vec::new(), }) } } impl LiveWriter for LiveFileWriter { fn add_waker(&mut self, waker: Waker) { self.wakers.push(waker); } } impl Write for LiveFileWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result { let result = self.file.write(buf); if let Ok(n) = result { if n > 0 { for waker in self.wakers.drain(..) { waker.wake(); } } } result } fn flush(&mut self) -> std::io::Result<()> { self.file.flush() } } // This implementation of a file responder is copied pretty directly // from actix-files with some tweaks pin_project! { pub struct LiveFileReader { size: u64, offset: u64, #[pin] state: LiveFileReaderState, counter: u64, available_file_size: u64, callback: F, uploader: Option>, } } pin_project! { #[project = LiveFileReaderStateProj] #[project_replace = LiveFileReaderStateProjReplace] enum LiveFileReaderState { File { file: Option, }, Future { #[pin] fut: Fut }, } } pub(crate) fn new_live_reader( size: u64, offset: u64, file: File, uploader: Option>, ) -> impl Stream> { LiveFileReader { size, offset, state: LiveFileReaderState::File { file: Some(file) }, counter: 0, available_file_size: 0, callback: live_file_reader_callback, uploader, } } async fn live_file_reader_callback( mut file: File, offset: u64, max_bytes: usize, ) -> Result<(File, Bytes), Error> { use io::{Read as _, Seek as _}; let res = actix_web::web::block(move || { trace!("reading up to {} bytes of file starting at {}", max_bytes, offset); let mut buf = Vec::with_capacity(max_bytes); file.seek(io::SeekFrom::Start(offset))?; let n_bytes = std::io::Read::by_ref(&mut file).take(max_bytes as u64).read_to_end(&mut buf)?; trace!("got {} bytes from file", n_bytes); if n_bytes == 0 { Err(io::Error::from(io::ErrorKind::UnexpectedEof)) } else { Ok((file, Bytes::from(buf))) } }) .await??; Ok(res) } impl Stream for LiveFileReader where F: Fn(File, u64, usize) -> Fut, Fut: Future>, { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.as_mut().project(); match this.state.as_mut().project() { LiveFileReaderStateProj::File { file } => { let size = *this.size; let offset = *this.offset; let counter = *this.counter; if size == counter { Poll::Ready(None) } else { let inner_file = file .take() .expect("LiveFileReader polled after completion"); if offset >= *this.available_file_size { trace!("offset {} has reached available file size {}, updating metadata", offset, this.available_file_size); // If we've hit the end of what was available // last time we checked, check again *this.available_file_size = match inner_file.metadata() { Ok(md) => md.len(), Err(e) => { return Poll::Ready(Some(Err(e.into()))); } }; trace!("new available file size: {}", this.available_file_size); // If we're still at the end, wait for a wakeup from the uploader if offset >= *this.available_file_size { trace!("requesting wakeup from uploader"); file.get_or_insert(inner_file); if let Some(addr) = this.uploader { if let Ok(()) = addr.try_send(WakerMessage(cx.waker().clone())) { return Poll::Pending; } else { return Poll::Ready(Some(Err(ErrorInternalServerError("Failed to contact file upload actor")))); } } else { return Poll::Ready(Some(Err(ErrorInternalServerError("File upload was not completed")))); } } } let max_bytes = cmp::min(65_536, cmp::min(size.saturating_sub(counter), this.available_file_size.saturating_sub(offset))) as usize; let fut = (this.callback)(inner_file, offset, max_bytes); this.state .project_replace(LiveFileReaderState::Future { fut }); self.poll_next(cx) } } LiveFileReaderStateProj::Future { fut } => { let (file, bytes) = ready!(fut.poll(cx))?; this.state .project_replace(LiveFileReaderState::File { file: Some(file) }); *this.offset += bytes.len() as u64; *this.counter += bytes.len() as u64; Poll::Ready(Some(Ok(bytes))) } } } }