v0.2.0 with much better read/write API
This commit is contained in:
parent
cd0a92fe94
commit
e6ec9fd202
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "jsondb"
|
name = "jsondb"
|
||||||
version = "0.1.1"
|
version = "0.2.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
authors = ["xenofem <xenofem@xeno.science>"]
|
authors = ["xenofem <xenofem@xeno.science>"]
|
||||||
|
|
||||||
|
|
278
src/lib.rs
278
src/lib.rs
|
@ -5,25 +5,37 @@
|
||||||
//!
|
//!
|
||||||
//! * The saved data includes a schema version number, and will be
|
//! * The saved data includes a schema version number, and will be
|
||||||
//! automatically migrated to newer schema versions.
|
//! automatically migrated to newer schema versions.
|
||||||
|
//! * The live data is guarded by a built-in read-write lock which can
|
||||||
|
//! be used synchronously or from a [tokio] async environment.
|
||||||
//! * Data is saved to the backing JSON file, in a hopefully-atomic
|
//! * Data is saved to the backing JSON file, in a hopefully-atomic
|
||||||
//! fashion, every time it's modified.
|
//! fashion, every time a write lock is released.
|
||||||
//! * All I/O operations are async using [tokio].
|
|
||||||
//!
|
//!
|
||||||
//! Data can be represented in pretty much any format you can convince
|
//! Data can be represented in pretty much any form you can convince
|
||||||
//! [serde] to go along with, except for two restrictions:
|
//! [serde] to go along with, except for the following restrictions:
|
||||||
//!
|
//!
|
||||||
|
//! * Your data type must be [`Debug`] + [`Send`] + [`Sync`] + `'static`.
|
||||||
//! * Your serialization format shouldn't include a top-level
|
//! * Your serialization format shouldn't include a top-level
|
||||||
//! `version` field of its own, as this is reserved for our schema
|
//! `version` field of its own, as this is reserved for our schema
|
||||||
//! version tracking.
|
//! version tracking.
|
||||||
//! * You can't use `#[serde(deny_unknown_fields)]`, as this conflicts
|
//! * You can't use `#[serde(deny_unknown_fields)]`, as this conflicts
|
||||||
//! with our use of `#[serde(flatten)]`.
|
//! with our use of `#[serde(flatten)]`.
|
||||||
|
|
||||||
use std::{cmp::Ordering, ffi::OsString, future::Future, io::ErrorKind, path::PathBuf};
|
use std::{
|
||||||
|
cmp::Ordering,
|
||||||
|
ffi::OsString,
|
||||||
|
fmt::Debug,
|
||||||
|
future::Future,
|
||||||
|
io::ErrorKind,
|
||||||
|
ops::Deref,
|
||||||
|
path::{Path, PathBuf},
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
fs::{rename, File},
|
fs::{rename, File},
|
||||||
io::{AsyncReadExt, AsyncWriteExt},
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
|
sync::{mpsc, oneshot, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock},
|
||||||
};
|
};
|
||||||
|
|
||||||
/// A JSON-backed “““database”””.
|
/// A JSON-backed “““database”””.
|
||||||
|
@ -33,8 +45,14 @@ use tokio::{
|
||||||
/// written to disk when it's updated (we attempt to make saves atomic
|
/// written to disk when it's updated (we attempt to make saves atomic
|
||||||
/// using the `rename(2)` function).
|
/// using the `rename(2)` function).
|
||||||
pub struct JsonDb<T: Schema> {
|
pub struct JsonDb<T: Schema> {
|
||||||
path: PathBuf,
|
channel: mpsc::UnboundedSender<Request<T>>,
|
||||||
data: T,
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum Request<T> {
|
||||||
|
Read(oneshot::Sender<OwnedRwLockReadGuard<T>>),
|
||||||
|
Write(oneshot::Sender<OwnedRwLockWriteGuard<T>>),
|
||||||
|
Flush(oneshot::Sender<()>),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Schema for a JSON-backed database.
|
/// Schema for a JSON-backed database.
|
||||||
|
@ -46,7 +64,7 @@ pub struct JsonDb<T: Schema> {
|
||||||
/// from a JSON object containing a `version` field along with the
|
/// from a JSON object containing a `version` field along with the
|
||||||
/// other fields of the corresponding schema version; earlier versions
|
/// other fields of the corresponding schema version; earlier versions
|
||||||
/// will be migrated to the current version automatically.
|
/// will be migrated to the current version automatically.
|
||||||
pub trait Schema: DeserializeOwned + Serialize {
|
pub trait Schema: Send + Sync + Debug + DeserializeOwned + Serialize + 'static {
|
||||||
/// Previous schema that can be migrated into the new schema
|
/// Previous schema that can be migrated into the new schema
|
||||||
type Prev: Schema + Into<Self>;
|
type Prev: Schema + Into<Self>;
|
||||||
|
|
||||||
|
@ -67,7 +85,7 @@ pub trait Schema: DeserializeOwned + Serialize {
|
||||||
///
|
///
|
||||||
/// Implementing this will automatically implement [`Schema`], with
|
/// Implementing this will automatically implement [`Schema`], with
|
||||||
/// version number `0` and `Self` as the previous version.
|
/// version number `0` and `Self` as the previous version.
|
||||||
pub trait SchemaV0: DeserializeOwned + Serialize {
|
pub trait SchemaV0: Send + Sync + Debug + DeserializeOwned + Serialize + 'static {
|
||||||
/// Set this to false if your version 0 is a pre-`JsonDb` schema
|
/// Set this to false if your version 0 is a pre-`JsonDb` schema
|
||||||
/// that does not include a version number.
|
/// that does not include a version number.
|
||||||
const EXPECT_VERSION_NUMBER: bool = true;
|
const EXPECT_VERSION_NUMBER: bool = true;
|
||||||
|
@ -115,23 +133,44 @@ impl<T: Schema + Default> JsonDb<T> {
|
||||||
/// Load a [`JsonDb`] from a given file, creating it and
|
/// Load a [`JsonDb`] from a given file, creating it and
|
||||||
/// initializing it with the schema's default value if it does not
|
/// initializing it with the schema's default value if it does not
|
||||||
/// exist.
|
/// exist.
|
||||||
pub async fn load(path: PathBuf) -> Result<Self, Error> {
|
pub async fn load(path: PathBuf) -> Result<JsonDb<T>, Error> {
|
||||||
Self::load_or_else(path, || std::future::ready(Ok(T::default()))).await
|
Self::load_or_else(path, || std::future::ready(Ok(T::default()))).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn save<T: Schema>(data: &T, path: &Path) -> Result<(), Error> {
|
||||||
|
let mut temp_file_name = OsString::from(".");
|
||||||
|
temp_file_name.push(path.file_name().unwrap());
|
||||||
|
temp_file_name.push(".tmp");
|
||||||
|
let temp_file_path = path.parent().unwrap().join(temp_file_name);
|
||||||
|
{
|
||||||
|
let mut temp_file = File::create(&temp_file_path).await?;
|
||||||
|
temp_file
|
||||||
|
.write_all(&serde_json::to_vec_pretty(&Repr {
|
||||||
|
version: T::VERSION,
|
||||||
|
data,
|
||||||
|
})?)
|
||||||
|
.await?;
|
||||||
|
temp_file.sync_all().await?;
|
||||||
|
}
|
||||||
|
// Atomically update the actual file
|
||||||
|
rename(&temp_file_path, &path).await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
impl<T: Schema> JsonDb<T> {
|
impl<T: Schema> JsonDb<T> {
|
||||||
/// Load a [`JsonDb`] from a given file, creating it and
|
/// Load a [`JsonDb`] from a given file, creating it and
|
||||||
/// initializing it with the provided default value if it does not
|
/// initializing it with the provided default value if it does not
|
||||||
/// exist.
|
/// exist.
|
||||||
pub async fn load_or(path: PathBuf, default: T) -> Result<Self, Error> {
|
pub async fn load_or(path: PathBuf, default: T) -> Result<JsonDb<T>, Error> {
|
||||||
Self::load_or_else(path, || std::future::ready(Ok(default))).await
|
Self::load_or_else(path, || std::future::ready(Ok(default))).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Load a [`JsonDb`] from a given file, creating it and
|
/// Load a [`JsonDb`] from a given file, creating it and
|
||||||
/// initializing it with the provided function if it does not
|
/// initializing it with the provided function if it does not
|
||||||
/// exist.
|
/// exist.
|
||||||
pub async fn load_or_else<F, Fut>(path: PathBuf, default: F) -> Result<Self, Error>
|
pub async fn load_or_else<F, Fut>(path: PathBuf, default: F) -> Result<JsonDb<T>, Error>
|
||||||
where
|
where
|
||||||
F: FnOnce() -> Fut,
|
F: FnOnce() -> Fut,
|
||||||
Fut: Future<Output = std::io::Result<T>>,
|
Fut: Future<Output = std::io::Result<T>>,
|
||||||
|
@ -151,59 +190,110 @@ impl<T: Schema> JsonDb<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let mut db = JsonDb { path, data };
|
let (request_send, mut request_recv) = mpsc::unbounded_channel::<Request<T>>();
|
||||||
// Always save in case we've run migrations
|
tokio::spawn(async move {
|
||||||
db.save().await?;
|
save(&data, &path).await.expect("Failed to save data");
|
||||||
Ok(db)
|
let lock = Arc::new(RwLock::new(data));
|
||||||
|
while let Some(request) = request_recv.recv().await {
|
||||||
|
match request {
|
||||||
|
Request::Read(response) => {
|
||||||
|
response
|
||||||
|
.send(lock.clone().read_owned().await)
|
||||||
|
.expect("Failed to send read guard");
|
||||||
|
}
|
||||||
|
Request::Write(response) => {
|
||||||
|
response
|
||||||
|
.send(lock.clone().write_owned().await)
|
||||||
|
.expect("Failed to send write guard");
|
||||||
|
save(lock.read().await.deref(), &path)
|
||||||
|
.await
|
||||||
|
.expect("Failed to save data");
|
||||||
|
}
|
||||||
|
Request::Flush(response) => {
|
||||||
|
// Once we get around to handling this
|
||||||
|
// request, we've already flushed data from
|
||||||
|
// any previously-issued write requests
|
||||||
|
response
|
||||||
|
.send(())
|
||||||
|
.expect("Failed to send flush confirmation");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Ok(JsonDb {
|
||||||
|
channel: request_send,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn save(&mut self) -> Result<(), Error> {
|
fn request_read(&self) -> oneshot::Receiver<OwnedRwLockReadGuard<T>> {
|
||||||
let mut temp_file_name = OsString::from(".");
|
let (send, recv) = oneshot::channel();
|
||||||
temp_file_name.push(self.path.file_name().unwrap());
|
self.channel
|
||||||
temp_file_name.push(".tmp");
|
.send(Request::Read(send))
|
||||||
let temp_file_path = self.path.parent().unwrap().join(temp_file_name);
|
.expect("Failed to send read lock request");
|
||||||
{
|
recv
|
||||||
let mut temp_file = File::create(&temp_file_path).await?;
|
|
||||||
temp_file
|
|
||||||
.write_all(&serde_json::to_vec_pretty(&Repr {
|
|
||||||
version: T::VERSION,
|
|
||||||
data: &self.data,
|
|
||||||
})?)
|
|
||||||
.await?;
|
|
||||||
temp_file.sync_all().await?;
|
|
||||||
}
|
|
||||||
// Atomically update the actual file
|
|
||||||
rename(&temp_file_path, &self.path).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Borrow an immutable reference to the wrapped data
|
/// Take a read lock on the wrapped data.
|
||||||
pub fn read(&self) -> &T {
|
pub async fn read(&self) -> OwnedRwLockReadGuard<T> {
|
||||||
&self.data
|
self.request_read()
|
||||||
|
.await
|
||||||
|
.expect("Failed to receive read lock")
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Modify the wrapped data in-place, atomically writing it back
|
/// Synchronous version of [`read`][Self::read].
|
||||||
/// to disk afterwards.
|
pub fn blocking_read(&self) -> OwnedRwLockReadGuard<T> {
|
||||||
pub async fn write<U, V>(&mut self, updater: U) -> Result<V, Error>
|
self.request_read()
|
||||||
where
|
.blocking_recv()
|
||||||
U: FnOnce(&mut T) -> V,
|
.expect("Failed to receive read lock")
|
||||||
{
|
|
||||||
let result = updater(&mut self.data);
|
|
||||||
self.save().await?;
|
|
||||||
Ok(result)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Modify the wrapped data in-place using asynchronous code,
|
fn request_write(&self) -> oneshot::Receiver<OwnedRwLockWriteGuard<T>> {
|
||||||
/// atomically writing it back to disk afterwards.
|
let (send, recv) = oneshot::channel();
|
||||||
pub async fn write_async<U, V, Fut>(&mut self, updater: U) -> Result<V, Error>
|
self.channel
|
||||||
where
|
.send(Request::Write(send))
|
||||||
U: FnOnce(&mut T) -> Fut,
|
.expect("Failed to send write lock request");
|
||||||
Fut: Future<Output = V>,
|
recv
|
||||||
{
|
}
|
||||||
let result = updater(&mut self.data).await;
|
|
||||||
self.save().await?;
|
/// Take a write lock on the wrapped data. When the write guard is
|
||||||
Ok(result)
|
/// dropped, it triggers an atomic write of the updated data back
|
||||||
|
/// to disk.
|
||||||
|
pub async fn write(&self) -> OwnedRwLockWriteGuard<T> {
|
||||||
|
self.request_write()
|
||||||
|
.await
|
||||||
|
.expect("Failed to receive write lock")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Synchronous version of [`write`][Self::write].
|
||||||
|
pub fn blocking_write(&self) -> OwnedRwLockWriteGuard<T> {
|
||||||
|
self.request_write()
|
||||||
|
.blocking_recv()
|
||||||
|
.expect("Failed to receive write lock")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn request_flush(&self) -> oneshot::Receiver<()> {
|
||||||
|
let (send, recv) = oneshot::channel();
|
||||||
|
self.channel
|
||||||
|
.send(Request::Flush(send))
|
||||||
|
.expect("Failed to send flush request");
|
||||||
|
recv
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait for data to finish flushing to disk. Every call to
|
||||||
|
/// [`read`][Self::read] or [`write`][Self::write], or their
|
||||||
|
/// blocking equivalents, also waits for data to be flushed before
|
||||||
|
/// returning a guard.
|
||||||
|
pub async fn flush(&self) {
|
||||||
|
self.request_flush()
|
||||||
|
.await
|
||||||
|
.expect("Failed to receive flush confirmation");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Synchronous version of [`flush`][Self::flush].
|
||||||
|
pub fn blocking_flush(&self) {
|
||||||
|
self.request_flush()
|
||||||
|
.blocking_recv()
|
||||||
|
.expect("Failed to receive flush confirmation");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -352,34 +442,82 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn load_write_migrate() {
|
async fn async_load_write_migrate() {
|
||||||
let dir = tempdir().unwrap();
|
let dir = tempdir().unwrap();
|
||||||
let db_file = dir.path().join("test.json");
|
let db_file = dir.path().join("test.json");
|
||||||
{
|
{
|
||||||
let mut db0: JsonDb<V0> = JsonDb::load(db_file.clone()).await.unwrap();
|
let db0: JsonDb<V0> = JsonDb::load(db_file.clone()).await.unwrap();
|
||||||
|
db0.flush().await;
|
||||||
let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
|
let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
|
||||||
assert_eq!(value["version"], 0);
|
assert_eq!(value["version"], 0);
|
||||||
assert_eq!(&value["name"], "");
|
assert_eq!(&value["name"], "");
|
||||||
db0.write(|ref mut val| {
|
{
|
||||||
val.name = String::from("mefonex");
|
let mut writer = db0.write().await;
|
||||||
})
|
writer.name = String::from("mefonex");
|
||||||
.await
|
}
|
||||||
.unwrap();
|
{
|
||||||
|
let reader = db0.read().await;
|
||||||
|
assert_eq!(reader.name, "mefonex");
|
||||||
|
}
|
||||||
|
// Reading also awaits a flush
|
||||||
let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
|
let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
|
||||||
assert_eq!(&value["name"], "mefonex");
|
assert_eq!(&value["name"], "mefonex");
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
let mut db2: JsonDb<V2> = JsonDb::load(db_file.clone()).await.unwrap();
|
let db2: JsonDb<V2> = JsonDb::load(db_file.clone()).await.unwrap();
|
||||||
|
db2.flush().await;
|
||||||
let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
|
let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
|
||||||
assert_eq!(value["version"], 2);
|
assert_eq!(value["version"], 2);
|
||||||
assert_eq!(&value["name"], "mefonex");
|
assert_eq!(&value["name"], "mefonex");
|
||||||
assert_eq!(value["gender"], Value::Null);
|
assert_eq!(value["gender"], Value::Null);
|
||||||
assert_eq!(&value["last_updated"], "1970-01-01T00:00:00Z");
|
assert_eq!(&value["last_updated"], "1970-01-01T00:00:00Z");
|
||||||
db2.write(|ref mut val| {
|
{
|
||||||
val.last_updated = OffsetDateTime::from_unix_timestamp(1660585638).unwrap();
|
let mut writer = db2.write().await;
|
||||||
})
|
writer.last_updated = OffsetDateTime::from_unix_timestamp(1660585638).unwrap();
|
||||||
.await
|
}
|
||||||
.unwrap();
|
db2.flush().await;
|
||||||
|
let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
|
||||||
|
assert_eq!(&value["last_updated"], "2022-08-15T17:47:18Z");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn blocking_load_write_migrate() {
|
||||||
|
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||||
|
|
||||||
|
let dir = tempdir().unwrap();
|
||||||
|
let db_file = dir.path().join("test.json");
|
||||||
|
{
|
||||||
|
let db0: JsonDb<V0> = rt.block_on(JsonDb::load(db_file.clone())).unwrap();
|
||||||
|
db0.blocking_flush();
|
||||||
|
let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
|
||||||
|
assert_eq!(value["version"], 0);
|
||||||
|
assert_eq!(&value["name"], "");
|
||||||
|
{
|
||||||
|
let mut writer = db0.blocking_write();
|
||||||
|
writer.name = String::from("mefonex");
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let reader = db0.blocking_read();
|
||||||
|
assert_eq!(reader.name, "mefonex");
|
||||||
|
}
|
||||||
|
// Reading also waits for a flush
|
||||||
|
let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
|
||||||
|
assert_eq!(&value["name"], "mefonex");
|
||||||
|
}
|
||||||
|
{
|
||||||
|
let db2: JsonDb<V2> = rt.block_on(JsonDb::load(db_file.clone())).unwrap();
|
||||||
|
db2.blocking_flush();
|
||||||
|
let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
|
||||||
|
assert_eq!(value["version"], 2);
|
||||||
|
assert_eq!(&value["name"], "mefonex");
|
||||||
|
assert_eq!(value["gender"], Value::Null);
|
||||||
|
assert_eq!(&value["last_updated"], "1970-01-01T00:00:00Z");
|
||||||
|
{
|
||||||
|
let mut writer = db2.blocking_write();
|
||||||
|
writer.last_updated = OffsetDateTime::from_unix_timestamp(1660585638).unwrap();
|
||||||
|
}
|
||||||
|
db2.blocking_flush();
|
||||||
let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
|
let value: Value = serde_json::from_reader(File::open(&db_file).unwrap()).unwrap();
|
||||||
assert_eq!(&value["last_updated"], "2022-08-15T17:47:18Z");
|
assert_eq!(&value["last_updated"], "2022-08-15T17:47:18Z");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue