# Source code for instrument.utils.stored_dict
"""
Storage-backed Dictionary
=========================
A dictionary that writes its contents to YAML file.
Replaces ``bluesky.utils.PersistentDict``.
* Contents must be JSON serializable.
* Contents stored in a single human-readable YAML file.
* Sync to disk shortly after dictionary is updated.
.. autosummary::
~StoredDict
"""
__all__ = ["StoredDict"]
import collections.abc
import datetime
import json
import logging
import pathlib
import threading
import time
import yaml
logger = logging.getLogger(__name__)
# NOTE(review): ``bsdev`` is not a standard ``logging.Logger`` method; it is
# presumably a custom level/method registered by the project's logging setup
# before this module is imported -- confirm.  It is called with this module's
# file path.
logger.bsdev(__file__)

# (Internal) Dictionary of active ``StoredDict`` ``sync_agent`` threads.
#
# A single entry for each instance of ``StoredDict``.
# - Key is the ``id`` of an instance of ``StoredDict``.
# - Value is the ``threading.Thread`` object (while a sync is pending) or
#   ``None``.
#
# Keeping the thread object **outside** the ``StoredDict`` instance avoids an
# error from ``copy.deepcopy(md)`` that happens during Bluesky runs::
#
#     TypeError: cannot pickle '_thread.lock' object
#
# Nothing in this module removes entries, so the dictionary grows by one
# entry per ``StoredDict`` instance created.
_sync_threads = {}
class StoredDict(collections.abc.MutableMapping):
    """
    Dictionary that syncs to storage.

    Every change (set, delete, pop, clear, ...) schedules a write of the
    whole dictionary to a YAML file once ``delay`` seconds have passed
    without further changes.

    .. autosummary::

        ~flush
        ~popitem
        ~reload

    .. rubric:: Static methods

    All support for the YAML format is implemented in the static methods.

    .. autosummary::

        ~dump
        ~load

    ----
    """

    def __init__(self, file, delay=5, title=None, serializable=True):
        """
        StoredDict : Dictionary that syncs to storage

        PARAMETERS

        file : str or pathlib.Path
            Name of file to store dictionary contents.
        delay : number
            Time delay (s) since last dictionary update to write to storage.
            Default: 5 seconds.  Negative values are treated as 0.
        title : str or None
            Comment to write at top of file.
            Default: "Written by StoredDict."
        serializable : bool
            If True, validate new dictionary entries are JSON serializable.
        """
        self._file = pathlib.Path(file)
        self._delay = max(0, delay)
        self._title = title or f"Written by {self.__class__.__name__}."
        self.test_serializable = serializable

        self._sync_key = id(self)
        self._sync_deadline = time.time()
        self._sync_while_loop_period = 0.005
        # When True, a waiting sync thread stops waiting and writes at once.
        self._sync_thread_abort = False
        # Thread objects are kept in the module-level registry, not on the
        # instance, so that copy.deepcopy() of this object keeps working.
        _sync_threads[self._sync_key] = None

        self._cache = {}
        self.reload()

    def __delitem__(self, key):
        """Delete dictionary value by key and schedule a sync to storage."""
        del self._cache[key]
        self._schedule_sync()

    def __getitem__(self, key):
        """Get dictionary value by key."""
        return self._cache[key]

    def __iter__(self):
        """Iterate over the dictionary keys."""
        yield from self._cache

    def __len__(self):
        """Number of keys in the dictionary."""
        return len(self._cache)

    def __repr__(self):
        """representation of this object."""
        return f"<{self.__class__.__name__} {dict(self)!r}>"

    def __setitem__(self, key, value):
        """
        Write to the dictionary and schedule a sync to storage.

        When ``test_serializable`` is True, the new entry is first passed to
        ``json.dumps``; any exception it raises (e.g. ``TypeError``) propagates
        and the dictionary is left unchanged.
        """
        if self.test_serializable:
            json.dumps({key: value})
        self._cache[key] = value
        self._schedule_sync()

    def _schedule_sync(self):
        """
        Push the sync deadline ``delay`` seconds into the future.

        Starts a background sync thread only if none is currently pending;
        a pending thread picks up the new deadline on its own.
        """
        self._sync_deadline = time.time() + self._delay
        logger.debug("new sync deadline in %f s.", self._delay)
        # .get(): tolerate an instance whose key was never registered.
        if _sync_threads.get(self._sync_key) is None:
            self._delayed_sync_to_storage()

    def _delayed_sync_to_storage(self):
        """
        Sync the metadata to storage.

        Start a time-delay thread.  New writes to the metadata dictionary
        extend the deadline.  Sync once the deadline is reached, or as soon
        as ``flush()`` requests an early write.
        """

        def sync_agent():
            """Threaded task: wait for the deadline, then write the file."""
            logger.debug("Starting sync_agent...")
            try:
                while (
                    time.time() < self._sync_deadline
                    and not self._sync_thread_abort
                ):
                    time.sleep(self._sync_while_loop_period)
                logger.debug("Sync waiting period ended")
                # Write a shallow snapshot: another thread may keep modifying
                # ``_cache`` while the YAML serializer iterates over it.
                # NOTE(review): an update made after this snapshot but before
                # the registry slot is released below does not start a new
                # thread; it is written by the next update or by flush().
                StoredDict.dump(
                    self._file, self._cache.copy(), title=self._title
                )
            finally:
                # Always release the slot, even if the write failed, so that
                # later updates can start a new sync thread.
                _sync_threads[self._sync_key] = None
                self._sync_thread_abort = False

        worker = threading.Thread(target=sync_agent)
        # Register before starting, so the thread can never reset the slot
        # to None before it has been published here.
        _sync_threads[self._sync_key] = worker
        worker.start()

    def flush(self):
        """
        Force a write of the dictionary to disk.

        Any pending sync thread is told to stop waiting and is joined first,
        so that two writers never overlap.  The current contents are then
        written from the calling thread, so the file reflects every change
        made before this call when ``flush()`` returns.
        """
        logger.debug("flush()")
        pending = _sync_threads.get(self._sync_key)
        if pending is not None:
            self._sync_thread_abort = True
            pending.join()
            # The thread may have finished before the flag was set; clear
            # it so the next sync thread waits normally.
            self._sync_thread_abort = False
        StoredDict.dump(self._file, self._cache, title=self._title)

    def popitem(self):
        """
        Remove and return a (key, value) pair as a 2-tuple.

        Pairs are returned in LIFO (last-in, first-out) order.
        Raises KeyError if the dict is empty.
        The removal is synced to storage like any other update.
        """
        item = self._cache.popitem()
        self._schedule_sync()
        return item

    def reload(self):
        """Read dictionary from storage, replacing the in-memory contents."""
        logger.debug("reload()")
        self._cache = StoredDict.load(self._file)

    @staticmethod
    def dump(file, contents, title=None):
        """
        Write dictionary to YAML file.

        The file is overwritten.  When ``title`` is a non-empty string, each
        of its lines is written as a ``#`` comment, followed by a timestamp
        comment and the YAML document.  A multi-line title therefore cannot
        leak uncommented text into the YAML content.
        """
        logger.debug("_dump(): file='%s', contents=%r, title=%r", file, contents, title)
        with open(file, "w") as f:
            if isinstance(title, str) and len(title) > 0:
                for line in title.splitlines():
                    f.write(f"# {line}\n")
            f.write(f"# Dictionary contents written: {datetime.datetime.now()}\n\n")
            f.write(yaml.dump(contents, indent=2))

    @staticmethod
    def load(file):
        """
        Read dictionary from YAML file.

        Returns an empty dict when the file does not exist, or when the
        loader returns an empty / falsy result (e.g. for an empty file).
        """
        from .config_loaders import load_config_yaml

        file = pathlib.Path(file)
        logger.debug("_load('%s')", file)
        md = None
        if file.exists():
            md = load_config_yaml(file)
        return md or {}  # In case file is empty.