use core::fmt; use std::{ cmp, fs::File, future::Future, io, path::PathBuf, pin::Pin, task::{Context, Poll}, }; use actix_web::error::{Error, ErrorInternalServerError}; use bytes::Bytes; use futures_core::{ready, Stream}; use inotify::{Inotify, WatchMask}; use log::trace; use pin_project_lite::pin_project; use serde::{de, Deserialize, Deserializer}; use std::{os::unix::fs::MetadataExt, time::SystemTime}; use time::OffsetDateTime; use actix_web::{ body::{self, BoxBody, SizedStream}, http::{ header::{ self, ContentDisposition, DispositionParam, DispositionType, EntityTag, HeaderValue, HttpDate, IfMatch, IfModifiedSince, IfNoneMatch, IfUnmodifiedSince, }, StatusCode, }, HttpMessage, HttpRequest, HttpResponse, Responder, }; use actix_files::HttpRange; use crate::{store::StoredFile, upload::UploadedFile}; #[derive(Clone, Copy)] pub enum DownloadSelection { One(usize), All, } impl fmt::Display for DownloadSelection { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { DownloadSelection::All => write!(f, "all"), DownloadSelection::One(n) => n.fmt(f), } } } struct SelectionVisitor; impl<'de> de::Visitor<'de> for SelectionVisitor { type Value = DownloadSelection; fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { write!(formatter, r#"a nonnegative integer or the string "all""#) } fn visit_u64(self, v: u64) -> Result { Ok(DownloadSelection::One(v as usize)) } fn visit_str(self, v: &str) -> Result where E: de::Error, { if v == "all" { Ok(DownloadSelection::All) } else if let Ok(n) = v.parse::() { Ok(DownloadSelection::One(n)) } else { Err(de::Error::invalid_value(de::Unexpected::Str(v), &self)) } } } impl<'de> Deserialize<'de> for DownloadSelection { fn deserialize>(de: D) -> Result { de.deserialize_any(SelectionVisitor) } } // This is copied substantially from actix-files, with some tweaks pub(crate) struct DownloadingFile { pub(crate) file: File, pub(crate) storage_path: PathBuf, pub(crate) info: StoredFile, pub(crate) selection: DownloadSelection, } impl DownloadingFile { fn selected(&self) -> Option<&UploadedFile> { match self.selection { DownloadSelection::All => None, DownloadSelection::One(n) => Some(self.info.contents.as_ref()?.get(n)?), } } fn name(&self) -> &str { match self.selected() { None => &self.info.name, Some(f) => &f.name, } } fn size(&self) -> u64 { match self.selected() { None => self.info.size, Some(f) => f.size, } } fn modtime(&self) -> OffsetDateTime { match self.selected() { None => self.info.modtime, Some(f) => f.modtime, } } fn baseline_offset(&self) -> u64 { if let (DownloadSelection::One(n), Some(files)) = (self.selection, self.info.contents.as_ref()) { crate::zip::file_data_offset(files, n) } else { 0 } } fn etag(&self) -> EntityTag { let ino = self.file.metadata().map(|md| md.ino()).unwrap_or_default(); let modtime = self.modtime(); EntityTag::new_strong(format!( "{:x}:{}:{:x}:{:x}:{:x}", ino, self.selection, self.size(), modtime.unix_timestamp() as u64, modtime.nanosecond(), )) } /// Creates an `HttpResponse` with file as a streaming body. pub fn into_response(self, req: &HttpRequest) -> HttpResponse { let total_size = self.size(); let etag = self.etag(); let last_modified = HttpDate::from(SystemTime::from(self.modtime())); let precondition_failed = precondition_failed(req, &etag, &last_modified); let not_modified = not_modified(req, &etag, &last_modified); let mut res = HttpResponse::build(StatusCode::OK); res.insert_header((header::CONTENT_SECURITY_POLICY, "sandbox")); res.insert_header((header::CONTENT_TYPE, mime::APPLICATION_OCTET_STREAM)); res.insert_header(( header::CONTENT_DISPOSITION, ContentDisposition { disposition: DispositionType::Attachment, parameters: vec![DispositionParam::Filename(self.name().to_string())], }, )); res.insert_header((header::LAST_MODIFIED, last_modified)); res.insert_header((header::ETAG, etag)); res.insert_header((header::ACCEPT_RANGES, "bytes")); let mut length = total_size; let mut offset = 0; // check for range header if let Some(ranges) = req.headers().get(header::RANGE) { if let Ok(ranges_header) = ranges.to_str() { if let Ok(ranges) = HttpRange::parse(ranges_header, length) { length = ranges[0].length; offset = ranges[0].start; // don't allow compression middleware to modify partial content res.insert_header(( header::CONTENT_ENCODING, HeaderValue::from_static("identity"), )); res.insert_header(( header::CONTENT_RANGE, format!("bytes {}-{}/{}", offset, offset + length - 1, total_size,), )); } else { res.insert_header((header::CONTENT_RANGE, format!("bytes */{}", length))); return res.status(StatusCode::RANGE_NOT_SATISFIABLE).finish(); }; } else { return res.status(StatusCode::BAD_REQUEST).finish(); }; }; if precondition_failed { return res.status(StatusCode::PRECONDITION_FAILED).finish(); } else if not_modified { return res .status(StatusCode::NOT_MODIFIED) .body(body::None::new()) .map_into_boxed_body(); } let reader = new_live_reader( length, self.baseline_offset() + offset, self.file, self.storage_path, ); if offset != 0 || length != total_size { res.status(StatusCode::PARTIAL_CONTENT); } res.body(SizedStream::new(length, reader)) } } fn precondition_failed(req: &HttpRequest, etag: &EntityTag, last_modified: &HttpDate) -> bool { if let Some(IfMatch::Items(ref items)) = req.get_header() { if !items.iter().any(|item| item.strong_eq(etag)) { return true; } } if let Some(IfUnmodifiedSince(ref since)) = req.get_header() { if last_modified > since { return true; } } false } fn not_modified(req: &HttpRequest, etag: &EntityTag, last_modified: &HttpDate) -> bool { match req.get_header::() { Some(IfNoneMatch::Any) => { return true; } Some(IfNoneMatch::Items(ref items)) => { return items.iter().any(|item| item.weak_eq(etag)); } None => (), } if let Some(IfModifiedSince(ref since)) = req.get_header() { if last_modified < since { return true; } } false } impl Responder for DownloadingFile { type Body = BoxBody; fn respond_to(self, req: &HttpRequest) -> HttpResponse { self.into_response(req) } } pin_project! { pub struct LiveFileReader { size: u64, offset: u64, #[pin] state: LiveFileReaderState, counter: u64, available_file_size: u64, callback: F, #[pin] events: inotify::EventStream<[u8; 1024]>, } } pin_project! { #[project = LiveFileReaderStateProj] #[project_replace = LiveFileReaderStateProjReplace] enum LiveFileReaderState { File { file: Option, }, Future { #[pin] fut: Fut }, } } pub(crate) fn new_live_reader( size: u64, offset: u64, file: File, storage_path: PathBuf, ) -> impl Stream> { let mut inotify = Inotify::init().expect("failed to init inotify"); inotify .add_watch(storage_path, WatchMask::MODIFY | WatchMask::CLOSE_WRITE) .expect("Failed to add inotify watch"); let events = inotify .event_stream([0; 1024]) .expect("failed to set up event stream"); LiveFileReader { size, offset, state: LiveFileReaderState::File { file: Some(file) }, counter: 0, available_file_size: 0, callback: live_file_reader_callback, events, } } async fn live_file_reader_callback( mut file: File, offset: u64, max_bytes: usize, ) -> Result<(File, Bytes), Error> { use io::{Read as _, Seek as _}; let res = actix_web::web::block(move || { trace!( "reading up to {} bytes of file starting at {}", max_bytes, offset ); let mut buf = Vec::with_capacity(max_bytes); file.seek(io::SeekFrom::Start(offset))?; let n_bytes = std::io::Read::by_ref(&mut file) .take(max_bytes as u64) .read_to_end(&mut buf)?; trace!("got {} bytes from file", n_bytes); if n_bytes == 0 { Err(io::Error::from(io::ErrorKind::UnexpectedEof)) } else { Ok((file, Bytes::from(buf))) } }) .await??; Ok(res) } impl Stream for LiveFileReader where F: Fn(File, u64, usize) -> Fut, Fut: Future>, { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.as_mut().project(); match this.state.as_mut().project() { LiveFileReaderStateProj::File { file } => { let size = *this.size; let offset = *this.offset; let counter = *this.counter; if size == counter { Poll::Ready(None) } else { let inner_file = file.take().expect("LiveFileReader polled after completion"); if offset >= *this.available_file_size { trace!( "offset {} has reached available file size {}, updating metadata", offset, this.available_file_size ); // If we've hit the end of what was available // last time we checked, check again *this.available_file_size = match inner_file.metadata() { Ok(md) => md.len(), Err(e) => { return Poll::Ready(Some(Err(e.into()))); } }; trace!("new available file size: {}", this.available_file_size); // If we're still at the end, inotify time if offset >= *this.available_file_size { trace!("waiting for inotify events"); file.get_or_insert(inner_file); if ready!(this.events.poll_next(cx)).is_none() { return Poll::Ready(Some(Err(ErrorInternalServerError( "inotify stream empty", )))); } return self.poll_next(cx); } } let max_bytes = cmp::min( 65_536, cmp::min( size.saturating_sub(counter), this.available_file_size.saturating_sub(offset), ), ) as usize; let fut = (this.callback)(inner_file, offset, max_bytes); this.state .project_replace(LiveFileReaderState::Future { fut }); self.poll_next(cx) } } LiveFileReaderStateProj::Future { fut } => { let (file, bytes) = ready!(fut.poll(cx))?; this.state .project_replace(LiveFileReaderState::File { file: Some(file) }); *this.offset += bytes.len() as u64; *this.counter += bytes.len() as u64; Poll::Ready(Some(Ok(bytes))) } } } }