add remote PDF fetcher with caching
This commit is contained in:
parent
e640a7cd09
commit
91c69f00f5
1364
Cargo.lock
generated
1364
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -2,11 +2,20 @@
|
|||
name = "poop-graph"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
authors = ["xenofem <xenofem@xeno.science>"]
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
bytes = "1.1"
|
||||
futures = "0.3"
|
||||
lazy_static = "1.4"
|
||||
pdf = "0.7.2"
|
||||
regex = "1.5.5"
|
||||
reqwest = { version = "0.11", features = ["rustls-tls", "stream"], default-features = false }
|
||||
scraper = "0.12"
|
||||
thiserror = "1"
|
||||
time = { version = "0.3.9", features = ["formatting", "macros", "parsing"] }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tokio-util = { version = "0.7", features = ["codec"] }
|
||||
url = "2.2.2"
|
||||
|
|
133
src/fetch.rs
Normal file
133
src/fetch.rs
Normal file
|
@ -0,0 +1,133 @@
|
|||
use std::time::{Duration, Instant, SystemTime};

use futures::{sink::SinkExt, TryStreamExt};
use lazy_static::lazy_static;
use reqwest::Url;
use scraper::Selector;
use time::PrimitiveDateTime;
use tokio_util::codec;

lazy_static! {
    // Page that links to the downloadable "-data.pdf" file we scrape for.
    static ref CHARTS_URL: Url = Url::parse("https://www.mwra.com/biobot/biobotdata.htm").unwrap();
    // CSS selector matching anchors whose href ends in "-data.pdf".
    static ref PDF_SEL: Selector = Selector::parse(r#"a[href$="-data.pdf"]"#).unwrap();
    // Minimum spacing between network freshness checks (5 minutes).
    static ref MIN_CHECK_INTERVAL: Duration = Duration::from_secs(300);
}

// On-disk location of the cached PDF, relative to the working directory.
const CACHED_PDF_PATH: &str = "data.pdf";
// Layout of the HTTP `Last-Modified` header value (RFC-1123 style),
// e.g. "Tue, 12 Apr 2022 17:00:00 GMT".
const LAST_MODIFIED_FORMAT: &[time::format_description::FormatItem] = time::macros::format_description!(
    "[weekday repr:short], [day] [month repr:short] [year] [hour repr:24]:[minute]:[second] GMT"
);
|
||||
|
||||
/// Everything that can go wrong while locating, downloading, caching,
/// or parsing the remote data PDF.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// An HTTP request (charts page, HEAD probe, or PDF download) failed.
    #[error("HTTP request failed")]
    Http(#[from] reqwest::Error),
    /// A scraped href could not be joined into a valid URL.
    #[error("couldn't parse URL")]
    UrlParse(#[from] url::ParseError),
    /// The cached/downloaded file could not be parsed as a PDF.
    #[error("couldn't read PDF")]
    Pdf(#[from] pdf::error::PdfError),
    /// Reading or writing the on-disk cache failed.
    #[error("I/O error")]
    Io(#[from] std::io::Error),
    /// The cache file's modification time predates the Unix epoch.
    #[error("system clock anomaly")]
    SystemClock(#[from] std::time::SystemTimeError),
    /// No matching PDF link (or its href attribute) was found on the page.
    #[error("data not found")]
    NotFound,
    /// A fetch was requested inside the minimum check interval and there
    /// is no cached copy to fall back on.
    #[error("requesting data too soon")]
    TooSoon,
}
|
||||
|
||||
/// Fetches the remote data PDF, caching it on disk and throttling how
/// often the network is consulted.
pub struct PdfFetcher {
    // Reused HTTP client for all requests.
    client: reqwest::Client,
    // Timestamp of the most recent freshness check, if any; used by
    // `fetch` to enforce `MIN_CHECK_INTERVAL`.
    last_checked: Option<Instant>,
}
|
||||
|
||||
impl PdfFetcher {
|
||||
pub fn new() -> Result<Self, Error> {
|
||||
Ok(Self {
|
||||
client: reqwest::ClientBuilder::new().build()?,
|
||||
last_checked: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn fetch(&mut self) -> Result<pdf::file::File<Vec<u8>>, Error> {
|
||||
let cache_modtime = match tokio::fs::File::open(CACHED_PDF_PATH).await {
|
||||
Ok(file) => Some(
|
||||
file.metadata()
|
||||
.await?
|
||||
.modified()?
|
||||
.duration_since(SystemTime::UNIX_EPOCH)?
|
||||
.as_secs(),
|
||||
),
|
||||
Err(_) => None,
|
||||
};
|
||||
|
||||
if let Some(instant) = self.last_checked {
|
||||
if Instant::now() - instant < *MIN_CHECK_INTERVAL {
|
||||
return if cache_modtime.is_some() {
|
||||
self.cached_pdf()
|
||||
} else {
|
||||
Err(Error::TooSoon)
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
let body = self
|
||||
.client
|
||||
.get(CHARTS_URL.as_ref())
|
||||
.send()
|
||||
.await?
|
||||
.text()
|
||||
.await?;
|
||||
let document = scraper::Html::parse_document(&body);
|
||||
let pdf_href = document
|
||||
.select(&PDF_SEL)
|
||||
.next()
|
||||
.ok_or(Error::NotFound)?
|
||||
.value()
|
||||
.attr("href")
|
||||
.ok_or(Error::NotFound)?;
|
||||
let pdf_url = CHARTS_URL.join(pdf_href)?;
|
||||
|
||||
let origin_modtime = self
|
||||
.client
|
||||
.head(pdf_url.clone())
|
||||
.send()
|
||||
.await?
|
||||
.headers()
|
||||
.get(reqwest::header::LAST_MODIFIED)
|
||||
.and_then(|val| {
|
||||
u64::try_from(
|
||||
PrimitiveDateTime::parse(val.to_str().ok()?, LAST_MODIFIED_FORMAT)
|
||||
.ok()?
|
||||
.assume_utc()
|
||||
.unix_timestamp(),
|
||||
)
|
||||
.ok()
|
||||
});
|
||||
|
||||
let outdated = cache_modtime
|
||||
.zip(origin_modtime)
|
||||
.map_or(true, |(cache, origin)| origin > cache);
|
||||
|
||||
if outdated {
|
||||
let mut pdf_stream = self
|
||||
.client
|
||||
.get(pdf_url)
|
||||
.send()
|
||||
.await?
|
||||
.bytes_stream()
|
||||
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
|
||||
|
||||
let cache_file = tokio::fs::File::create(CACHED_PDF_PATH).await?;
|
||||
let mut sink = codec::FramedWrite::new(cache_file, codec::BytesCodec::new());
|
||||
sink.send_all(&mut pdf_stream).await?;
|
||||
<dyn futures::Sink<bytes::Bytes, Error = std::io::Error> + Unpin>::close(&mut sink)
|
||||
.await?;
|
||||
}
|
||||
self.cached_pdf()
|
||||
}
|
||||
|
||||
fn cached_pdf(&self) -> Result<pdf::file::File<Vec<u8>>, Error> {
|
||||
Ok(pdf::file::File::open(CACHED_PDF_PATH)?)
|
||||
}
|
||||
}
|
|
@ -1,9 +1,13 @@
|
|||
mod extract;
|
||||
mod fetch;
|
||||
|
||||
use extract::DataSet;
|
||||
use fetch::PdfFetcher;
|
||||
|
||||
fn main() {
|
||||
let doc = pdf::file::File::open("data.pdf").expect("Failed to read PDF");
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let mut fetcher = PdfFetcher::new().expect("Failed to initialize PDF fetcher");
|
||||
let doc = fetcher.fetch().await.expect("Failed to fetch PDF");
|
||||
let dataset = DataSet::extract(&doc).expect("Failed to extract dataset");
|
||||
for row in dataset.csv_rows() {
|
||||
println!("{}", row.unwrap());
|
||||
|
|
Loading…
Reference in a new issue