Source code for dtoolcore.storagebroker

"""Disk storage broker."""

import os
import json
import shutil
import logging
import datetime
import socket

from dtoolcore import __version__
from dtoolcore.utils import (
    mkdir_parents,
    generate_identifier,
    generous_parse_uri,
    timestamp,
    IS_WINDOWS,
    windows_to_unix_path,
    unix_to_windows_path,
    handle_to_osrelpath,
)
from dtoolcore.filehasher import FileHasher, md5sum_hexdigest

logger = logging.getLogger(__name__)


_STRUCTURE_PARAMETERS = {
    "data_directory": ["data"],
    "dataset_readme_relpath": ["README.yml"],
    "dtool_directory": [".dtool"],
    "admin_metadata_relpath": [".dtool", "dtool"],
    "structure_metadata_relpath": [".dtool", "structure.json"],
    "dtool_readme_relpath": [".dtool", "README.txt"],
    "manifest_relpath": [".dtool", "manifest.json"],
    "overlays_directory": [".dtool", "overlays"],
    "annotations_directory": [".dtool", "annotations"],
    "tags_directory": [".dtool", "tags"],
    "metadata_fragments_directory": [".dtool", "tmp_fragments"],
    "storage_broker_version": __version__,
}

_DTOOL_README_TXT = """README
======

This is a Dtool dataset stored on traditional file system storage.

Content provided during the dataset creation process
----------------------------------------------------

Dataset descriptive metadata: README.yml
Dataset items: data/

Automatically generated files and directories
---------------------------------------------

This file: .dtool/README.txt
Administrative metadata describing the dataset: .dtool/dtool
Structural metadata describing the dataset: .dtool/structure.json
Structural metadata describing the data items: .dtool/manifest.json
Per item descriptive metadata: .dtool/overlays/
Dataset key/value pairs metadata: .dtool/annotations/
Dataset tags metadata: .dtool/tags/
"""


class StorageBrokerOSError(OSError):
    pass


class DiskStorageBrokerValidationWarning(Warning):
    pass


class BaseStorageBroker(object):
    """Base storage broker class defining the required interface."""

    # Class methods to override.

    @classmethod
    def list_dataset_uris(cls, base_uri, config_path):
        """Return list containing URIs in location given by base_uri."""
        raise NotImplementedError()

    @classmethod
    def generate_uri(cls, name, uuid, base_uri):
        """Return dataset URI."""
        raise NotImplementedError()

    # Methods to override.

    def get_text(self, key):
        """Return the text associated with the key."""
        raise NotImplementedError()

    def put_text(self, key, text):
        """Put the text into the storage associated with the key."""
        raise NotImplementedError()

    def delete_key(self, key):
        """Delete the file/object associated with the key."""
        raise NotImplementedError()

    def get_admin_metadata_key(self):
        """Return the admin metadata key."""
        raise NotImplementedError()

    def get_readme_key(self):
        """Return the readme key."""
        raise NotImplementedError()

    def get_manifest_key(self):
        """Return the manifest key."""
        raise NotImplementedError()

    def get_overlay_key(self, overlay_name):
        """Return the overlay key."""
        raise NotImplementedError()

    def get_annotation_key(self, annotation_name):
        """Return the annotation key."""
        raise NotImplementedError()

    def get_tag_key(self, tag):
        """Return the tag key."""
        raise NotImplementedError()

    def list_overlay_names(self):
        """Return list of overlay names."""
        raise NotImplementedError()

    def list_annotation_names(self):
        """Return list of annotation names."""
        raise NotImplementedError()

    def list_tags(self):
        """Return list of tags."""
        raise NotImplementedError()

    def get_item_abspath(self, identifier):
        """Return absolute path at which item content can be accessed.

        :param identifier: item identifier
        :returns: absolute path from which the item content can be accessed
        """
        raise NotImplementedError()

    def _create_structure(self):
        """Create necessary structure to hold a dataset."""
        raise NotImplementedError()

    def put_item(self, fpath, relpath):
        """Put item with content from fpath at relpath in dataset.

        Missing directories in relpath are created on the fly.

        :param fpath: path to the item on disk
        :param relpath: relative path name given to the item in the dataset
                        as a handle
        :returns: the handle given to the item
        """
        raise NotImplementedError()

    def iter_item_handles(self):
        """Return iterator over item handles."""
        raise NotImplementedError()

    def get_size_in_bytes(self, handle):
        """Return the size in bytes."""
        raise NotImplementedError()

    def get_utc_timestamp(self, handle):
        """Return the UTC timestamp."""
        raise NotImplementedError()

    def get_hash(self, handle):
        """Return the hash."""
        raise NotImplementedError()

    def has_admin_metadata(self):
        """Return True if the administrative metadata exists.

        This is the definition of being a "dataset".
        """
        raise NotImplementedError()

    def add_item_metadata(self, handle, key, value):
        """Store the given key:value pair for the item associated with handle.

        :param handle: handle for accessing an item before the dataset is
                       frozen
        :param key: metadata key
        :param value: metadata value
        """
        raise NotImplementedError()

    def get_item_metadata(self, handle):
        """Return dictionary containing all metadata associated with handle.

        In other words all the metadata added using the
        ``add_item_metadata`` method.

        :param handle: handle for accessing an item before the dataset is
                       frozen
        :returns: dictionary containing item metadata
        """
        raise NotImplementedError()

    def pre_freeze_hook(self):
        """Pre :meth:`dtoolcore.ProtoDataSet.freeze` actions.

        This method is called at the beginning of the
        :meth:`dtoolcore.ProtoDataSet.freeze` method.

        It may be useful for remote storage backends to generate
        caches to remove repetitive time consuming calls.
        """
        raise NotImplementedError()

    def post_freeze_hook(self):
        """Post :meth:`dtoolcore.ProtoDataSet.freeze` cleanup actions.

        This method is called at the end of the
        :meth:`dtoolcore.ProtoDataSet.freeze` method.

        In the :class:`dtoolcore.storage_broker.DiskStorageBroker` it removes
        the temporary directory for storing item metadata fragment files.
        """
        raise NotImplementedError()

    def _list_historical_readme_keys(self):
        """Return list of historical README.yml keys."""
        raise NotImplementedError()

    # Reusable methods.

    def generate_base_uri(self, uri):
        """Return dataset base URI given a URI."""
        base_uri = uri.rsplit("/", 1)[0]
        return base_uri
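
    # For example (URI hypothetical): generate_base_uri maps a dataset URI
    # such as "file:///data/my-dataset" to the base URI "file:///data".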

    def get_admin_metadata(self):
        """Return the admin metadata as a dictionary."""
        logger.debug("Getting admin metadata {}".format(self))
        text = self.get_text(self.get_admin_metadata_key())
        return json.loads(text)

    def get_readme_content(self):
        """Return the README descriptive metadata as a string."""
        logger.debug("Getting readme content {}".format(self))
        return self.get_text(self.get_readme_key())

    def get_manifest(self):
        """Return the manifest as a dictionary."""
        logger.debug("Getting manifest {}".format(self))
        text = self.get_text(self.get_manifest_key())
        return json.loads(text)

    def get_overlay(self, overlay_name):
        """Return overlay as a dictionary."""
        logger.debug("Getting overlay: {} {}".format(overlay_name, self))
        overlay_key = self.get_overlay_key(overlay_name)
        text = self.get_text(overlay_key)
        return json.loads(text)

    def get_annotation(self, annotation_name):
        """Return value of the annotation associated with the key.

        :returns: annotation (string, int, float, bool)
        :raises: DtoolCoreAnnotationKeyError if the annotation does not exist
        """
        logger.debug("Getting annotation: {} {}".format(annotation_name, self))
        annotation_key = self.get_annotation_key(annotation_name)
        text = self.get_text(annotation_key)
        return json.loads(text)

    def put_admin_metadata(self, admin_metadata):
        """Store the admin metadata."""
        logger.debug("Putting admin metadata {}".format(self))
        text = json.dumps(admin_metadata)
        key = self.get_admin_metadata_key()
        self.put_text(key, text)

    def put_manifest(self, manifest):
        """Store the manifest."""
        logger.debug("Putting manifest {}".format(self))
        text = json.dumps(manifest, indent=2, sort_keys=True)
        key = self.get_manifest_key()
        self.put_text(key, text)

    def put_readme(self, content):
        """Store the readme descriptive metadata."""
        logger.debug("Putting readme {}".format(self))
        key = self.get_readme_key()
        self.put_text(key, content)

    def update_readme(self, content):
        """Update the readme descriptive metadata."""
        logger.debug("Updating readme {}".format(self))
        key = self.get_readme_key()

        # Back up old README content.
        backup_content = self.get_readme_content()
        backup_key = key + "-{}".format(
            timestamp(datetime.datetime.now())
        )
        logger.debug("README.yml backup key: {} {}".format(backup_key, self))
        self.put_text(backup_key, backup_content)

        self.put_text(key, content)
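
    # Note that the backup key appends a Unix timestamp to the readme key,
    # so successive updates leave files named along the lines of
    # "README.yml-1572362056.79" (the timestamp value is illustrative).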

    def put_overlay(self, overlay_name, overlay):
        """Store the overlay."""
        logger.debug("Putting overlay: {} {}".format(overlay_name, self))
        key = self.get_overlay_key(overlay_name)
        text = json.dumps(overlay, indent=2)
        self.put_text(key, text)

    def put_annotation(self, annotation_name, annotation):
        """Set/update value of the annotation associated with the key.

        :raises: DtoolCoreAnnotationTypeError if the type of the value is not
                 str, int, float or bool.
        """
        logger.debug("Putting annotation: {} {}".format(annotation_name, self))
        key = self.get_annotation_key(annotation_name)
        text = json.dumps(annotation, indent=2)
        self.put_text(key, text)

    def put_tag(self, tag):
        """Annotate the dataset with a tag."""
        logger.debug("Putting tag: {} {}".format(tag, self))
        key = self.get_tag_key(tag)
        self.put_text(key, "")

    def delete_tag(self, tag):
        """Delete a tag from a dataset.

        :param tag: tag
        """
        logger.debug("Deleting tag: {} {}".format(tag, self))
        key = self.get_tag_key(tag)
        self.delete_key(key)

    def get_relpath(self, handle):
        """Return the relative path."""
        return handle

    def item_properties(self, handle):
        """Return properties of the item with the given handle."""
        logger.debug("Getting properties for handle: {} {}".format(handle, self))  # NOQA
        properties = {
            'size_in_bytes': self.get_size_in_bytes(handle),
            'utc_timestamp': self.get_utc_timestamp(handle),
            'hash': self.get_hash(handle),
            'relpath': self.get_relpath(handle)
        }
        logger.debug("{} properties: {} {}".format(handle, properties, self))
        return properties

    def _document_structure(self):
        """Document the structure of the dataset."""
        logger.debug("Documenting dataset structure {}".format(self))

        key = self.get_structure_key()
        text = json.dumps(self._structure_parameters, indent=2, sort_keys=True)
        self.put_text(key, text)

        key = self.get_dtool_readme_key()
        self.put_text(key, self._dtool_readme_txt)

    def create_structure(self):
        """Create necessary structure to hold a dataset."""
        logger.debug("Creating dataset structure {}".format(self))
        self._create_structure()
        self._document_structure()


def _get_abspath_from_uri(uri):
    """Return abspath."""
    logger.debug("In _get_abspath_from_uri")
    logger.debug("_get_abspath_from_uri.input_uri: {}".format(uri))
    parse_result = generous_parse_uri(uri)
    path = parse_result.path
    if IS_WINDOWS:
        path = unix_to_windows_path(path)
    abspath = os.path.abspath(path)
    logger.debug("_get_abspath_from_uri.return: {}".format(abspath))
    return abspath
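

# As an illustration (path hypothetical), on a POSIX system
# _get_abspath_from_uri("file:///home/user/my_dataset") returns
# "/home/user/my_dataset"; on Windows the parsed path is first converted to
# a native path with unix_to_windows_path.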


class DiskStorageBroker(BaseStorageBroker):
    """
    Storage broker to interact with datasets on local disk storage.

    The :class:`dtoolcore.ProtoDataSet` class uses the
    :class:`dtoolcore.storage_broker.DiskStorageBroker` to construct datasets
    by writing to disk and the :class:`dtoolcore.DataSet` class uses it to
    read datasets from disk.
    """

    #: Attribute used to define the type of storage broker.
    key = "file"

    #: Attribute used by :class:`dtoolcore.ProtoDataSet` to write the hash
    #: function name to the manifest.
    hasher = FileHasher(md5sum_hexdigest)

    # Attribute used to define the structure of the dataset.
    _structure_parameters = _STRUCTURE_PARAMETERS

    # Attribute used to document the structure of the dataset.
    _dtool_readme_txt = _DTOOL_README_TXT

    def __init__(self, uri, config_path=None):
        logger.debug("Initialising {}...".format(self))

        # Get the abspath to the dataset.
        self._abspath = _get_abspath_from_uri(uri)

        # Define some other abspaths.
        self._data_abspath = self._generate_abspath("data_directory")
        self._overlays_abspath = self._generate_abspath("overlays_directory")
        self._annotations_abspath = self._generate_abspath(
            "annotations_directory"
        )
        self._tags_abspath = self._generate_abspath(
            "tags_directory"
        )
        self._metadata_fragments_abspath = self._generate_abspath(
            "metadata_fragments_directory"
        )

        # Define the essential directories to be created.
        self._essential_subdirectories = [
            self._generate_abspath("dtool_directory"),
            self._data_abspath,
            self._overlays_abspath,
            self._annotations_abspath,
            self._tags_abspath,
        ]

    # Generic helper functions.

    def _generate_abspath(self, key):
        return os.path.join(self._abspath, *self._structure_parameters[key])

    def _fpath_from_handle(self, handle):
        return os.path.join(self._data_abspath, handle)

    def _handle_to_fragment_absprefixpath(self, handle):
        stem = generate_identifier(handle)
        return os.path.join(self._metadata_fragments_abspath, stem)

    # Class methods to override.

    @classmethod
    def list_dataset_uris(cls, base_uri, config_path):
        """Return list containing URIs in location given by base_uri."""
        parsed_uri = generous_parse_uri(base_uri)

        uri_list = []

        path = parsed_uri.path
        if IS_WINDOWS:
            path = unix_to_windows_path(parsed_uri.path)

        for d in os.listdir(path):
            dir_path = os.path.join(path, d)

            if not os.path.isdir(dir_path):
                continue

            storage_broker = cls(dir_path, config_path)
            if not storage_broker.has_admin_metadata():
                continue

            uri = storage_broker.generate_uri(
                name=d,
                uuid=None,
                base_uri=base_uri
            )
            uri_list.append(uri)

        return uri_list

    @classmethod
    def generate_uri(cls, name, uuid, base_uri):
        """Return dataset URI."""
        logger.debug("In DiskStorageBroker.generate_uri...")
        parsed_uri = generous_parse_uri(base_uri)
        base_dir_path = parsed_uri.path
        if IS_WINDOWS:
            base_dir_path = unix_to_windows_path(base_dir_path)

        dataset_path = os.path.join(base_dir_path, name)
        dataset_abspath = os.path.abspath(dataset_path)
        if IS_WINDOWS:
            dataset_abspath = windows_to_unix_path(dataset_abspath)
            return "{}:///{}".format(cls.key, dataset_abspath)
        else:
            return "{}://{}{}".format(
                cls.key,
                socket.gethostname(),
                dataset_abspath
            )
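
    # The URIs generated above look like (hostname and path hypothetical)
    # "file://myhost/data/my-dataset" on POSIX systems and
    # "file:///C:/data/my-dataset" on Windows.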

    # Methods to override.

    def get_text(self, key):
        """Return the text associated with the key."""
        with open(key) as fh:
            return fh.read()

    def put_text(self, key, text):
        """Put the text into the storage associated with the key."""
        parent_directory = os.path.dirname(key)
        mkdir_parents(parent_directory)
        with open(key, "w") as fh:
            fh.write(text)

    def delete_key(self, key):
        """Delete the file/object associated with the key."""
        try:
            os.unlink(key)
        except OSError:
            pass

    def get_admin_metadata_key(self):
        """Return the path to the admin metadata file."""
        return self._generate_abspath("admin_metadata_relpath")

    def get_readme_key(self):
        """Return the path to the readme file."""
        return self._generate_abspath("dataset_readme_relpath")

    def get_manifest_key(self):
        """Return the path to the manifest file."""
        return self._generate_abspath("manifest_relpath")

    def get_structure_key(self):
        """Return the path to the structure parameter file."""
        return self._generate_abspath("structure_metadata_relpath")

    def get_dtool_readme_key(self):
        """Return the path to the dtool readme file."""
        return self._generate_abspath("dtool_readme_relpath")

    def get_overlay_key(self, overlay_name):
        """Return the path to the overlay file."""
        return os.path.join(self._overlays_abspath, overlay_name + '.json')

    def get_annotation_key(self, annotation_name):
        """Return the path to the annotation file."""
        return os.path.join(
            self._annotations_abspath,
            annotation_name + '.json'
        )

    def get_tag_key(self, tag):
        """Return the path to the tag file."""
        return os.path.join(
            self._tags_abspath,
            tag
        )

    def get_size_in_bytes(self, handle):
        """Return the size in bytes."""
        fpath = self._fpath_from_handle(handle)
        return os.stat(fpath).st_size

    def get_utc_timestamp(self, handle):
        """Return the UTC timestamp."""
        fpath = self._fpath_from_handle(handle)
        datetime_obj = datetime.datetime.utcfromtimestamp(
            os.stat(fpath).st_mtime
        )
        return timestamp(datetime_obj)

    def get_hash(self, handle):
        """Return the hash."""
        fpath = self._fpath_from_handle(handle)
        return DiskStorageBroker.hasher(fpath)

    def has_admin_metadata(self):
        """Return True if the administrative metadata exists.

        This is the definition of being a "dataset".
        """
        return os.path.isfile(self.get_admin_metadata_key())

    def list_overlay_names(self):
        """Return list of overlay names."""
        overlay_names = []
        if not os.path.isdir(self._overlays_abspath):
            return overlay_names
        for fname in os.listdir(self._overlays_abspath):
            name, ext = os.path.splitext(fname)
            overlay_names.append(name)
        return overlay_names

    def list_annotation_names(self):
        """Return list of annotation names."""
        annotation_names = []
        if not os.path.isdir(self._annotations_abspath):
            return annotation_names
        for fname in os.listdir(self._annotations_abspath):
            name, ext = os.path.splitext(fname)
            annotation_names.append(name)
        return annotation_names

    def list_tags(self):
        """Return list of tags."""
        tags = []
        if not os.path.isdir(self._tags_abspath):
            return tags
        for fname in os.listdir(self._tags_abspath):
            tags.append(fname)
        return tags

    def get_item_abspath(self, identifier):
        """Return absolute path at which item content can be accessed.

        :param identifier: item identifier
        :returns: absolute path from which the item content can be accessed
        """
        manifest = self.get_manifest()
        item = manifest["items"][identifier]
        relpath = handle_to_osrelpath(item["relpath"], IS_WINDOWS)
        item_abspath = os.path.join(self._data_abspath, relpath)
        return item_abspath

    def _create_structure(self):
        """Create necessary structure to hold a dataset."""

        # Ensure that the specified path does not exist and create it.
        if os.path.exists(self._abspath):
            raise StorageBrokerOSError(
                "Path already exists: {}".format(self._abspath)
            )

        # Make sure the parent directory exists.
        parent, _ = os.path.split(self._abspath)
        if not os.path.isdir(parent):
            raise StorageBrokerOSError(
                "No such directory: {}".format(parent)
            )

        os.mkdir(self._abspath)

        # Create the essential subdirectories.
        for abspath in self._essential_subdirectories:
            if not os.path.isdir(abspath):
                os.mkdir(abspath)

    def put_item(self, fpath, relpath):
        """Put item with content from fpath at relpath in dataset.

        Missing directories in relpath are created on the fly.

        :param fpath: path to the item on disk
        :param relpath: relative path name given to the item in the dataset
                        as a handle, i.e. a Unix-like relpath
        :returns: the handle given to the item
        """
        # Define the destination path and make any missing parent directories.
        osrelpath = handle_to_osrelpath(relpath, IS_WINDOWS)
        dest_path = os.path.join(self._data_abspath, osrelpath)
        dirname = os.path.dirname(dest_path)
        mkdir_parents(dirname)

        # Copy the file across.
        shutil.copyfile(fpath, dest_path)

        return relpath

    def iter_item_handles(self):
        """Return iterator over item handles."""
        path = self._data_abspath
        path_length = len(path) + 1

        for dirpath, dirnames, filenames in os.walk(path):
            for fn in filenames:
                path = os.path.join(dirpath, fn)
                relative_path = path[path_length:]
                if IS_WINDOWS:
                    relative_path = windows_to_unix_path(relative_path)
                yield relative_path

    def add_item_metadata(self, handle, key, value):
        """Store the given key:value pair for the item associated with handle.

        :param handle: handle for accessing an item before the dataset is
                       frozen
        :param key: metadata key
        :param value: metadata value
        """
        if not os.path.isdir(self._metadata_fragments_abspath):
            os.mkdir(self._metadata_fragments_abspath)

        prefix = self._handle_to_fragment_absprefixpath(handle)
        fpath = prefix + '.{}.json'.format(key)

        with open(fpath, 'w') as fh:
            json.dump(value, fh)
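
    # Each fragment file is named "<identifier>.<key>.json" inside the
    # .dtool/tmp_fragments directory, where the identifier is derived from
    # the handle via generate_identifier. get_item_metadata below reassembles
    # the fragments into a single dictionary.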

    def get_item_metadata(self, handle):
        """Return dictionary containing all metadata associated with handle.

        In other words all the metadata added using the
        ``add_item_metadata`` method.

        :param handle: handle for accessing an item before the dataset is
                       frozen
        :returns: dictionary containing item metadata
        """
        if not os.path.isdir(self._metadata_fragments_abspath):
            return {}

        prefix = self._handle_to_fragment_absprefixpath(handle)

        def list_abspaths(dirname):
            for f in os.listdir(dirname):
                yield os.path.join(dirname, f)

        files = [f for f in list_abspaths(self._metadata_fragments_abspath)
                 if f.startswith(prefix)]

        metadata = {}
        for f in files:
            key = f.split('.')[-2]  # filename: identifier.key.json
            with open(f) as fh:
                value = json.load(fh)
            metadata[key] = value

        return metadata

    def pre_freeze_hook(self):
        """Pre :meth:`dtoolcore.ProtoDataSet.freeze` actions.

        This method is called at the beginning of the
        :meth:`dtoolcore.ProtoDataSet.freeze` method.

        In the :class:`dtoolcore.storage_broker.DiskStorageBroker` it
        validates that there is no rogue content in the base of the dataset.
        """
        allowed = set([v[0] for v in _STRUCTURE_PARAMETERS.values()])
        for d in os.listdir(self._abspath):
            if d not in allowed:
                msg = "Rogue content in base of dataset: {}".format(d)
                raise DiskStorageBrokerValidationWarning(msg)

    def post_freeze_hook(self):
        """Post :meth:`dtoolcore.ProtoDataSet.freeze` cleanup actions.

        This method is called at the end of the
        :meth:`dtoolcore.ProtoDataSet.freeze` method.

        In the :class:`dtoolcore.storage_broker.DiskStorageBroker` it removes
        the temporary directory for storing item metadata fragment files.
        """
        if os.path.isdir(self._metadata_fragments_abspath):
            shutil.rmtree(self._metadata_fragments_abspath)

    def _list_historical_readme_keys(self):
        """Return list of historical README.yml keys."""
        historical_readme_keys = []
        for name in os.listdir(self._abspath):
            if name.startswith("README.yml-"):
                key = os.path.join(self._abspath, name)
                historical_readme_keys.append(key)
        return historical_readme_keys
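

# A minimal usage sketch, assuming a writable base directory; all names and
# paths below are hypothetical. In normal use these calls are made via
# :class:`dtoolcore.ProtoDataSet` rather than directly:
#
#     uri = DiskStorageBroker.generate_uri(
#         name="my-dataset", uuid=None, base_uri="file:///tmp"
#     )
#     broker = DiskStorageBroker(uri)
#     broker.create_structure()
#     broker.put_readme("---\ndescription: an example dataset\n")
#     handle = broker.put_item("/tmp/hello.txt", "hello.txt")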