diff --git a/Cargo.lock b/Cargo.lock index 8e0f200..86251a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -597,65 +597,12 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "futures" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-channel" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" -dependencies = [ - "futures-core", - "futures-sink", -] - [[package]] name = "futures-core" version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" -[[package]] -name = "futures-executor" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-io" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" - -[[package]] -name = "futures-macro" -version = "0.3.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "futures-sink" version = "0.3.21" @@ -674,16 +621,10 @@ version = "0.3.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" dependencies = [ - "futures-channel", "futures-core", - "futures-io", - "futures-macro", - "futures-sink", "futures-task", - "memchr", "pin-project-lite", "pin-utils", - "slab", ] [[package]] @@ -1036,9 +977,9 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "pin-project-lite" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" [[package]] name = "pin-utils" @@ -1441,8 +1382,10 @@ dependencies = [ "bytes", "crc32fast", "env_logger", - "futures", + "futures-core", "log", + "mime", + "pin-project-lite", "rand", "sanitise-file-name", "serde", diff --git a/Cargo.toml b/Cargo.toml index 76afdc9..0269d3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,8 +16,10 @@ actix-web-actors = "4.1.0" bytes = "1.1.0" crc32fast = "1.3.2" env_logger = "0.9" -futures = "0.3" +futures-core = "0.3" log = "0.4" +mime = "0.3.16" +pin-project-lite = "0.2.9" rand = "0.8.5" sanitise-file-name = "1.0.0" serde = { version = "1.0", features = ["derive"] } diff --git a/src/download.rs b/src/download.rs index a476348..c86d801 100644 --- a/src/download.rs +++ b/src/download.rs @@ -1,496 +1,63 @@ -use std::{ - fs::Metadata, - io, - path::{Path, PathBuf}, - time::{SystemTime, UNIX_EPOCH}, -}; +use std::{fs::File, os::unix::fs::MetadataExt, time::SystemTime}; use actix_web::{ body::{self, BoxBody, SizedStream}, - dev::{ - self, AppService, HttpServiceFactory, ResourceDef, Service, ServiceFactory, - ServiceRequest, ServiceResponse, - }, http::{ header::{ - self, Charset, ContentDisposition, ContentEncoding, DispositionParam, - DispositionType, ExtendedValue, HeaderValue, + self, ContentDisposition, DispositionParam, + DispositionType, HeaderValue, HttpDate, EntityTag, IfUnmodifiedSince, IfMatch, IfNoneMatch, IfModifiedSince, }, StatusCode, }, - Error, HttpMessage, HttpRequest, HttpResponse, Responder, + HttpMessage, HttpRequest, HttpResponse, Responder, }; -use bitflags::bitflags; -use derive_more::{Deref, DerefMut}; -use futures_core::future::LocalBoxFuture; -use mime::Mime; -use mime_guess::from_path; -use crate::{encoding::equiv_utf8_text, range::HttpRange}; +use actix_files::HttpRange; -bitflags! { - pub(crate) struct Flags: u8 { - const ETAG = 0b0000_0001; - const LAST_MD = 0b0000_0010; - const CONTENT_DISPOSITION = 0b0000_0100; - const PREFER_UTF8 = 0b0000_1000; - } +use crate::DownloadableFile; + +// This is copied substantially from actix-files, with some tweaks + +pub(crate) struct DownloadingFile { + pub(crate) file: File, + pub(crate) info: DownloadableFile, } -impl Default for Flags { - fn default() -> Self { - Flags::from_bits_truncate(0b0000_1111) - } -} - -/// A file with an associated name. -/// -/// `NamedFile` can be registered as services: -/// ``` -/// use actix_web::App; -/// use actix_files::NamedFile; -/// -/// # async fn run() -> Result<(), Box> { -/// let file = NamedFile::open_async("./static/index.html").await?; -/// let app = App::new().service(file); -/// # Ok(()) -/// # } -/// ``` -/// -/// They can also be returned from handlers: -/// ``` -/// use actix_web::{Responder, get}; -/// use actix_files::NamedFile; -/// -/// #[get("/")] -/// async fn index() -> impl Responder { -/// NamedFile::open_async("./static/index.html").await -/// } -/// ``` -#[derive(Debug, Deref, DerefMut)] -pub struct NamedFile { - #[deref] - #[deref_mut] - file: File, - path: PathBuf, - modified: Option, - pub(crate) md: Metadata, - pub(crate) flags: Flags, - pub(crate) status_code: StatusCode, - pub(crate) content_type: Mime, - pub(crate) content_disposition: ContentDisposition, - pub(crate) encoding: Option, -} - -pub(crate) use std::fs::File; - -use super::chunked; - -impl NamedFile { - /// Creates an instance from a previously opened file. - /// - /// The given `path` need not exist and is only used to determine the `ContentType` and - /// `ContentDisposition` headers. - /// - /// # Examples - /// ```ignore - /// use std::{ - /// io::{self, Write as _}, - /// env, - /// fs::File - /// }; - /// use actix_files::NamedFile; - /// - /// let mut file = File::create("foo.txt")?; - /// file.write_all(b"Hello, world!")?; - /// let named_file = NamedFile::from_file(file, "bar.txt")?; - /// # std::fs::remove_file("foo.txt"); - /// Ok(()) - /// ``` - pub fn from_file>(file: File, path: P) -> io::Result { - let path = path.as_ref().to_path_buf(); - - // Get the name of the file and use it to construct default Content-Type - // and Content-Disposition values - let (content_type, content_disposition) = { - let filename = match path.file_name() { - Some(name) => name.to_string_lossy(), - None => { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Provided path has no filename", - )); - } - }; - - let ct = from_path(&path).first_or_octet_stream(); - - let disposition = match ct.type_() { - mime::IMAGE | mime::TEXT | mime::AUDIO | mime::VIDEO => DispositionType::Inline, - mime::APPLICATION => match ct.subtype() { - mime::JAVASCRIPT | mime::JSON => DispositionType::Inline, - name if name == "wasm" => DispositionType::Inline, - _ => DispositionType::Attachment, - }, - _ => DispositionType::Attachment, - }; - - let mut parameters = - vec![DispositionParam::Filename(String::from(filename.as_ref()))]; - - if !filename.is_ascii() { - parameters.push(DispositionParam::FilenameExt(ExtendedValue { - charset: Charset::Ext(String::from("UTF-8")), - language_tag: None, - value: filename.into_owned().into_bytes(), - })) - } - - let cd = ContentDisposition { - disposition, - parameters, - }; - - (ct, cd) - }; - - let md = { - { - file.metadata()? - } - }; - - let modified = md.modified().ok(); - let encoding = None; - - Ok(NamedFile { - path, - file, - content_type, - content_disposition, - md, - modified, - encoding, - status_code: StatusCode::OK, - flags: Flags::default(), - }) - } - - /// Attempts to open a file in read-only mode. - /// - /// # Examples - /// ``` - /// use actix_files::NamedFile; - /// let file = NamedFile::open("foo.txt"); - /// ``` - pub fn open>(path: P) -> io::Result { - let file = File::open(&path)?; - Self::from_file(file, path) - } - - /// Attempts to open a file asynchronously in read-only mode. - /// - /// When the `experimental-io-uring` crate feature is enabled, this will be async. Otherwise, it - /// will behave just like `open`. - /// - /// # Examples - /// ``` - /// use actix_files::NamedFile; - /// # async fn open() { - /// let file = NamedFile::open_async("foo.txt").await.unwrap(); - /// # } - /// ``` - pub async fn open_async>(path: P) -> io::Result { - let file = { - { - File::open(&path)? - } - }; - - Self::from_file(file, path) - } - - /// Returns reference to the underlying file object. - #[inline] - pub fn file(&self) -> &File { - &self.file - } - - /// Returns the filesystem path to this file. - /// - /// # Examples - /// ``` - /// # use std::io; - /// use actix_files::NamedFile; - /// - /// # async fn path() -> io::Result<()> { - /// let file = NamedFile::open_async("test.txt").await?; - /// assert_eq!(file.path().as_os_str(), "foo.txt"); - /// # Ok(()) - /// # } - /// ``` - #[inline] - pub fn path(&self) -> &Path { - self.path.as_path() - } - - /// Returns the time the file was last modified. - /// - /// Returns `None` only on unsupported platforms; see [`std::fs::Metadata::modified()`]. - /// Therefore, it is usually safe to unwrap this. - #[inline] - pub fn modified(&self) -> Option { - self.modified - } - - /// Returns the filesystem metadata associated with this file. - #[inline] - pub fn metadata(&self) -> &Metadata { - &self.md - } - - /// Returns the `Content-Type` header that will be used when serving this file. - #[inline] - pub fn content_type(&self) -> &Mime { - &self.content_type - } - - /// Returns the `Content-Disposition` that will be used when serving this file. - #[inline] - pub fn content_disposition(&self) -> &ContentDisposition { - &self.content_disposition - } - - /// Returns the `Content-Encoding` that will be used when serving this file. - /// - /// A return value of `None` indicates that the content is not already using a compressed - /// representation and may be subject to compression downstream. - #[inline] - pub fn content_encoding(&self) -> Option { - self.encoding - } - - /// Set response status code. - #[deprecated(since = "0.7.0", note = "Prefer `Responder::customize()`.")] - pub fn set_status_code(mut self, status: StatusCode) -> Self { - self.status_code = status; - self - } - - /// Sets the `Content-Type` header that will be used when serving this file. By default the - /// `Content-Type` is inferred from the filename extension. - #[inline] - pub fn set_content_type(mut self, mime_type: Mime) -> Self { - self.content_type = mime_type; - self - } - - /// Set the Content-Disposition for serving this file. This allows changing the - /// `inline/attachment` disposition as well as the filename sent to the peer. - /// - /// By default the disposition is `inline` for `text/*`, `image/*`, `video/*` and - /// `application/{javascript, json, wasm}` mime types, and `attachment` otherwise, and the - /// filename is taken from the path provided in the `open` method after converting it to UTF-8 - /// (using `to_string_lossy`). - #[inline] - pub fn set_content_disposition(mut self, cd: ContentDisposition) -> Self { - self.content_disposition = cd; - self.flags.insert(Flags::CONTENT_DISPOSITION); - self - } - - /// Disables `Content-Disposition` header. - /// - /// By default, the `Content-Disposition` header is sent. - #[inline] - pub fn disable_content_disposition(mut self) -> Self { - self.flags.remove(Flags::CONTENT_DISPOSITION); - self - } - - /// Sets content encoding for this file. - /// - /// This prevents the `Compress` middleware from modifying the file contents and signals to - /// browsers/clients how to decode it. For example, if serving a compressed HTML file (e.g., - /// `index.html.gz`) then use `.set_content_encoding(ContentEncoding::Gzip)`. - #[inline] - pub fn set_content_encoding(mut self, enc: ContentEncoding) -> Self { - self.encoding = Some(enc); - self - } - - /// Specifies whether to return `ETag` header in response. - /// - /// Default is true. - #[inline] - pub fn use_etag(mut self, value: bool) -> Self { - self.flags.set(Flags::ETAG, value); - self - } - - /// Specifies whether to return `Last-Modified` header in response. - /// - /// Default is true. - #[inline] - pub fn use_last_modified(mut self, value: bool) -> Self { - self.flags.set(Flags::LAST_MD, value); - self - } - - /// Specifies whether text responses should signal a UTF-8 encoding. - /// - /// Default is false (but will default to true in a future version). - #[inline] - pub fn prefer_utf8(mut self, value: bool) -> Self { - self.flags.set(Flags::PREFER_UTF8, value); - self - } - - /// Creates an `ETag` in a format is similar to Apache's. - pub(crate) fn etag(&self) -> Option { - self.modified.as_ref().map(|mtime| { - let ino = { - #[cfg(unix)] - { - #[cfg(unix)] - use std::os::unix::fs::MetadataExt as _; - - self.md.ino() - } - - #[cfg(not(unix))] - { - 0 - } - }; - - let dur = mtime - .duration_since(UNIX_EPOCH) - .expect("modification time must be after epoch"); - - header::EntityTag::new_strong(format!( - "{:x}:{:x}:{:x}:{:x}", - ino, - self.md.len(), - dur.as_secs(), - dur.subsec_nanos() - )) - }) - } - - pub(crate) fn last_modified(&self) -> Option { - self.modified.map(|mtime| mtime.into()) +impl DownloadingFile { + fn etag(&self) -> EntityTag { + let ino = self.file.metadata().map(|md| md.ino()).unwrap_or_default(); + EntityTag::new_strong(format!( + "{:x}:{:x}:{:x}:{:x}", + ino, + self.info.size, + self.info.modtime.unix_timestamp() as u64, + self.info.modtime.nanosecond(), + )) } /// Creates an `HttpResponse` with file as a streaming body. pub fn into_response(self, req: &HttpRequest) -> HttpResponse { - if self.status_code != StatusCode::OK { - let mut res = HttpResponse::build(self.status_code); + let etag = self.etag(); + let last_modified = HttpDate::from(SystemTime::from(self.info.modtime)); - let ct = if self.flags.contains(Flags::PREFER_UTF8) { - equiv_utf8_text(self.content_type.clone()) - } else { - self.content_type - }; + let precondition_failed = precondition_failed(req, &etag, &last_modified); + let not_modified = not_modified(req, &etag, &last_modified); - res.insert_header((header::CONTENT_TYPE, ct.to_string())); + let mut res = HttpResponse::build(StatusCode::OK); - if self.flags.contains(Flags::CONTENT_DISPOSITION) { - res.insert_header(( - header::CONTENT_DISPOSITION, - self.content_disposition.to_string(), - )); + res.insert_header((header::CONTENT_TYPE, mime::APPLICATION_OCTET_STREAM)); + res.insert_header(( + header::CONTENT_DISPOSITION, + ContentDisposition { + disposition: DispositionType::Attachment, + parameters: vec![DispositionParam::Filename(self.info.name)], } - - if let Some(current_encoding) = self.encoding { - res.insert_header((header::CONTENT_ENCODING, current_encoding.as_str())); - } - - let reader = chunked::new_chunked_read(self.md.len(), 0, self.file); - - return res.streaming(reader); - } - - let etag = if self.flags.contains(Flags::ETAG) { - self.etag() - } else { - None - }; - - let last_modified = if self.flags.contains(Flags::LAST_MD) { - self.last_modified() - } else { - None - }; - - // check preconditions - let precondition_failed = if !any_match(etag.as_ref(), req) { - true - } else if let (Some(ref m), Some(header::IfUnmodifiedSince(ref since))) = - (last_modified, req.get_header()) - { - let t1: SystemTime = (*m).into(); - let t2: SystemTime = (*since).into(); - - match (t1.duration_since(UNIX_EPOCH), t2.duration_since(UNIX_EPOCH)) { - (Ok(t1), Ok(t2)) => t1.as_secs() > t2.as_secs(), - _ => false, - } - } else { - false - }; - - // check last modified - let not_modified = if !none_match(etag.as_ref(), req) { - true - } else if req.headers().contains_key(header::IF_NONE_MATCH) { - false - } else if let (Some(ref m), Some(header::IfModifiedSince(ref since))) = - (last_modified, req.get_header()) - { - let t1: SystemTime = (*m).into(); - let t2: SystemTime = (*since).into(); - - match (t1.duration_since(UNIX_EPOCH), t2.duration_since(UNIX_EPOCH)) { - (Ok(t1), Ok(t2)) => t1.as_secs() <= t2.as_secs(), - _ => false, - } - } else { - false - }; - - let mut res = HttpResponse::build(self.status_code); - - let ct = if self.flags.contains(Flags::PREFER_UTF8) { - equiv_utf8_text(self.content_type.clone()) - } else { - self.content_type - }; - - res.insert_header((header::CONTENT_TYPE, ct.to_string())); - - if self.flags.contains(Flags::CONTENT_DISPOSITION) { - res.insert_header(( - header::CONTENT_DISPOSITION, - self.content_disposition.to_string(), - )); - } - - if let Some(current_encoding) = self.encoding { - res.insert_header((header::CONTENT_ENCODING, current_encoding.as_str())); - } - - if let Some(lm) = last_modified { - res.insert_header((header::LAST_MODIFIED, lm.to_string())); - } - - if let Some(etag) = etag { - res.insert_header((header::ETAG, etag.to_string())); - } - + )); + res.insert_header((header::LAST_MODIFIED, last_modified)); + res.insert_header((header::ETAG, etag)); res.insert_header((header::ACCEPT_RANGES, "bytes")); - let mut length = self.md.len(); + let mut length = self.info.size; let mut offset = 0; // check for range header @@ -508,7 +75,7 @@ impl NamedFile { res.insert_header(( header::CONTENT_RANGE, - format!("bytes {}-{}/{}", offset, offset + length - 1, self.md.len()), + format!("bytes {}-{}/{}", offset, offset + length - 1, self.info.size), )); } else { res.insert_header((header::CONTENT_RANGE, format!("bytes */{}", length))); @@ -528,9 +95,9 @@ impl NamedFile { .map_into_boxed_body(); } - let reader = chunked::new_chunked_read(length, offset, self.file); + let reader = crate::file::new_live_reader(length, offset, self.file, self.info.uploader); - if offset != 0 || length != self.md.len() { + if offset != 0 || length != self.info.size { res.status(StatusCode::PARTIAL_CONTENT); } @@ -538,232 +105,40 @@ impl NamedFile { } } -/// Returns true if `req` has no `If-Match` header or one which matches `etag`. -fn any_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool { - match req.get_header::() { - None | Some(header::IfMatch::Any) => true, - - Some(header::IfMatch::Items(ref items)) => { - if let Some(some_etag) = etag { - for item in items { - if item.strong_eq(some_etag) { - return true; - } - } - } - - false +fn precondition_failed(req: &HttpRequest, etag: &EntityTag, last_modified: &HttpDate) -> bool { + if let Some(IfMatch::Items(ref items)) = req.get_header() { + if !items.iter().any(|item| item.strong_eq(etag)) { + return true; } } -} - -/// Returns true if `req` doesn't have an `If-None-Match` header matching `req`. -fn none_match(etag: Option<&header::EntityTag>, req: &HttpRequest) -> bool { - match req.get_header::() { - Some(header::IfNoneMatch::Any) => false, - - Some(header::IfNoneMatch::Items(ref items)) => { - if let Some(some_etag) = etag { - for item in items { - if item.weak_eq(some_etag) { - return false; - } - } - } - - true + if let Some(IfUnmodifiedSince(ref since)) = req.get_header() { + if last_modified > since { + return true; } - - None => true, } + false } -impl Responder for NamedFile { +fn not_modified(req: &HttpRequest, etag: &EntityTag, last_modified: &HttpDate) -> bool { + match req.get_header::() { + Some(IfNoneMatch::Any) => { return true; } + Some(IfNoneMatch::Items(ref items)) => { + return items.iter().any(|item| item.weak_eq(etag)); + } + None => (), + } + if let Some(IfModifiedSince(ref since)) = req.get_header() { + if last_modified < since { + return true; + } + } + false +} + +impl Responder for DownloadingFile { type Body = BoxBody; fn respond_to(self, req: &HttpRequest) -> HttpResponse { self.into_response(req) } } - -impl ServiceFactory for NamedFile { - type Response = ServiceResponse; - type Error = Error; - type Config = (); - type Service = NamedFileService; - type InitError = (); - type Future = LocalBoxFuture<'static, Result>; - - fn new_service(&self, _: ()) -> Self::Future { - let service = NamedFileService { - path: self.path.clone(), - }; - - Box::pin(async move { Ok(service) }) - } -} - -#[doc(hidden)] -#[derive(Debug)] -pub struct NamedFileService { - path: PathBuf, -} - -impl Service for NamedFileService { - type Response = ServiceResponse; - type Error = Error; - type Future = LocalBoxFuture<'static, Result>; - - dev::always_ready!(); - - fn call(&self, req: ServiceRequest) -> Self::Future { - let (req, _) = req.into_parts(); - - let path = self.path.clone(); - Box::pin(async move { - let file = NamedFile::open_async(path).await?; - let res = file.into_response(&req); - Ok(ServiceResponse::new(req, res)) - }) - } -} - -impl HttpServiceFactory for NamedFile { - fn register(self, config: &mut AppService) { - config.register_service( - ResourceDef::root_prefix(self.path.to_string_lossy().as_ref()), - None, - self, - None, - ) - } -} - - -use std::{ - cmp, fmt, - future::Future, - io, - pin::Pin, - task::{Context, Poll}, -}; - -use actix_web::{error::Error, web::Bytes}; -use futures_core::{ready, Stream}; -use pin_project_lite::pin_project; - -use super::named::File; - -pin_project! { - /// Adapter to read a `std::file::File` in chunks. - #[doc(hidden)] - pub struct ChunkedReadFile { - size: u64, - offset: u64, - #[pin] - state: ChunkedReadFileState, - counter: u64, - callback: F, - } -} - -pin_project! { - #[project = ChunkedReadFileStateProj] - #[project_replace = ChunkedReadFileStateProjReplace] - enum ChunkedReadFileState { - File { file: Option, }, - Future { #[pin] fut: Fut }, - } -} - -impl fmt::Debug for ChunkedReadFile { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("ChunkedReadFile") - } -} - -pub(crate) fn new_chunked_read( - size: u64, - offset: u64, - file: File, -) -> impl Stream> { - ChunkedReadFile { - size, - offset, - state: ChunkedReadFileState::File { file: Some(file) }, - counter: 0, - callback: chunked_read_file_callback, - } -} - -async fn chunked_read_file_callback( - mut file: File, - offset: u64, - max_bytes: usize, -) -> Result<(File, Bytes), Error> { - use io::{Read as _, Seek as _}; - - let res = actix_web::web::block(move || { - let mut buf = Vec::with_capacity(max_bytes); - - file.seek(io::SeekFrom::Start(offset))?; - - let n_bytes = file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?; - - if n_bytes == 0 { - Err(io::Error::from(io::ErrorKind::UnexpectedEof)) - } else { - Ok((file, Bytes::from(buf))) - } - }) - .await??; - - Ok(res) -} - -impl Stream for ChunkedReadFile -where - F: Fn(File, u64, usize) -> Fut, - Fut: Future>, -{ - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.as_mut().project(); - match this.state.as_mut().project() { - ChunkedReadFileStateProj::File { file } => { - let size = *this.size; - let offset = *this.offset; - let counter = *this.counter; - - if size == counter { - Poll::Ready(None) - } else { - let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize; - - let file = file - .take() - .expect("ChunkedReadFile polled after completion"); - - let fut = (this.callback)(file, offset, max_bytes); - - this.state - .project_replace(ChunkedReadFileState::Future { fut }); - - self.poll_next(cx) - } - } - ChunkedReadFileStateProj::Future { fut } => { - let (file, bytes) = ready!(fut.poll(cx))?; - - this.state - .project_replace(ChunkedReadFileState::File { file: Some(file) }); - - *this.offset += bytes.len() as u64; - *this.counter += bytes.len() as u64; - - Poll::Ready(Some(Ok(bytes))) - } - } - } -} diff --git a/src/file.rs b/src/file.rs index 3041b68..50e7330 100644 --- a/src/file.rs +++ b/src/file.rs @@ -1,4 +1,13 @@ -use std::{fs::File, io::Write, path::PathBuf, task::Waker}; +use std::{cmp, fs::File, future::Future, io::{self, Write}, path::PathBuf, pin::Pin, task::{Context, Poll, Waker}}; + +use actix::Addr; +use actix_web::error::{Error, ErrorInternalServerError}; +use bytes::Bytes; +use futures_core::{ready, Stream}; +use log::trace; +use pin_project_lite::pin_project; + +use crate::upload::WakerMessage; pub trait LiveWriter: Write { fn add_waker(&mut self, waker: Waker); @@ -43,3 +52,148 @@ impl Write for LiveFileWriter { self.file.flush() } } + + +// This implementation of a file responder is copied pretty directly +// from actix-files with some tweaks + +pin_project! { + pub struct LiveFileReader { + size: u64, + offset: u64, + #[pin] + state: LiveFileReaderState, + counter: u64, + available_file_size: u64, + callback: F, + uploader: Option>, + } +} + +pin_project! { + #[project = LiveFileReaderStateProj] + #[project_replace = LiveFileReaderStateProjReplace] + enum LiveFileReaderState { + File { file: Option, }, + Future { #[pin] fut: Fut }, + } +} + +pub(crate) fn new_live_reader( + size: u64, + offset: u64, + file: File, + uploader: Option>, +) -> impl Stream> { + LiveFileReader { + size, + offset, + state: LiveFileReaderState::File { file: Some(file) }, + counter: 0, + available_file_size: 0, + callback: live_file_reader_callback, + uploader, + } +} + +async fn live_file_reader_callback( + mut file: File, + offset: u64, + max_bytes: usize, +) -> Result<(File, Bytes), Error> { + use io::{Read as _, Seek as _}; + + let res = actix_web::web::block(move || { + trace!("reading up to {} bytes of file starting at {}", max_bytes, offset); + + let mut buf = Vec::with_capacity(max_bytes); + + file.seek(io::SeekFrom::Start(offset))?; + + let n_bytes = std::io::Read::by_ref(&mut file).take(max_bytes as u64).read_to_end(&mut buf)?; + trace!("got {} bytes from file", n_bytes); + if n_bytes == 0 { + Err(io::Error::from(io::ErrorKind::UnexpectedEof)) + } else { + Ok((file, Bytes::from(buf))) + } + }) + .await??; + + Ok(res) +} + +impl Stream for LiveFileReader +where + F: Fn(File, u64, usize) -> Fut, + Fut: Future>, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + match this.state.as_mut().project() { + LiveFileReaderStateProj::File { file } => { + let size = *this.size; + let offset = *this.offset; + let counter = *this.counter; + + if size == counter { + Poll::Ready(None) + } else { + let inner_file = file + .take() + .expect("LiveFileReader polled after completion"); + + if offset >= *this.available_file_size { + trace!("offset {} has reached available file size {}, updating metadata", offset, this.available_file_size); + // If we've hit the end of what was available + // last time we checked, check again + *this.available_file_size = match inner_file.metadata() { + Ok(md) => md.len(), + Err(e) => { + return Poll::Ready(Some(Err(e.into()))); + } + }; + trace!("new available file size: {}", this.available_file_size); + + // If we're still at the end, wait for a wakeup from the uploader + if offset >= *this.available_file_size { + trace!("requesting wakeup from uploader"); + file.get_or_insert(inner_file); + if let Some(addr) = this.uploader { + if let Ok(()) = addr.try_send(WakerMessage(cx.waker().clone())) { + return Poll::Pending; + } else { + return Poll::Ready(Some(Err(ErrorInternalServerError("Failed to contact file upload actor")))); + } + } else { + return Poll::Ready(Some(Err(ErrorInternalServerError("File upload was not completed")))); + } + } + } + + let max_bytes = cmp::min(65_536, cmp::min(size.saturating_sub(counter), this.available_file_size.saturating_sub(offset))) as usize; + + let fut = (this.callback)(inner_file, offset, max_bytes); + + this.state + .project_replace(LiveFileReaderState::Future { fut }); + + self.poll_next(cx) + } + } + LiveFileReaderStateProj::Future { fut } => { + let (file, bytes) = ready!(fut.poll(cx))?; + + this.state + .project_replace(LiveFileReaderState::File { file: Some(file) }); + + *this.offset += bytes.len() as u64; + *this.counter += bytes.len() as u64; + + Poll::Ready(Some(Ok(bytes))) + } + } + } +} diff --git a/src/main.rs b/src/main.rs index 51260ba..144d612 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -//mod download; +mod download; mod file; mod upload; mod zip; @@ -6,27 +6,27 @@ mod zip; use std::{ collections::HashMap, path::PathBuf, - sync::{mpsc::Sender, RwLock}, - task::Waker, + fs::File, }; use actix::Addr; use actix_web::{ - get, middleware::Logger, web, App, HttpRequest, HttpResponse, HttpServer, Responder, + get, middleware::Logger, web, App, HttpRequest, HttpServer, Responder, HttpResponse, }; use actix_web_actors::ws; use time::OffsetDateTime; +use tokio::sync::RwLock; const APP_NAME: &'static str = "transbeam"; pub struct UploadedFile { name: String, - size: usize, + size: u64, modtime: OffsetDateTime, } impl UploadedFile { - fn new(name: &str, size: usize, modtime: OffsetDateTime) -> Self { + fn new(name: &str, size: u64, modtime: OffsetDateTime) -> Self { Self { name: sanitise_file_name::sanitise(name), size, @@ -35,9 +35,10 @@ impl UploadedFile { } } +#[derive(Clone)] pub struct DownloadableFile { name: String, - size: usize, + size: u64, modtime: OffsetDateTime, uploader: Option>, } @@ -48,8 +49,26 @@ fn storage_dir() -> PathBuf { PathBuf::from(std::env::var("STORAGE_DIR").unwrap_or_else(|_| String::from("storage"))) } +#[get("/download/{file_code}")] +async fn handle_download(req: HttpRequest, path: web::Path, data: AppData) -> actix_web::Result { + let file_code = path.into_inner(); + if !file_code.as_bytes().iter().all(|c| c.is_ascii_alphanumeric()) { + return Ok(HttpResponse::NotFound().finish()); + } + let data = data.read().await; + let info = data.get(&file_code); + if let Some(info) = info { + Ok(download::DownloadingFile { + file: File::open(storage_dir().join(file_code))?, + info: info.clone(), + }.into_response(&req)) + } else { + Ok(HttpResponse::NotFound().finish()) + } +} + #[get("/upload")] -async fn upload_socket(req: HttpRequest, stream: web::Payload, data: AppData) -> impl Responder { +async fn handle_upload(req: HttpRequest, stream: web::Payload, data: AppData) -> impl Responder { ws::start(upload::Uploader::new(data), &req, stream) } @@ -70,7 +89,8 @@ async fn main() -> std::io::Result<()> { App::new() .app_data(data.clone()) .wrap(Logger::default()) - .service(upload_socket) + .service(handle_upload) + .service(handle_download) .service(actix_files::Files::new("/", static_dir.clone()).index_file("index.html")) }) .bind(("127.0.0.1", port))? diff --git a/src/upload.rs b/src/upload.rs index c864247..a87d5d5 100644 --- a/src/upload.rs +++ b/src/upload.rs @@ -1,8 +1,9 @@ -use std::{collections::HashSet, fmt::Display, io::Write, task::Waker}; +use std::{collections::HashSet, io::Write, task::Waker}; use actix::{Actor, ActorContext, AsyncContext, StreamHandler, Message, Handler}; use actix_http::ws::{CloseReason, Item}; use actix_web_actors::ws::{self, CloseCode}; +use bytes::Bytes; use log::{debug, error, info, trace}; use rand::distributions::{Alphanumeric, DistString}; use serde::Deserialize; @@ -21,8 +22,6 @@ enum Error { Storage(#[from] std::io::Error), #[error("Time formatting error")] TimeFormat(#[from] time::error::Format), - #[error("Lock on app state is poisoned")] - LockPoisoned, #[error("Duplicate filename could not be deduplicated")] DuplicateFilename, #[error("This message type was not expected at this stage")] @@ -41,7 +40,6 @@ impl Error { Self::Parse(_) => CloseCode::Invalid, Self::Storage(_) => CloseCode::Error, Self::TimeFormat(_) => CloseCode::Error, - Self::LockPoisoned => CloseCode::Error, Self::DuplicateFilename => CloseCode::Policy, Self::UnexpectedMessageType => CloseCode::Invalid, Self::NoFiles => CloseCode::Policy, @@ -55,7 +53,7 @@ pub struct Uploader { writer: Option>, storage_filename: String, app_data: super::AppData, - bytes_remaining: usize, + bytes_remaining: u64, } impl Uploader { @@ -87,7 +85,7 @@ impl Handler for Uploader { #[derive(Debug, Deserialize)] struct RawUploadedFile { name: String, - size: usize, + size: u64, modtime: i64, } @@ -129,9 +127,13 @@ impl StreamHandler> for Uploader { code: CloseCode::Normal, description: None, })); - if let Some(f) = self.app_data.write().unwrap().get_mut(&self.storage_filename) { - f.uploader.take(); - } + let data = self.app_data.clone(); + let filename = self.storage_filename.clone(); + ctx.wait(actix::fut::wrap_future(async move { + if let Some(f) = data.write().await.get_mut(&filename) { + f.uploader.take(); + } + })); ctx.stop(); } _ => (), @@ -181,61 +183,51 @@ impl Uploader { let storage_path = super::storage_dir().join(storage_filename.clone()); info!("storing to: {:?}", storage_path); let writer = super::file::LiveFileWriter::new(&storage_path)?; - if files.len() > 1 { + let addr = Some(ctx.address()); + let (writer, downloadable_file): (Box, _) = if files.len() > 1 { info!("Wrapping in zipfile generator"); let now = OffsetDateTime::now_utc(); - let writer = super::zip::ZipGenerator::new(files, Box::new(writer)); - let size = writer.total_size(); - self.writer = Some(Box::new(writer)); + let zip_writer = super::zip::ZipGenerator::new(files, writer); + let size = zip_writer.total_size(); let download_filename = super::APP_NAME.to_owned() + &now.format(FILENAME_DATE_FORMAT)? + ".zip"; - let modtime = now; - self.app_data - .write() - .map_err(|_| Error::LockPoisoned)? - .insert( - storage_filename, - DownloadableFile { - name: download_filename, - size, - modtime, - uploader: Some(ctx.address()), - }, - ); + (Box::new(zip_writer), DownloadableFile { + name: download_filename, + size, + modtime: now, + uploader: addr, + }) } else { - self.writer = Some(Box::new(writer)); - self.app_data + (Box::new(writer), DownloadableFile { + name: files[0].name.clone(), + size: files[0].size, + modtime: files[0].modtime, + uploader: addr, + }) + }; + self.writer = Some(writer); + let data = self.app_data.clone(); + let storage_filename_copy = storage_filename.clone(); + ctx.spawn(actix::fut::wrap_future(async move { + data .write() - .map_err(|_| Error::LockPoisoned)? + .await .insert( - storage_filename, - DownloadableFile { - name: files[0].name.clone(), - size: files[0].size, - modtime: files[0].modtime, - uploader: Some(ctx.address()), - }, + storage_filename_copy, + downloadable_file, ); - } + })); ctx.text(self.storage_filename.as_str()); } ws::Message::Binary(data) - | ws::Message::Continuation(Item::FirstBinary(data)) - | ws::Message::Continuation(Item::Continue(data)) | ws::Message::Continuation(Item::Last(data)) => { - if let Some(ref mut writer) = self.writer { - if data.len() > self.bytes_remaining { - return Err(Error::TooMuchData); - } - self.bytes_remaining -= data.len(); - writer.write_all(&data)?; - ack(ctx); - if self.bytes_remaining == 0 { - return Ok(true); - } - } else { - return Err(Error::UnexpectedMessageType); - } + let result = self.handle_data(data)?; + ack(ctx); + return Ok(result); + } + ws::Message::Continuation(Item::FirstBinary(data)) + | ws::Message::Continuation(Item::Continue(data)) => { + return self.handle_data(data); } ws::Message::Close(reason) => { if self.bytes_remaining > 0 { @@ -255,4 +247,17 @@ impl Uploader { } Ok(false) } + + fn handle_data(&mut self, data: Bytes) -> Result { + if let Some(ref mut writer) = self.writer { + if (data.len() as u64) > self.bytes_remaining { + return Err(Error::TooMuchData); + } + self.bytes_remaining -= data.len() as u64; + writer.write_all(&data)?; + Ok(self.bytes_remaining == 0) + } else { + Err(Error::UnexpectedMessageType) + } + } } diff --git a/src/zip.rs b/src/zip.rs index 714ed53..ee112d0 100644 --- a/src/zip.rs +++ b/src/zip.rs @@ -8,30 +8,30 @@ use time::OffsetDateTime; use crate::file::LiveWriter; use crate::UploadedFile; -const SIGNATURE_SIZE: usize = 4; -const SHARED_FIELDS_SIZE: usize = 26; -const EXTRA_FIELD_SIZE: usize = 41; -const LOCAL_HEADER_SIZE_MINUS_FILENAME: usize = +const SIGNATURE_SIZE: u64 = 4; +const SHARED_FIELDS_SIZE: u64 = 26; +const EXTRA_FIELD_SIZE: u64 = 41; +const LOCAL_HEADER_SIZE_MINUS_FILENAME: u64 = SIGNATURE_SIZE + SHARED_FIELDS_SIZE + EXTRA_FIELD_SIZE; -const DATA_DESCRIPTOR_SIZE: usize = 24; -const FILE_ENTRY_SIZE_MINUS_FILENAME_AND_FILE: usize = +const DATA_DESCRIPTOR_SIZE: u64 = 24; +const FILE_ENTRY_SIZE_MINUS_FILENAME_AND_FILE: u64 = LOCAL_HEADER_SIZE_MINUS_FILENAME + DATA_DESCRIPTOR_SIZE; -const CENTRAL_DIRECTORY_HEADER_SIZE_MINUS_FILENAME: usize = +const CENTRAL_DIRECTORY_HEADER_SIZE_MINUS_FILENAME: u64 = SIGNATURE_SIZE + 2 + SHARED_FIELDS_SIZE + 14 + EXTRA_FIELD_SIZE; -const EOCD64_RECORD_SIZE: usize = 56; -const EOCD64_LOCATOR_SIZE: usize = 20; -const EOCD_RECORD_SIZE: usize = 22; -const EOCD_TOTAL_SIZE: usize = EOCD64_RECORD_SIZE + EOCD64_LOCATOR_SIZE + EOCD_RECORD_SIZE; +const EOCD64_RECORD_SIZE: u64 = 56; +const EOCD64_LOCATOR_SIZE: u64 = 20; +const EOCD_RECORD_SIZE: u64 = 22; +const EOCD_TOTAL_SIZE: u64 = EOCD64_RECORD_SIZE + EOCD64_LOCATOR_SIZE + EOCD_RECORD_SIZE; const EMPTY_STRING_CRC32: u32 = 0; -fn file_entry_size(file: &UploadedFile) -> usize { - FILE_ENTRY_SIZE_MINUS_FILENAME_AND_FILE + file.name.len() + file.size +fn file_entry_size(file: &UploadedFile) -> u64 { + FILE_ENTRY_SIZE_MINUS_FILENAME_AND_FILE + file.name.len() as u64 + file.size } -fn file_entries_size(files: &[UploadedFile]) -> usize { +fn file_entries_size(files: &[UploadedFile]) -> u64 { let mut total = 0; for file in files.iter() { total += file_entry_size(file) @@ -39,15 +39,15 @@ fn file_entries_size(files: &[UploadedFile]) -> usize { total } -fn central_directory_size(files: &[UploadedFile]) -> usize { +fn central_directory_size(files: &[UploadedFile]) -> u64 { let mut total = 0; for file in files.iter() { - total += CENTRAL_DIRECTORY_HEADER_SIZE_MINUS_FILENAME + file.name.len(); + total += CENTRAL_DIRECTORY_HEADER_SIZE_MINUS_FILENAME + file.name.len() as u64; } total } -fn zipfile_size(files: &[UploadedFile]) -> usize { +fn zipfile_size(files: &[UploadedFile]) -> u64 { file_entries_size(files) + central_directory_size(files) + EOCD_TOTAL_SIZE } @@ -102,7 +102,7 @@ impl UploadedFile { fields } - fn extra_field(&self, local_header_offset: usize) -> Vec { + fn extra_field(&self, local_header_offset: u64) -> Vec { let mut field = vec![ 0x01, 0x00, // Zip64 extended information 28, 0, // 28 bytes of data @@ -111,9 +111,9 @@ impl UploadedFile { // header, we're supposed to leave these blank and point to // the data descriptor, but I'm assuming it won't hurt to fill // them in regardless - append_value(&mut field, self.size as u64, 8); - append_value(&mut field, self.size as u64, 8); - append_value(&mut field, local_header_offset as u64, 8); + append_value(&mut field, self.size, 8); + append_value(&mut field, self.size, 8); + append_value(&mut field, local_header_offset, 8); append_0(&mut field, 4); // File starts on disk 0, there's no other disk field.append(&mut vec![ @@ -126,7 +126,7 @@ impl UploadedFile { field } - fn local_header(&self, local_header_offset: usize) -> Vec { + fn local_header(&self, local_header_offset: u64) -> Vec { let mut header = vec![0x50, 0x4b, 0x03, 0x04]; // Local file header signature header.append(&mut self.shared_header_fields(None)); header.append(&mut self.name.clone().into_bytes()); @@ -134,7 +134,7 @@ impl UploadedFile { header } - fn central_directory_header(&self, local_header_offset: usize, hash: u32) -> Vec { + fn central_directory_header(&self, local_header_offset: u64, hash: u32) -> Vec { let mut header = vec![ 0x50, 0x4b, 0x01, 0x02, // Central directory file header signature 45, 3, // Made by a Unix system supporting version 4.5 @@ -152,15 +152,15 @@ impl UploadedFile { let mut descriptor = vec![0x50, 0x4b, 0x07, 0x08]; // Data descriptor signature append_value(&mut descriptor, hash as u64, 4); // Compressed and uncompressed sizes - append_value(&mut descriptor, self.size as u64, 8); - append_value(&mut descriptor, self.size as u64, 8); + append_value(&mut descriptor, self.size, 8); + append_value(&mut descriptor, self.size, 8); descriptor } } fn end_of_central_directory(files: &[UploadedFile]) -> Vec { - let entries_size = file_entries_size(files) as u64; - let directory_size = central_directory_size(files) as u64; + let entries_size = file_entries_size(files); + let directory_size = central_directory_size(files); let mut eocd = vec![ 0x50, 0x4b, 0x06, 0x06, // EOCD64 record signature @@ -190,18 +190,18 @@ fn end_of_central_directory(files: &[UploadedFile]) -> Vec { eocd } -pub struct ZipGenerator<'a> { +pub struct ZipGenerator { files: Vec, file_index: usize, - byte_index: usize, + byte_index: u64, pending_metadata: Vec, hasher: Hasher, hashes: Vec, - output: Box, + output: W, } -impl<'a> ZipGenerator<'a> { - pub fn new(files: Vec, output: Box) -> Self { +impl ZipGenerator { + pub fn new(files: Vec, output: W) -> Self { let mut result = Self { files, file_index: 0, @@ -215,7 +215,7 @@ impl<'a> ZipGenerator<'a> { result } - pub fn total_size(&self) -> usize { + pub fn total_size(&self) -> u64 { zipfile_size(&self.files) } @@ -225,7 +225,7 @@ impl<'a> ZipGenerator<'a> { self.pending_metadata .append(&mut self.files[self.file_index].data_descriptor(hash)); debug!( - "Finishing file entry in zipfile: {}, hash {}", + "Finishing file entry in zipfile: {}, hash {:x}", self.files[self.file_index].name, hash ); self.file_index += 1; @@ -243,7 +243,7 @@ impl<'a> ZipGenerator<'a> { let mut local_header = self.files[self.file_index].local_header(offset); let mut data_descriptor = self.files[self.file_index].data_descriptor(EMPTY_STRING_CRC32); - offset += local_header.len() + data_descriptor.len(); + offset += local_header.len() as u64 + data_descriptor.len() as u64; self.file_index += 1; self.pending_metadata.append(&mut local_header); self.pending_metadata.append(&mut data_descriptor); @@ -279,13 +279,13 @@ impl<'a> ZipGenerator<'a> { } } -impl<'a> LiveWriter for ZipGenerator<'a> { +impl LiveWriter for ZipGenerator { fn add_waker(&mut self, waker: Waker) { self.output.add_waker(waker); } } -impl<'a> Write for ZipGenerator<'a> { +impl Write for ZipGenerator { fn write(&mut self, mut buf: &[u8]) -> std::io::Result { while !self.pending_metadata.is_empty() { let result = self.output.write(self.pending_metadata.as_slice()); @@ -302,14 +302,15 @@ impl<'a> Write for ZipGenerator<'a> { return Ok(0); } let bytes_remaining = self.files[self.file_index].size - self.byte_index; - if bytes_remaining < buf.len() { - buf = &buf[..bytes_remaining]; + if bytes_remaining < (buf.len() as u64) { + buf = &buf[..bytes_remaining as usize]; } let result = self.output.write(buf); match result { Ok(0) | Err(_) => (), Ok(n) => { self.hasher.update(&buf[..n]); + let n = n as u64; self.byte_index += n; if n == bytes_remaining { self.finish_file();