Source code for instrument.utils.stored_dict
"""
Storage-backed Dictionary
=========================
A dictionary that writes its contents to YAML file.
Replaces ``bluesky.utils.PersistentDict``.
* Contents must be JSON serializable.
* Contents stored in a single human-readable YAML file.
* Sync to disk shortly after dictionary is updated.
.. autosummary::
~StoredDict
"""
__all__ = ["StoredDict"]
import collections.abc
import datetime
import inspect
import json
import logging
import pathlib
import threading
import time
import yaml
logger = logging.getLogger(__name__)
logger.bsdev(__file__)
[docs]
class StoredDict(collections.abc.MutableMapping):
"""
Dictionary that syncs to storage.
.. autosummary::
~flush
~popitem
~reload
.. rubric:: Static methods
All support for the YAML format is implemented in the static methods.
.. autosummary::
~dump
~load
----
"""
[docs]
def __init__(self, file, delay=5, title=None, serializable=True):
"""
StoredDict : Dictionary that syncs to storage
PARAMETERS
file : str or pathlib.Path
Name of file to store dictionary contents.
delay : number
Time delay (s) since last dictionary update to write to storage.
Default: 5 seconds.
title : str or None
Comment to write at top of file.
Default: "Written by StoredDict."
serializable : bool
If True, validate new dictionary entries are JSON serializable.
"""
self._file = pathlib.Path(file)
self._delay = max(0, delay)
self._title = title or f"Written by {self.__class__.__name__}."
self.test_serializable = serializable
self.sync_in_progress = False
self._sync_deadline = time.time()
self._sync_key = f"sync_agent_{id(self):x}"
self._sync_loop_period = 0.005
self._cache = {}
self.reload()
def __delitem__(self, key):
"""Delete dictionary value by key."""
del self._cache[key]
def __getitem__(self, key):
"""Get dictionary value by key."""
return self._cache[key]
def __iter__(self):
"""Iterate over the dictionary keys."""
yield from self._cache
def __len__(self):
"""Number of keys in the dictionary."""
return len(self._cache)
def __repr__(self):
"""representation of this object."""
return f"<{self.__class__.__name__} {dict(self)!r}>"
def __setitem__(self, key, value):
"""Write to the dictionary."""
outermost_frame = inspect.getouterframes(inspect.currentframe())[-1]
if "sphinx-build" in outermost_frame.filename:
# Seems that Sphinx is building the documentation.
# Ignore all the objects it tries to add.
return
if self.test_serializable:
json.dumps({key: value})
self._cache[key] = value # Store the new (or revised) content.
# Reset the deadline.
self._sync_deadline = time.time() + self._delay
logger.debug("new sync deadline in %f s.", self._delay)
if not self.sync_in_progress:
# Start the sync_agent (thread).
self._delayed_sync_to_storage()
[docs]
def _delayed_sync_to_storage(self):
"""
Sync the metadata to storage.
Start a time-delay thread. New writes to the metadata dictionary will
extend the deadline. Sync once the deadline is reached.
"""
def sync_agent():
"""Threaded task."""
logger.debug("Starting sync_agent...")
self.sync_in_progress = True
while time.time() < self._sync_deadline:
time.sleep(self._sync_loop_period)
logger.debug("Sync waiting period ended")
self.sync_in_progress = False
StoredDict.dump(self._file, self._cache, title=self._title)
thred = threading.Thread(target=sync_agent)
thred.start()
[docs]
def flush(self):
"""Force a write of the dictionary to disk"""
logger.debug("flush()")
if not self.sync_in_progress:
StoredDict.dump(self._file, self._cache, title=self._title)
self._sync_deadline = time.time()
self.sync_in_progress = False
[docs]
def popitem(self):
"""
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order.
Raises KeyError if the dict is empty.
"""
return self._cache.popitem()
[docs]
def reload(self):
"""Read dictionary from storage."""
logger.debug("reload()")
self._cache = StoredDict.load(self._file)
[docs]
@staticmethod
def dump(file, contents, title=None):
"""Write dictionary to YAML file."""
logger.debug("_dump(): file='%s', contents=%r, title=%r", file, contents, title)
with open(file, "w") as f:
if isinstance(title, str) and len(title) > 0:
f.write(f"# {title}\n")
f.write(f"# Dictionary contents written: {datetime.datetime.now()}\n\n")
f.write(yaml.dump(contents, indent=2))
[docs]
@staticmethod
def load(file):
"""Read dictionary from YAML file."""
from .config_loaders import load_config_yaml
file = pathlib.Path(file)
logger.debug("_load('%s')", file)
md = None
if file.exists():
md = load_config_yaml(file)
return md or {} # In case file is empty.