//! A quick-and-dirty “““embedded database””” library.
//!
//! This is just a library for storing data in JSON files, but with
//! a few added conveniences:
//!
//! * The saved data includes a schema version number, and will be
//!   automatically migrated to newer schema versions.
//! * The live data is guarded by a built-in read-write lock which can
//!   be used synchronously or from a [tokio] async environment.
//! * Data is saved to the backing JSON file, in a hopefully-atomic
//!   fashion, every time a write lock is released.
//!
//! Data can be represented in pretty much any form you can convince
//! [serde] to go along with, except for the following restrictions:
//!
//! * Your data type must be [`Debug`] + [`Send`] + [`Sync`] + `'static`.
//! * Your serialization format shouldn't include a top-level
//!   `version` field of its own, as this is reserved for our schema
//!   version tracking.
//! * You can't use `#[serde(deny_unknown_fields)]`, as this conflicts
//!   with our use of `#[serde(flatten)]`.

use std::{
    cmp::Ordering,
    ffi::OsString,
    fmt::Debug,
    io::ErrorKind,
    ops::Deref,
    path::{Path, PathBuf},
    sync::Arc,
};

use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::{
    fs::{rename, File},
    io::{AsyncReadExt, AsyncWriteExt},
    sync::{mpsc, oneshot, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock},
};

#[cfg(test)]
mod tests;

/// A JSON-backed “““database”””.
///
/// This wraps a value that is loaded from a JSON file, automatically
/// migrated forward from previous schema versions, and automatically
/// written to disk when it's updated (we attempt to make saves atomic
/// using the `rename(2)` function).
pub struct JsonDb<T: Schema> {
    // All access goes through this channel to a dedicated worker task
    // (spawned during load) that owns the data behind an RwLock and
    // saves it back to disk after each write lock is released.
    channel: mpsc::UnboundedSender<Request<T>>,
}

/// Requests sent from a [`JsonDb`] handle to its worker task.
#[derive(Debug)]
enum Request<T> {
    /// Request an owned read guard on the live data.
    Read(oneshot::Sender<OwnedRwLockReadGuard<T>>),
    /// Request an owned write guard on the live data.
    Write(oneshot::Sender<OwnedRwLockWriteGuard<T>>),
    /// Request confirmation that previously-written data has been
    /// flushed to disk.
    Flush(oneshot::Sender<()>),
}

/// Schema for a JSON-backed database.
/// /// This needs to be a (de)serializable type, with a previous schema /// that can be migrated into the new schema, and a version number /// which must be the previous schema's version +1, [unless this is /// version 0][`SchemaV0`]. This can then be automatically parsed /// from a JSON object containing a `version` field along with the /// other fields of the corresponding schema version; earlier versions /// will be migrated to the current version automatically. pub trait Schema: Send + Sync + Debug + DeserializeOwned + Serialize + 'static { /// Previous schema that can be migrated into the new schema. type Prev: Schema + Into; /// Schema version number. const VERSION: u32 = Self::Prev::VERSION + 1; /// Whether unversioned data should be parsed as V0, rather than /// rejected with an error. const UNVERSIONED_V0: bool = Self::Prev::UNVERSIONED_V0; fn parse(s: &str) -> Result { let version = match serde_json::from_str::(s)?.version { Some(v) => v, None => { if Self::UNVERSIONED_V0 { 0 } else { return Err(Error::MissingVersion); } } }; match version.cmp(&Self::VERSION) { Ordering::Less => Ok(Self::Prev::parse(s)?.into()), Ordering::Equal => Ok(serde_json::from_str::>(s)?.data), Ordering::Greater => Err(Error::UnknownVersion(version)), } } } /// Marker trait to indicate version 0 of a database schema. /// /// Implementing this will automatically implement [`Schema`], with /// version number `0` and `Self` as the previous version. pub trait SchemaV0: Send + Sync + Debug + DeserializeOwned + Serialize + 'static { /// Set this to `true` if your version 0 data may be stored in a /// pre-`JsonDb` format that does not include a version number. /// Note that regardless of this setting, when data is written /// back to the JSON file, it will always include a version /// number. 
const VERSION_OPTIONAL: bool = false; } impl Schema for T { type Prev = Self; const VERSION: u32 = 0; const UNVERSIONED_V0: bool = Self::VERSION_OPTIONAL; } #[derive(Deserialize)] struct Version { version: Option, } #[derive(Deserialize, Serialize)] struct VersionedData { version: Option, #[serde(flatten)] data: T, } /// Errors that can occur while working with [`JsonDb`]. #[derive(thiserror::Error, Debug)] pub enum Error { #[error("I/O error")] Io(#[from] std::io::Error), #[error("Failed to parse JSON")] Json(#[from] serde_json::Error), #[error("Unknown schema version {0}")] UnknownVersion(u32), #[error("Missing schema version")] MissingVersion, } impl JsonDb { /// Load a [`JsonDb`] from a given file, creating it and /// initializing it with the schema's default value if it does not /// exist. pub async fn load(path: PathBuf) -> Result, Error> { Self::load_or_else(path, T::default).await } } async fn save(data: &T, path: &Path) -> Result<(), Error> { let mut temp_file_name = OsString::from("."); temp_file_name.push(path.file_name().unwrap()); temp_file_name.push(".tmp"); let temp_file_path = path.parent().unwrap().join(temp_file_name); { let mut temp_file = File::create(&temp_file_path).await?; temp_file .write_all(&serde_json::to_vec_pretty(&VersionedData { version: Some(T::VERSION), data, })?) .await?; temp_file.sync_all().await?; } // Atomically update the actual file rename(&temp_file_path, &path).await?; Ok(()) } impl JsonDb { /// Load a [`JsonDb`] from a given file, creating it and /// initializing it with the provided default value if it does not /// exist. pub async fn load_or(path: PathBuf, default: T) -> Result, Error> { Self::load_or_else(path, || default).await } /// Load a [`JsonDb`] from a given file, creating it and /// initializing it with the provided function if it does not /// exist. 
pub async fn load_or_else(path: PathBuf, default: F) -> Result, Error> where F: FnOnce() -> T, { let open_result = File::open(&path).await; let data = match open_result { Ok(mut f) => { let mut buf = String::new(); f.read_to_string(&mut buf).await?; T::parse(&buf)? } Err(e) => { if let ErrorKind::NotFound = e.kind() { default() } else { return Err(e.into()); } } }; let (request_send, mut request_recv) = mpsc::unbounded_channel::>(); tokio::spawn(async move { save(&data, &path).await.expect("Failed to save data"); let lock = Arc::new(RwLock::new(data)); while let Some(request) = request_recv.recv().await { match request { Request::Read(response) => { response .send(lock.clone().read_owned().await) .expect("Failed to send read guard"); } Request::Write(response) => { response .send(lock.clone().write_owned().await) .expect("Failed to send write guard"); save(lock.read().await.deref(), &path) .await .expect("Failed to save data"); } Request::Flush(response) => { // Once we get around to handling this // request, we've already flushed data from // any previously-issued write requests response .send(()) .expect("Failed to send flush confirmation"); } } } }); Ok(JsonDb { channel: request_send, }) } fn request_read(&self) -> oneshot::Receiver> { let (send, recv) = oneshot::channel(); self.channel .send(Request::Read(send)) .expect("Failed to send read lock request"); recv } /// Take a read lock on the wrapped data. pub async fn read(&self) -> OwnedRwLockReadGuard { self.request_read() .await .expect("Failed to receive read lock") } /// Synchronous version of [`read`][Self::read]. pub fn blocking_read(&self) -> OwnedRwLockReadGuard { self.request_read() .blocking_recv() .expect("Failed to receive read lock") } fn request_write(&self) -> oneshot::Receiver> { let (send, recv) = oneshot::channel(); self.channel .send(Request::Write(send)) .expect("Failed to send write lock request"); recv } /// Take a write lock on the wrapped data. 
When the write guard is /// dropped, it triggers an atomic write of the updated data back /// to disk. pub async fn write(&self) -> OwnedRwLockWriteGuard { self.request_write() .await .expect("Failed to receive write lock") } /// Synchronous version of [`write`][Self::write]. pub fn blocking_write(&self) -> OwnedRwLockWriteGuard { self.request_write() .blocking_recv() .expect("Failed to receive write lock") } fn request_flush(&self) -> oneshot::Receiver<()> { let (send, recv) = oneshot::channel(); self.channel .send(Request::Flush(send)) .expect("Failed to send flush request"); recv } /// Wait for data to finish flushing to disk. Every call to /// [`read`][Self::read] or [`write`][Self::write], or their /// blocking equivalents, also waits for data to be flushed before /// returning a guard. pub async fn flush(&self) { self.request_flush() .await .expect("Failed to receive flush confirmation"); } /// Synchronous version of [`flush`][Self::flush]. pub fn blocking_flush(&self) { self.request_flush() .blocking_recv() .expect("Failed to receive flush confirmation"); } }