Source code for apsbits.utils.stored_dict

"""Storage-backed Dictionary
=========================

A dictionary that writes its contents to a YAML file. This mutable mapping automatically
synchronizes its in-memory state to a human-readable YAML file on disk shortly after
updates. It replaces ``bluesky.utils.PersistentDict`` and
ensures that all contents are JSON serializable.

.. autosummary::
    ~StoredDict
"""

__all__ = ["StoredDict"]

import collections.abc
import datetime
import inspect
import json
import logging
import pathlib
import threading
import time
from typing import Any
from typing import Dict
from typing import Optional
from typing import Union

import yaml

logger = logging.getLogger(__name__)
logger.bsdev(__file__)


[docs] class StoredDict(collections.abc.MutableMapping): """ Dictionary that synchronizes its contents to a YAML storage file. This mutable mapping stores key-value pairs in an internal cache and periodically writes its contents to a YAML file on disk. Changes are flushed after a configurable delay. The YAML serialization and deserialization are handled by the static methods `dump` and `load`. """
[docs] def __init__( self, file: Union[str, pathlib.Path], delay: float = 5, title: Optional[str] = None, serializable: bool = True, ) -> None: """ Initialize the StoredDict instance. Args: file (str or pathlib.Path): Path to the YAML file for storing dictionary contents. delay (float): Time delay in seconds after the last update before synchronizing to storage. Defaults to 5 seconds. title (str, optional): Comment to include at the top of the YAML file. If not provided, defaults to "Written by StoredDict.". serializable (bool): If True, ensure that new dictionary entries are JSON serializable. Returns: None """ self._file: pathlib.Path = pathlib.Path(file) self._delay: float = max(0, delay) self._title: str = title or f"Written by {self.__class__.__name__}." self.test_serializable: bool = serializable self.sync_in_progress: bool = False self._sync_deadline: float = time.time() self._sync_key: str = f"sync_agent_{id(self):x}" self._sync_loop_period: float = 0.005 self._cache: Dict[Any, Any] = {} self.reload()
def __delitem__(self, key: Any) -> None: """ Delete an item from the dictionary by its key. Args: key (Any): The key of the item to delete. Raises: KeyError: If the key does not exist in the dictionary. """ del self._cache[key] def __getitem__(self, key: Any) -> Any: """ Retrieve the value associated with the given key. Args: key (Any): The key to retrieve. Returns: Any: The value corresponding to the specified key. """ return self._cache[key] def __iter__(self): """Iterate over the dictionary keys.""" yield from self._cache def __len__(self): """Number of keys in the dictionary.""" return len(self._cache) def __repr__(self): """representation of this object.""" return f"<{self.__class__.__name__} {dict(self)!r}>" def __setitem__(self, key, value): """Write to the dictionary.""" outermost_frame = inspect.getouterframes(inspect.currentframe())[-1] if "sphinx-build" in outermost_frame.filename: # Seems that Sphinx is building the documentation. # Ignore all the objects it tries to add. return if self.test_serializable: json.dumps({key: value}) self._cache[key] = value # Store the new (or revised) content. # Reset the deadline. self._sync_deadline = time.time() + self._delay logger.debug("new sync deadline in %f s.", self._delay) if not self.sync_in_progress: # Start the sync_agent (thread). self._delayed_sync_to_storage()
[docs] def _delayed_sync_to_storage(self): """ Sync the metadata to storage. Start a time-delay thread. New writes to the metadata dictionary will extend the deadline. Sync once the deadline is reached. """ def sync_agent(): """Threaded task.""" logger.debug("Starting sync_agent...") self.sync_in_progress = True while time.time() < self._sync_deadline: time.sleep(self._sync_loop_period) logger.debug("Sync waiting period ended") self.sync_in_progress = False StoredDict.dump(self._file, self._cache, title=self._title) thred = threading.Thread(target=sync_agent) thred.start()
[docs] def flush(self): """Force a write of the dictionary to disk""" logger.debug("flush()") if not self.sync_in_progress: StoredDict.dump(self._file, self._cache, title=self._title) self._sync_deadline = time.time() self.sync_in_progress = False
[docs] def popitem(self): """ Remove and return a (key, value) pair as a 2-tuple. Raises: KeyError: If the dictionary is empty. """ return self._cache.popitem()
[docs] def reload(self): """Read dictionary from storage.""" logger.debug("reload()") self._cache = StoredDict.load(self._file)
[docs] @staticmethod def dump(file, contents, title=None): """Write dictionary to YAML file.""" logger.debug("_dump(): file='%s', contents=%r, title=%r", file, contents, title) with open(file, "w") as f: if isinstance(title, str) and len(title) > 0: f.write(f"# {title}\n") f.write(f"# Dictionary contents written: {datetime.datetime.now()}\n\n") f.write(yaml.dump(contents, indent=2))
[docs] @staticmethod def load(file): """Read dictionary from YAML file.""" from apsbits.utils.config_loaders import load_config_yaml file = pathlib.Path(file) logger.debug("_load('%s')", file) md: Optional[Dict[Any, Any]] = None if file.exists(): md = load_config_yaml(file) return md or {}