transbeam/src/download.rs

423 lines
13 KiB
Rust

use core::fmt;
use std::{
cmp,
fs::File,
future::Future,
io,
path::PathBuf,
pin::Pin,
task::{Context, Poll},
};
use actix_web::error::{Error, ErrorInternalServerError};
use bytes::Bytes;
use futures_core::{ready, Stream};
use inotify::{Inotify, WatchMask};
use log::trace;
use pin_project_lite::pin_project;
use serde::{de, Deserialize, Deserializer};
use std::{os::unix::fs::MetadataExt, time::SystemTime};
use time::OffsetDateTime;
use actix_web::{
body::{self, BoxBody, SizedStream},
http::{
header::{
self, ContentDisposition, DispositionParam, DispositionType, EntityTag, HeaderValue,
HttpDate, IfMatch, IfModifiedSince, IfNoneMatch, IfUnmodifiedSince,
},
StatusCode,
},
HttpMessage, HttpRequest, HttpResponse, Responder,
};
use actix_files::HttpRange;
use crate::{store::StoredFile, upload::UploadedFile};
#[derive(Clone, Copy)]
pub enum DownloadSelection {
One(usize),
All,
}
impl fmt::Display for DownloadSelection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DownloadSelection::All => write!(f, "all"),
DownloadSelection::One(n) => n.fmt(f),
}
}
}
struct SelectionVisitor;
impl<'de> de::Visitor<'de> for SelectionVisitor {
type Value = DownloadSelection;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, r#"a nonnegative integer or the string "all""#)
}
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> {
Ok(DownloadSelection::One(v as usize))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
if v == "all" {
Ok(DownloadSelection::All)
} else if let Ok(n) = v.parse::<usize>() {
Ok(DownloadSelection::One(n))
} else {
Err(de::Error::invalid_value(de::Unexpected::Str(v), &self))
}
}
}
impl<'de> Deserialize<'de> for DownloadSelection {
fn deserialize<D: Deserializer<'de>>(de: D) -> Result<DownloadSelection, D::Error> {
de.deserialize_any(SelectionVisitor)
}
}
// This is copied substantially from actix-files, with some tweaks
pub(crate) struct DownloadingFile {
pub(crate) file: File,
pub(crate) storage_path: PathBuf,
pub(crate) info: StoredFile,
pub(crate) selection: DownloadSelection,
}
impl DownloadingFile {
fn selected(&self) -> Option<&UploadedFile> {
match self.selection {
DownloadSelection::All => None,
DownloadSelection::One(n) => Some(self.info.contents.as_ref()?.files.get(n)?),
}
}
fn name(&self) -> &str {
match self.selected() {
None => &self.info.name,
Some(f) => &f.name,
}
}
fn size(&self) -> u64 {
match self.selected() {
None => self.info.size,
Some(f) => f.size,
}
}
fn modtime(&self) -> OffsetDateTime {
match self.selected() {
None => self.info.modtime,
Some(f) => f.modtime,
}
}
fn baseline_offset(&self) -> u64 {
if let (DownloadSelection::One(n), Some(files)) =
(self.selection, self.info.contents.as_ref())
{
crate::zip::file_data_offset(files, n)
} else {
0
}
}
fn etag(&self) -> EntityTag {
let ino = self.file.metadata().map(|md| md.ino()).unwrap_or_default();
let modtime = self.modtime();
EntityTag::new_strong(format!(
"{:x}:{}:{:x}:{:x}:{:x}",
ino,
self.selection,
self.size(),
modtime.unix_timestamp() as u64,
modtime.nanosecond(),
))
}
/// Creates an `HttpResponse` with file as a streaming body.
pub fn into_response(self, req: &HttpRequest) -> HttpResponse<BoxBody> {
let total_size = self.size();
let etag = self.etag();
let last_modified = HttpDate::from(SystemTime::from(self.modtime()));
let precondition_failed = precondition_failed(req, &etag, &last_modified);
let not_modified = not_modified(req, &etag, &last_modified);
let mut res = HttpResponse::build(StatusCode::OK);
res.insert_header((header::CONTENT_SECURITY_POLICY, "sandbox"));
res.insert_header((header::CONTENT_TYPE, mime::APPLICATION_OCTET_STREAM));
res.insert_header((
header::CONTENT_DISPOSITION,
ContentDisposition {
disposition: DispositionType::Attachment,
parameters: vec![DispositionParam::Filename(self.name().to_string())],
},
));
res.insert_header((header::LAST_MODIFIED, last_modified));
res.insert_header((header::ETAG, etag));
res.insert_header((header::ACCEPT_RANGES, "bytes"));
let mut length = total_size;
let mut offset = 0;
// check for range header
if let Some(ranges) = req.headers().get(header::RANGE) {
if let Ok(ranges_header) = ranges.to_str() {
if let Ok(ranges) = HttpRange::parse(ranges_header, length) {
length = ranges[0].length;
offset = ranges[0].start;
// don't allow compression middleware to modify partial content
res.insert_header((
header::CONTENT_ENCODING,
HeaderValue::from_static("identity"),
));
res.insert_header((
header::CONTENT_RANGE,
format!("bytes {}-{}/{}", offset, offset + length - 1, total_size,),
));
} else {
res.insert_header((header::CONTENT_RANGE, format!("bytes */{}", length)));
return res.status(StatusCode::RANGE_NOT_SATISFIABLE).finish();
};
} else {
return res.status(StatusCode::BAD_REQUEST).finish();
};
};
if precondition_failed {
return res.status(StatusCode::PRECONDITION_FAILED).finish();
} else if not_modified {
return res
.status(StatusCode::NOT_MODIFIED)
.body(body::None::new())
.map_into_boxed_body();
}
let reader = new_live_reader(
length,
self.baseline_offset() + offset,
self.file,
self.storage_path,
);
if offset != 0 || length != total_size {
res.status(StatusCode::PARTIAL_CONTENT);
}
res.body(SizedStream::new(length, reader))
}
}
fn precondition_failed(req: &HttpRequest, etag: &EntityTag, last_modified: &HttpDate) -> bool {
if let Some(IfMatch::Items(ref items)) = req.get_header() {
if !items.iter().any(|item| item.strong_eq(etag)) {
return true;
}
}
if let Some(IfUnmodifiedSince(ref since)) = req.get_header() {
if last_modified > since {
return true;
}
}
false
}
fn not_modified(req: &HttpRequest, etag: &EntityTag, last_modified: &HttpDate) -> bool {
match req.get_header::<IfNoneMatch>() {
Some(IfNoneMatch::Any) => {
return true;
}
Some(IfNoneMatch::Items(ref items)) => {
return items.iter().any(|item| item.weak_eq(etag));
}
None => (),
}
if let Some(IfModifiedSince(ref since)) = req.get_header() {
if last_modified < since {
return true;
}
}
false
}
impl Responder for DownloadingFile {
type Body = BoxBody;
fn respond_to(self, req: &HttpRequest) -> HttpResponse<Self::Body> {
self.into_response(req)
}
}
pin_project! {
pub struct LiveFileReader<F, Fut> {
size: u64,
offset: u64,
#[pin]
state: LiveFileReaderState<Fut>,
counter: u64,
available_file_size: u64,
callback: F,
#[pin]
events: inotify::EventStream<[u8; 1024]>,
}
}
pin_project! {
#[project = LiveFileReaderStateProj]
#[project_replace = LiveFileReaderStateProjReplace]
enum LiveFileReaderState<Fut> {
File { file: Option<File>, },
Future { #[pin] fut: Fut },
}
}
pub(crate) fn new_live_reader(
size: u64,
offset: u64,
file: File,
storage_path: PathBuf,
) -> impl Stream<Item = Result<Bytes, Error>> {
let mut inotify = Inotify::init().expect("failed to init inotify");
inotify
.add_watch(storage_path, WatchMask::MODIFY | WatchMask::CLOSE_WRITE)
.expect("Failed to add inotify watch");
let events = inotify
.event_stream([0; 1024])
.expect("failed to set up event stream");
LiveFileReader {
size,
offset,
state: LiveFileReaderState::File { file: Some(file) },
counter: 0,
available_file_size: 0,
callback: live_file_reader_callback,
events,
}
}
async fn live_file_reader_callback(
mut file: File,
offset: u64,
max_bytes: usize,
) -> Result<(File, Bytes), Error> {
use io::{Read as _, Seek as _};
let res = actix_web::web::block(move || {
trace!(
"reading up to {} bytes of file starting at {}",
max_bytes,
offset
);
let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?;
let n_bytes = std::io::Read::by_ref(&mut file)
.take(max_bytes as u64)
.read_to_end(&mut buf)?;
trace!("got {} bytes from file", n_bytes);
if n_bytes == 0 {
Err(io::Error::from(io::ErrorKind::UnexpectedEof))
} else {
Ok((file, Bytes::from(buf)))
}
})
.await??;
Ok(res)
}
impl<F, Fut> Stream for LiveFileReader<F, Fut>
where
F: Fn(File, u64, usize) -> Fut,
Fut: Future<Output = Result<(File, Bytes), Error>>,
{
type Item = Result<Bytes, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
LiveFileReaderStateProj::File { file } => {
let size = *this.size;
let offset = *this.offset;
let counter = *this.counter;
if size == counter {
Poll::Ready(None)
} else {
let inner_file = file.take().expect("LiveFileReader polled after completion");
if offset >= *this.available_file_size {
trace!(
"offset {} has reached available file size {}, updating metadata",
offset,
this.available_file_size
);
// If we've hit the end of what was available
// last time we checked, check again
*this.available_file_size = match inner_file.metadata() {
Ok(md) => md.len(),
Err(e) => {
return Poll::Ready(Some(Err(e.into())));
}
};
trace!("new available file size: {}", this.available_file_size);
// If we're still at the end, inotify time
if offset >= *this.available_file_size {
trace!("waiting for inotify events");
file.get_or_insert(inner_file);
if ready!(this.events.poll_next(cx)).is_none() {
return Poll::Ready(Some(Err(ErrorInternalServerError(
"inotify stream empty",
))));
}
return self.poll_next(cx);
}
}
let max_bytes = cmp::min(
65_536,
cmp::min(
size.saturating_sub(counter),
this.available_file_size.saturating_sub(offset),
),
) as usize;
let fut = (this.callback)(inner_file, offset, max_bytes);
this.state
.project_replace(LiveFileReaderState::Future { fut });
self.poll_next(cx)
}
}
LiveFileReaderStateProj::Future { fut } => {
let (file, bytes) = ready!(fut.poll(cx))?;
this.state
.project_replace(LiveFileReaderState::File { file: Some(file) });
*this.offset += bytes.len() as u64;
*this.counter += bytes.len() as u64;
Poll::Ready(Some(Ok(bytes)))
}
}
}
}