// jsondb/src/lib.rs

//! A quick-and-dirty “““embedded database””” library.
//!
//! This is just a library for storing data in JSON files, but with
//! a few added conveniences:
//!
//! * The saved data includes a schema version number, and will be
//! automatically migrated to newer schema versions.
//! * The live data is guarded by a built-in read-write lock which can
//! be used synchronously or from a [tokio] async environment.
//! * Data is saved to the backing JSON file, in a hopefully-atomic
//! fashion, every time a write lock is released.
//!
//! Data can be represented in pretty much any form you can convince
//! [serde] to go along with, except for the following restrictions:
//!
//! * Your data type must be [`Debug`] + [`Send`] + [`Sync`] + `'static`.
//! * Your serialization format shouldn't include a top-level
//! `version` field of its own, as this is reserved for our schema
//! version tracking.
//! * You can't use `#[serde(deny_unknown_fields)]`, as this conflicts
//! with our use of `#[serde(flatten)]`.
use std::{
cmp::Ordering,
ffi::OsString,
fmt::Debug,
io::ErrorKind,
ops::Deref,
path::{Path, PathBuf},
sync::Arc,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::{
fs::{rename, File},
io::{AsyncReadExt, AsyncWriteExt},
sync::{mpsc, oneshot, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock},
};
/// A JSON-backed “““database”””.
///
/// Holds a value loaded from a JSON file, migrated forward from older
/// schema versions as needed, and writes it back to that file whenever
/// it is updated. Saves go through a temporary file that is then moved
/// into place with `rename(2)`, in an attempt to make them atomic.
///
/// The value itself is owned by a background task; this handle only
/// carries the channel used to ask that task for lock guards.
pub struct JsonDb<T>
where
    T: Schema,
{
    channel: mpsc::UnboundedSender<Request<T>>,
}
/// Messages a [`JsonDb`] handle sends to its background task. Each one
/// carries the one-shot channel on which the task replies.
#[derive(Debug)]
enum Request<T> {
    /// Ask for a shared read guard on the data.
    Read(oneshot::Sender<OwnedRwLockReadGuard<T>>),
    /// Ask for an exclusive write guard; the task saves the data to disk
    /// after the guard has been released.
    Write(oneshot::Sender<OwnedRwLockWriteGuard<T>>),
    /// Ask to be told once every earlier request has been handled.
    Flush(oneshot::Sender<()>),
}
/// Schema for a JSON-backed database.
///
/// This needs to be a (de)serializable type, with a previous schema
/// that can be migrated into the new schema, and a version number
/// which must be the previous schema's version +1, [unless this is
/// version 0][`SchemaV0`]. This can then be automatically parsed
/// from a JSON object containing a `version` field along with the
/// other fields of the corresponding schema version; earlier versions
/// will be migrated to the current version automatically.
pub trait Schema: Send + Sync + Debug + DeserializeOwned + Serialize + 'static {
/// Previous schema that can be migrated into the new schema
type Prev: Schema + Into<Self>;
/// Schema version number
const VERSION: u32 = Self::Prev::VERSION + 1;
fn parse(s: &str) -> Result<Self, Error> {
let Version { version } = serde_json::from_str(s)?;
match version.cmp(&Self::VERSION) {
Ordering::Less => Ok(Self::Prev::parse(s)?.into()),
Ordering::Equal => Ok(serde_json::from_str::<VersionedData<Self>>(s)?.data),
Ordering::Greater => Err(Error::UnknownVersion(version)),
}
}
}
/// Marker trait for the first (version 0) schema of a database.
///
/// Every type implementing this automatically gets a [`Schema`]
/// implementation with version number `0` and with itself as the
/// "previous" schema, which ends the migration chain.
pub trait SchemaV0: 'static + Send + Sync + Debug + Serialize + DeserializeOwned {
    /// Whether stored version-0 data must carry a `version` field.
    ///
    /// Defaults to `true`. Set this to false if your version 0 is a
    /// pre-`JsonDb` schema that does not include a version number.
    const EXPECT_VERSION_NUMBER: bool = true;
}
impl<T: SchemaV0> Schema for T {
type Prev = Self;
const VERSION: u32 = 0;
fn parse(s: &str) -> Result<Self, Error> {
if Self::EXPECT_VERSION_NUMBER {
let Version { version } = serde_json::from_str(s)?;
if version != 0 {
return Err(Error::UnknownVersion(version));
}
}
Ok(serde_json::from_str(s)?)
}
}
/// Minimal view of stored data that reads only its `version` field
/// (other fields are ignored by serde's default behavior). It is used to
/// decide which schema version to parse the rest of the data with.
#[derive(Deserialize)]
struct Version {
    /// Schema version number the data was saved with.
    version: u32,
}
/// On-disk representation: the schema's own fields plus a top-level
/// `version` field, all in one JSON object.
#[derive(Deserialize, Serialize)]
struct VersionedData<T> {
    /// Schema version of `data` (written from [`Schema::VERSION`] when saving).
    version: u32,
    /// The schema's fields, flattened into the same object as `version`.
    /// This is why the data must not define its own top-level `version`
    /// field or use `#[serde(deny_unknown_fields)]`.
    #[serde(flatten)]
    data: T,
}
/// Errors that can occur while working with [`JsonDb`].
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("I/O error")]
Io(#[from] std::io::Error),
#[error("Failed to parse JSON")]
Json(#[from] serde_json::Error),
#[error("Unknown schema version {0}")]
UnknownVersion(u32),
}
impl<T> JsonDb<T>
where
    T: Schema + Default,
{
    /// Open the database stored at `path`. If the file does not exist
    /// yet, start from `T::default()` and create it.
    ///
    /// Shorthand for [`load_or_else`][Self::load_or_else] with
    /// [`Default::default`] as the fallback.
    pub async fn load(path: PathBuf) -> Result<JsonDb<T>, Error> {
        let fallback = <T as Default>::default;
        Self::load_or_else(path, fallback).await
    }
}
async fn save<T: Schema>(data: &T, path: &Path) -> Result<(), Error> {
let mut temp_file_name = OsString::from(".");
temp_file_name.push(path.file_name().unwrap());
temp_file_name.push(".tmp");
let temp_file_path = path.parent().unwrap().join(temp_file_name);
{
let mut temp_file = File::create(&temp_file_path).await?;
temp_file
.write_all(&serde_json::to_vec_pretty(&VersionedData {
version: T::VERSION,
data,
})?)
.await?;
temp_file.sync_all().await?;
}
// Atomically update the actual file
rename(&temp_file_path, &path).await?;
Ok(())
}
impl<T: Schema> JsonDb<T> {
    /// Load a [`JsonDb`] from a given file, creating it and
    /// initializing it with the provided default value if it does not
    /// exist.
    ///
    /// See [`load_or_else`][Self::load_or_else] for errors and panics.
    pub async fn load_or(path: PathBuf, default: T) -> Result<JsonDb<T>, Error> {
        Self::load_or_else(path, || default).await
    }

    /// Load a [`JsonDb`] from a given file, creating it and
    /// initializing it with the provided function if it does not
    /// exist.
    ///
    /// Before this returns, the loaded (and possibly migrated) data is
    /// saved back to `path` in the current schema version. A background
    /// task is then spawned to serve lock requests.
    ///
    /// # Errors
    ///
    /// Returns an error in these cases:
    /// * The file exists but cannot be read.
    /// * The file's contents cannot be parsed (see [`Schema::parse`]).
    /// * The initial save fails.
    ///
    /// # Panics
    ///
    /// Panics if called outside a tokio runtime, because the background
    /// task is started with [`tokio::spawn`].
    pub async fn load_or_else<F>(path: PathBuf, default: F) -> Result<JsonDb<T>, Error>
    where
        F: FnOnce() -> T,
    {
        let data = match File::open(&path).await {
            Ok(mut file) => {
                let mut buf = String::new();
                file.read_to_string(&mut buf).await?;
                T::parse(&buf)?
            }
            Err(e) if e.kind() == ErrorKind::NotFound => default(),
            Err(e) => return Err(e.into()),
        };
        // Save before spawning the background task. An unwritable path is
        // then reported to the caller as an error, instead of panicking
        // inside the task and breaking every later request.
        save(&data, &path).await?;
        let (request_send, request_recv) = mpsc::unbounded_channel();
        tokio::spawn(Self::serve(data, path, request_recv));
        Ok(JsonDb {
            channel: request_send,
        })
    }

    /// Body of the background task that owns the data.
    ///
    /// Requests are handled one at a time, in the order they were sent.
    /// After handing out a write guard, the task waits for the guard to
    /// be dropped and saves the data before taking the next request. So
    /// by the time any later request is answered, earlier writes are on
    /// disk. The task ends once every sender has been dropped.
    ///
    /// # Panics
    ///
    /// Panics (ending the task) if saving after a write fails.
    async fn serve(data: T, path: PathBuf, mut requests: mpsc::UnboundedReceiver<Request<T>>) {
        let lock = Arc::new(RwLock::new(data));
        while let Some(request) = requests.recv().await {
            match request {
                Request::Read(response) => {
                    // A send error only means the requester stopped waiting
                    // (e.g. its future was cancelled). The guard comes back
                    // inside the error and is dropped; no reason to stop.
                    let _ = response.send(Arc::clone(&lock).read_owned().await);
                }
                Request::Write(response) => {
                    // Bind the result so an undelivered guard is dropped at
                    // the end of this statement, before we wait on the lock.
                    let delivered = response
                        .send(Arc::clone(&lock).write_owned().await)
                        .is_ok();
                    if delivered {
                        // Taking a read lock waits until the caller drops its
                        // write guard, i.e. until the update is complete.
                        save(lock.read().await.deref(), &path)
                            .await
                            .expect("Failed to save data");
                    }
                }
                Request::Flush(response) => {
                    // Once we get around to handling this request, we've
                    // already flushed data from any previously-issued write
                    // requests.
                    let _ = response.send(());
                }
            }
        }
    }

    /// Queue a read request with the background task.
    fn request_read(&self) -> oneshot::Receiver<OwnedRwLockReadGuard<T>> {
        let (send, recv) = oneshot::channel();
        self.channel
            .send(Request::Read(send))
            .expect("Failed to send read lock request");
        recv
    }

    /// Take a read lock on the wrapped data.
    ///
    /// # Panics
    ///
    /// Panics if the background task has stopped, which happens if a
    /// previous save failed.
    pub async fn read(&self) -> OwnedRwLockReadGuard<T> {
        self.request_read()
            .await
            .expect("Failed to receive read lock")
    }

    /// Synchronous version of [`read`][Self::read].
    ///
    /// # Panics
    ///
    /// Panics in the same cases as [`read`][Self::read], and also if
    /// called from within an asynchronous execution context.
    pub fn blocking_read(&self) -> OwnedRwLockReadGuard<T> {
        self.request_read()
            .blocking_recv()
            .expect("Failed to receive read lock")
    }

    /// Queue a write request with the background task.
    fn request_write(&self) -> oneshot::Receiver<OwnedRwLockWriteGuard<T>> {
        let (send, recv) = oneshot::channel();
        self.channel
            .send(Request::Write(send))
            .expect("Failed to send write lock request");
        recv
    }

    /// Take a write lock on the wrapped data. When the write guard is
    /// dropped, it triggers an atomic write of the updated data back
    /// to disk.
    ///
    /// # Panics
    ///
    /// Panics if the background task has stopped, which happens if a
    /// previous save failed.
    pub async fn write(&self) -> OwnedRwLockWriteGuard<T> {
        self.request_write()
            .await
            .expect("Failed to receive write lock")
    }

    /// Synchronous version of [`write`][Self::write].
    ///
    /// # Panics
    ///
    /// Panics in the same cases as [`write`][Self::write], and also if
    /// called from within an asynchronous execution context.
    pub fn blocking_write(&self) -> OwnedRwLockWriteGuard<T> {
        self.request_write()
            .blocking_recv()
            .expect("Failed to receive write lock")
    }

    /// Queue a flush request with the background task.
    fn request_flush(&self) -> oneshot::Receiver<()> {
        let (send, recv) = oneshot::channel();
        self.channel
            .send(Request::Flush(send))
            .expect("Failed to send flush request");
        recv
    }

    /// Wait for data to finish flushing to disk. Every call to
    /// [`read`][Self::read] or [`write`][Self::write], or their
    /// blocking equivalents, also waits for data to be flushed before
    /// returning a guard.
    ///
    /// # Panics
    ///
    /// Panics if the background task has stopped, which happens if a
    /// previous save failed.
    pub async fn flush(&self) {
        self.request_flush()
            .await
            .expect("Failed to receive flush confirmation");
    }

    /// Synchronous version of [`flush`][Self::flush].
    ///
    /// # Panics
    ///
    /// Panics in the same cases as [`flush`][Self::flush], and also if
    /// called from within an asynchronous execution context.
    pub fn blocking_flush(&self) {
        self.request_flush()
            .blocking_recv()
            .expect("Failed to receive flush confirmation");
    }
}
// Unit tests: schema parsing and migration across three versions
// (V0 -> V1 -> V2), plus end-to-end load/write/flush round trips through a
// file in a temporary directory, using both the async and blocking APIs.
#[cfg(test)]
mod tests {
    use std::fs::File;
    use serde::{Deserialize, Serialize};
    use serde_json::Value;
    use serde_with::serde_as;
    use tempfile::tempdir;
    use time::OffsetDateTime;
    use super::{JsonDb, Schema, SchemaV0};

    // Version 0: just a name.
    #[derive(Default, Debug, PartialEq, Eq, Deserialize, Serialize)]
    struct V0 {
        name: String,
    }
    impl SchemaV0 for V0 {}

    // Version 1: adds an optional gender and a Unix-timestamp `last_updated`.
    #[derive(Default, Debug, PartialEq, Eq, Deserialize, Serialize)]
    struct V1 {
        name: String,
        #[serde(default)]
        gender: Option<String>,
        last_updated: i64,
    }
    impl Schema for V1 {
        type Prev = V0;
    }
    impl From<V0> for V1 {
        fn from(old: V0) -> Self {
            V1 {
                name: old.name,
                gender: None,
                last_updated: 0,
            }
        }
    }

    // Version 2: `last_updated` becomes an RFC 3339 timestamp string on disk.
    #[serde_as]
    #[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
    struct V2 {
        name: String,
        #[serde(default)]
        gender: Option<String>,
        #[serde_as(as = "time::format_description::well_known::Rfc3339")]
        last_updated: OffsetDateTime,
    }
    impl Default for V2 {
        fn default() -> Self {
            V2 {
                name: String::new(),
                gender: None,
                last_updated: OffsetDateTime::UNIX_EPOCH,
            }
        }
    }
    impl Schema for V2 {
        type Prev = V1;
    }
    impl From<V1> for V2 {
        fn from(old: V1) -> Self {
            V2 {
                name: old.name,
                gender: old.gender,
                last_updated: OffsetDateTime::from_unix_timestamp(old.last_updated).unwrap(),
            }
        }
    }

    // Sample stored data for each version. 1660585235 is
    // 2022-08-15T17:40:35Z; V2DATA's timestamp is 1660585638.
    const V0DATA: &str = r#"{"version":0,"name":"xenofem"}"#;
    const V1DATA: &str =
        r#"{"version":1,"name":"xenofem","gender":"dress go spinny","last_updated":1660585235}"#;
    const V2DATA: &str = r#"{"version":2,"name":"xenofem","gender":"dress go spinny","last_updated":"2022-08-15T17:47:18Z"}"#;

    #[test]
    fn parse_v0() {
        assert_eq!(
            V0::parse(V0DATA).unwrap(),
            V0 {
                name: String::from("xenofem")
            },
        );
    }

    #[test]
    fn parse_v1() {
        assert_eq!(
            V1::parse(V1DATA).unwrap(),
            V1 {
                name: String::from("xenofem"),
                gender: Some(String::from("dress go spinny")),
                last_updated: 1660585235
            },
        );
    }

    #[test]
    fn migrate_v0_v1() {
        assert_eq!(
            V1::parse(V0DATA).unwrap(),
            V1 {
                name: String::from("xenofem"),
                gender: None,
                last_updated: 0
            },
        );
    }

    #[test]
    fn parse_v2() {
        assert_eq!(
            V2::parse(V2DATA).unwrap(),
            V2 {
                name: String::from("xenofem"),
                gender: Some(String::from("dress go spinny")),
                last_updated: OffsetDateTime::from_unix_timestamp(1660585638).unwrap(),
            },
        );
    }

    #[test]
    fn migrate_v1_v2() {
        assert_eq!(
            V2::parse(V1DATA).unwrap(),
            V2 {
                name: String::from("xenofem"),
                gender: Some(String::from("dress go spinny")),
                last_updated: time::macros::datetime!(2022-08-15 17:40:35 +00:00)
            },
        );
    }

    // Two-step migration: V0 -> V1 (last_updated = 0) -> V2 (Unix epoch).
    #[test]
    fn migrate_v0_v2() {
        assert_eq!(
            V2::parse(V0DATA).unwrap(),
            V2 {
                name: String::from("xenofem"),
                gender: None,
                last_updated: time::macros::datetime!(1970-01-01 00:00:00 +00:00)
            },
        );
    }

    // Creates a V0 file from defaults, writes through the async API,
    // then reopens the same file as V2 and checks the migrated contents
    // on disk.
    #[tokio::test]
    async fn async_load_write_migrate() {
        let dir = tempdir().unwrap();
        let db_file = dir.path().join("test.json");
        {
            let db0: JsonDb<V0> = JsonDb::load(db_file.clone()).await.unwrap();
            db0.flush().await;
            let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
            assert_eq!(value["version"], 0);
            assert_eq!(&value["name"], "");
            {
                let mut writer = db0.write().await;
                writer.name = String::from("mefonex");
            }
            {
                let reader = db0.read().await;
                assert_eq!(reader.name, "mefonex");
            }
            // Reading also awaits a flush
            let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
            assert_eq!(&value["name"], "mefonex");
        }
        {
            let db2: JsonDb<V2> = JsonDb::load(db_file.clone()).await.unwrap();
            db2.flush().await;
            let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
            assert_eq!(value["version"], 2);
            assert_eq!(&value["name"], "mefonex");
            assert_eq!(value["gender"], Value::Null);
            assert_eq!(&value["last_updated"], "1970-01-01T00:00:00Z");
            {
                let mut writer = db2.write().await;
                writer.last_updated = OffsetDateTime::from_unix_timestamp(1660585638).unwrap();
            }
            db2.flush().await;
            let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
            assert_eq!(&value["last_updated"], "2022-08-15T17:47:18Z");
        }
    }

    // Same scenario as above, driven through the blocking API. Only
    // `load` runs inside `block_on`; the blocking calls run outside any
    // async context, on the test thread.
    #[test]
    fn blocking_load_write_migrate() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let dir = tempdir().unwrap();
        let db_file = dir.path().join("test.json");
        {
            let db0: JsonDb<V0> = rt.block_on(JsonDb::load(db_file.clone())).unwrap();
            db0.blocking_flush();
            let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
            assert_eq!(value["version"], 0);
            assert_eq!(&value["name"], "");
            {
                let mut writer = db0.blocking_write();
                writer.name = String::from("mefonex");
            }
            {
                let reader = db0.blocking_read();
                assert_eq!(reader.name, "mefonex");
            }
            // Reading also waits for a flush
            let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
            assert_eq!(&value["name"], "mefonex");
        }
        {
            let db2: JsonDb<V2> = rt.block_on(JsonDb::load(db_file.clone())).unwrap();
            db2.blocking_flush();
            let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
            assert_eq!(value["version"], 2);
            assert_eq!(&value["name"], "mefonex");
            assert_eq!(value["gender"], Value::Null);
            assert_eq!(&value["last_updated"], "1970-01-01T00:00:00Z");
            {
                let mut writer = db2.blocking_write();
                writer.last_updated = OffsetDateTime::from_unix_timestamp(1660585638).unwrap();
            }
            db2.blocking_flush();
            let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
            assert_eq!(&value["last_updated"], "2022-08-15T17:47:18Z");
        }
    }
}