309 lines
11 KiB
Rust
309 lines
11 KiB
Rust
//! A quick-and-dirty “““embedded database””” library.
//!
//! This is just a library for storing data in JSON files, but with
//! a few added conveniences:
//!
//! * The saved data includes a schema version number, and will be
//!   automatically migrated to newer schema versions.
//! * The live data is guarded by a built-in read-write lock which can
//!   be used synchronously or from a [tokio] async environment.
//! * Data is saved to the backing JSON file, in a hopefully-atomic
//!   fashion, every time a write lock is released.
//!
//! Data can be represented in pretty much any form you can convince
//! [serde] to go along with, except for the following restrictions:
//!
//! * Your data type must be [`Debug`] + [`Send`] + [`Sync`] + `'static`.
//! * Your serialization format shouldn't include a top-level
//!   `version` field of its own, as this is reserved for our schema
//!   version tracking.
//! * You can't use `#[serde(deny_unknown_fields)]`, as this conflicts
//!   with our use of `#[serde(flatten)]`.
|
|
|
|
use std::{
|
|
cmp::Ordering,
|
|
ffi::OsString,
|
|
fmt::Debug,
|
|
io::ErrorKind,
|
|
ops::Deref,
|
|
path::{Path, PathBuf},
|
|
sync::Arc,
|
|
};
|
|
|
|
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
|
use tokio::{
|
|
fs::{rename, File},
|
|
io::{AsyncReadExt, AsyncWriteExt},
|
|
sync::{mpsc, oneshot, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock},
|
|
};
|
|
|
|
#[cfg(test)]
|
|
mod tests;
|
|
|
|
/// A JSON-backed “““database”””.
|
|
///
|
|
/// This wraps a value that is loaded from a JSON file, automatically
|
|
/// migrated forward from previous schema versions, and automatically
|
|
/// written to disk when it's updated (we attempt to make saves atomic
|
|
/// using the `rename(2)` function).
|
|
pub struct JsonDb<T: Schema> {
|
|
channel: mpsc::UnboundedSender<Request<T>>,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
enum Request<T> {
|
|
Read(oneshot::Sender<OwnedRwLockReadGuard<T>>),
|
|
Write(oneshot::Sender<OwnedRwLockWriteGuard<T>>),
|
|
Flush(oneshot::Sender<()>),
|
|
}
|
|
|
|
/// Schema for a JSON-backed database.
|
|
///
|
|
/// This needs to be a (de)serializable type, with a previous schema
|
|
/// that can be migrated into the new schema, and a version number
|
|
/// which must be the previous schema's version +1, [unless this is
|
|
/// version 0][`SchemaV0`]. This can then be automatically parsed
|
|
/// from a JSON object containing a `version` field along with the
|
|
/// other fields of the corresponding schema version; earlier versions
|
|
/// will be migrated to the current version automatically.
|
|
pub trait Schema: Send + Sync + Debug + DeserializeOwned + Serialize + 'static {
|
|
/// Previous schema that can be migrated into the new schema.
|
|
type Prev: Schema + Into<Self>;
|
|
|
|
/// Schema version number.
|
|
const VERSION: u32 = Self::Prev::VERSION + 1;
|
|
|
|
/// Whether unversioned data should be parsed as V0, rather than
|
|
/// rejected with an error.
|
|
const UNVERSIONED_V0: bool = Self::Prev::UNVERSIONED_V0;
|
|
|
|
fn parse(s: &str) -> Result<Self, Error> {
|
|
let version = match serde_json::from_str::<Version>(s)?.version {
|
|
Some(v) => v,
|
|
None => {
|
|
if Self::UNVERSIONED_V0 {
|
|
0
|
|
} else {
|
|
return Err(Error::MissingVersion);
|
|
}
|
|
}
|
|
};
|
|
match version.cmp(&Self::VERSION) {
|
|
Ordering::Less => Ok(Self::Prev::parse(s)?.into()),
|
|
Ordering::Equal => Ok(serde_json::from_str::<VersionedData<Self>>(s)?.data),
|
|
Ordering::Greater => Err(Error::UnknownVersion(version)),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Marker trait to indicate version 0 of a database schema.
|
|
///
|
|
/// Implementing this will automatically implement [`Schema`], with
|
|
/// version number `0` and `Self` as the previous version.
|
|
pub trait SchemaV0: Send + Sync + Debug + DeserializeOwned + Serialize + 'static {
|
|
/// Set this to `true` if your version 0 data may be stored in a
|
|
/// pre-`JsonDb` format that does not include a version number.
|
|
/// Note that regardless of this setting, when data is written
|
|
/// back to the JSON file, it will always include a version
|
|
/// number.
|
|
const VERSION_OPTIONAL: bool = false;
|
|
}
|
|
|
|
/// Every [`SchemaV0`] type is a [`Schema`] with version number 0.
impl<T> Schema for T
where
    T: SchemaV0,
{
    const VERSION: u32 = 0;
    const UNVERSIONED_V0: bool = T::VERSION_OPTIONAL;

    // Version 0 has no real predecessor, so it names itself. `parse`
    // only consults `Prev` for stored versions below `VERSION`, and no
    // `u32` is below 0, so this never recurses.
    type Prev = Self;
}
|
|
|
|
#[derive(Deserialize)]
|
|
struct Version {
|
|
version: Option<u32>,
|
|
}
|
|
|
|
#[derive(Deserialize, Serialize)]
|
|
struct VersionedData<T> {
|
|
version: Option<u32>,
|
|
#[serde(flatten)]
|
|
data: T,
|
|
}
|
|
|
|
/// Errors that can occur while working with [`JsonDb`].
|
|
#[derive(thiserror::Error, Debug)]
|
|
pub enum Error {
|
|
#[error("I/O error")]
|
|
Io(#[from] std::io::Error),
|
|
#[error("Failed to parse JSON")]
|
|
Json(#[from] serde_json::Error),
|
|
#[error("Unknown schema version {0}")]
|
|
UnknownVersion(u32),
|
|
#[error("Missing schema version")]
|
|
MissingVersion,
|
|
}
|
|
|
|
impl<T: Schema + Default> JsonDb<T> {
|
|
/// Load a [`JsonDb`] from a given file, creating it and
|
|
/// initializing it with the schema's default value if it does not
|
|
/// exist.
|
|
pub async fn load(path: PathBuf) -> Result<JsonDb<T>, Error> {
|
|
Self::load_or_else(path, T::default).await
|
|
}
|
|
}
|
|
|
|
async fn save<T: Schema>(data: &T, path: &Path) -> Result<(), Error> {
|
|
let mut temp_file_name = OsString::from(".");
|
|
temp_file_name.push(path.file_name().unwrap());
|
|
temp_file_name.push(".tmp");
|
|
let temp_file_path = path.parent().unwrap().join(temp_file_name);
|
|
{
|
|
let mut temp_file = File::create(&temp_file_path).await?;
|
|
temp_file
|
|
.write_all(&serde_json::to_vec_pretty(&VersionedData {
|
|
version: Some(T::VERSION),
|
|
data,
|
|
})?)
|
|
.await?;
|
|
temp_file.sync_all().await?;
|
|
}
|
|
// Atomically update the actual file
|
|
rename(&temp_file_path, &path).await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
impl<T: Schema> JsonDb<T> {
|
|
/// Load a [`JsonDb`] from a given file, creating it and
|
|
/// initializing it with the provided default value if it does not
|
|
/// exist.
|
|
pub async fn load_or(path: PathBuf, default: T) -> Result<JsonDb<T>, Error> {
|
|
Self::load_or_else(path, || default).await
|
|
}
|
|
|
|
/// Load a [`JsonDb`] from a given file, creating it and
|
|
/// initializing it with the provided function if it does not
|
|
/// exist.
|
|
pub async fn load_or_else<F>(path: PathBuf, default: F) -> Result<JsonDb<T>, Error>
|
|
where
|
|
F: FnOnce() -> T,
|
|
{
|
|
let open_result = File::open(&path).await;
|
|
let data = match open_result {
|
|
Ok(mut f) => {
|
|
let mut buf = String::new();
|
|
f.read_to_string(&mut buf).await?;
|
|
T::parse(&buf)?
|
|
}
|
|
Err(e) => {
|
|
if let ErrorKind::NotFound = e.kind() {
|
|
default()
|
|
} else {
|
|
return Err(e.into());
|
|
}
|
|
}
|
|
};
|
|
let (request_send, mut request_recv) = mpsc::unbounded_channel::<Request<T>>();
|
|
tokio::spawn(async move {
|
|
save(&data, &path).await.expect("Failed to save data");
|
|
let lock = Arc::new(RwLock::new(data));
|
|
while let Some(request) = request_recv.recv().await {
|
|
match request {
|
|
Request::Read(response) => {
|
|
response
|
|
.send(lock.clone().read_owned().await)
|
|
.expect("Failed to send read guard");
|
|
}
|
|
Request::Write(response) => {
|
|
response
|
|
.send(lock.clone().write_owned().await)
|
|
.expect("Failed to send write guard");
|
|
save(lock.read().await.deref(), &path)
|
|
.await
|
|
.expect("Failed to save data");
|
|
}
|
|
Request::Flush(response) => {
|
|
// Once we get around to handling this
|
|
// request, we've already flushed data from
|
|
// any previously-issued write requests
|
|
response
|
|
.send(())
|
|
.expect("Failed to send flush confirmation");
|
|
}
|
|
}
|
|
}
|
|
});
|
|
Ok(JsonDb {
|
|
channel: request_send,
|
|
})
|
|
}
|
|
|
|
fn request_read(&self) -> oneshot::Receiver<OwnedRwLockReadGuard<T>> {
|
|
let (send, recv) = oneshot::channel();
|
|
self.channel
|
|
.send(Request::Read(send))
|
|
.expect("Failed to send read lock request");
|
|
recv
|
|
}
|
|
|
|
/// Take a read lock on the wrapped data.
|
|
pub async fn read(&self) -> OwnedRwLockReadGuard<T> {
|
|
self.request_read()
|
|
.await
|
|
.expect("Failed to receive read lock")
|
|
}
|
|
|
|
/// Synchronous version of [`read`][Self::read].
|
|
pub fn blocking_read(&self) -> OwnedRwLockReadGuard<T> {
|
|
self.request_read()
|
|
.blocking_recv()
|
|
.expect("Failed to receive read lock")
|
|
}
|
|
|
|
fn request_write(&self) -> oneshot::Receiver<OwnedRwLockWriteGuard<T>> {
|
|
let (send, recv) = oneshot::channel();
|
|
self.channel
|
|
.send(Request::Write(send))
|
|
.expect("Failed to send write lock request");
|
|
recv
|
|
}
|
|
|
|
/// Take a write lock on the wrapped data. When the write guard is
|
|
/// dropped, it triggers an atomic write of the updated data back
|
|
/// to disk.
|
|
pub async fn write(&self) -> OwnedRwLockWriteGuard<T> {
|
|
self.request_write()
|
|
.await
|
|
.expect("Failed to receive write lock")
|
|
}
|
|
|
|
/// Synchronous version of [`write`][Self::write].
|
|
pub fn blocking_write(&self) -> OwnedRwLockWriteGuard<T> {
|
|
self.request_write()
|
|
.blocking_recv()
|
|
.expect("Failed to receive write lock")
|
|
}
|
|
|
|
fn request_flush(&self) -> oneshot::Receiver<()> {
|
|
let (send, recv) = oneshot::channel();
|
|
self.channel
|
|
.send(Request::Flush(send))
|
|
.expect("Failed to send flush request");
|
|
recv
|
|
}
|
|
|
|
/// Wait for data to finish flushing to disk. Every call to
|
|
/// [`read`][Self::read] or [`write`][Self::write], or their
|
|
/// blocking equivalents, also waits for data to be flushed before
|
|
/// returning a guard.
|
|
pub async fn flush(&self) {
|
|
self.request_flush()
|
|
.await
|
|
.expect("Failed to receive flush confirmation");
|
|
}
|
|
|
|
/// Synchronous version of [`flush`][Self::flush].
|
|
pub fn blocking_flush(&self) {
|
|
self.request_flush()
|
|
.blocking_recv()
|
|
.expect("Failed to receive flush confirmation");
|
|
}
|
|
}
|