# Source code for face_rhythm.h5_handling

"""HDF5 utilities: hierarchical traversal, group I/O, and bulk-close helpers.

Convenience wrappers around :mod:`h5py` for the face-rhythm project. Nothing
here is CUDA- or video-specific; the module is safe to import anywhere.
"""

import gc

import h5py
import numpy as np

def close_all_h5():
    """
    Closes every open :class:`h5py.File` object found in the Python workspace.

    Iterates over all live objects via :mod:`gc` and calls ``close`` on any
    :class:`h5py.File` instance. Each object is handled independently, so a
    single problematic object does not prevent the remaining files from being
    closed. If closing a file raises an unexpected error, a message is printed
    and ``tables.file._open_files.close_all`` is attempted as a fallback
    (skipped with a message when PyTables is not installed). A garbage
    collection pass is always run at the end.

    Adapted from
    https://stackoverflow.com/questions/29863342/close-an-open-h5py-data-file.
    """
    unexpected_error = None

    for obj in gc.get_objects():  # Browse through ALL objects
        try:
            is_h5_file = isinstance(obj, h5py.File)  # Just HDF5 files
        except Exception:
            ## Some objects raise on type inspection (e.g. proxies whose
            ## referent is gone). Such an object is not a file we can close,
            ## so skip it instead of aborting the whole sweep.
            continue
        if not is_h5_file:
            continue

        try:
            obj.close()
        except (OSError, ValueError, RuntimeError):
            pass  # Was already closed
        except Exception as e:
            ## Remember the first unexpected failure but keep closing the rest
            if unexpected_error is None:
                unexpected_error = e

    if unexpected_error is not None:
        print(f"Error closing h5 files. Will try again using `tables._open_files.close_all()`. Error: {unexpected_error}")
        try:
            import tables
        except ImportError:
            print("PyTables is not installed; skipping `tables._open_files.close_all()` fallback.")
        else:
            tables.file._open_files.close_all()

    gc.collect()
def show_group_items(hObj):
    """
    Prints the items at the top hierarchical level of an HDF5 object or dict.
    RH 2021

    Groups (and nested dicts) are printed as a header line; every other item
    is printed with its ``shape`` and ``dtype`` when it has both attributes,
    otherwise with its Python type. See :func:`show_item_tree` for a full
    recursive listing.

    Args:
        hObj (object):
            Hierarchical object: an :class:`h5py.File`, :class:`h5py.Group`,
            or a Python ``dict``.

    Example:
        .. highlight:: python
        .. code-block:: python

            with h5py.File(path, 'r') as f:
                h5_handling.show_group_items(f)
    """
    for ii, val in enumerate(list(iter(hObj))):
        ## Look the item up once; each h5py lookup builds a new object
        item = hObj[val]
        if isinstance(item, dict) or isinstance(item, h5py.Group):
            print(f'{ii+1}. {val}:----------------')
        elif hasattr(item, 'shape') and hasattr(item, 'dtype'):
            print(f'{ii+1}. {val}: shape={item.shape} , dtype={item.dtype}')
        else:
            print(f'{ii+1}. {val}: type={type(item)}')
def show_item_tree(hObj=None, path=None, depth=None, show_metadata=True, print_metadata=False, indent_level=0):
    """
    Recursively prints the items and groups in an HDF5 object or dict.
    RH 2021

    Args:
        hObj (object):
            Hierarchical object: an :class:`h5py.File`, :class:`h5py.Group`,
            or a Python ``dict``. Ignored when ``path`` is provided.
            (Default is ``None``)
        path (Optional[object]):
            Path-like to an HDF5 file to open in read mode. If not ``None``,
            the file is opened and traversed in place of ``hObj``.
            (Default is ``None``)
        depth (Optional[int]):
            Maximum number of hierarchical levels to descend below the top
            level. ``0`` lists only the top level, a negative value prints
            nothing, and ``None`` means unlimited. The same value gives the
            same listing whether the object is supplied via ``hObj`` or
            ``path``. (Default is ``None``)
        show_metadata (bool):
            If ``True``, list per-node metadata attributes alongside items.
            (Default is ``True``)
        print_metadata (bool):
            If ``True``, also print the value of each metadata attribute;
            otherwise only its shape and dtype are shown (or its Python type
            when the attribute has no shape/dtype, e.g. a ``str``).
            (Default is ``False``)
        indent_level (int):
            Internal recursion bookkeeping for indentation; users should
            leave this at the default. (Default is ``0``)

    Example:
        .. highlight:: python
        .. code-block:: python

            with h5py.File(path, 'r') as f:
                h5_handling.show_item_tree(f)
    """
    if depth is None:
        ## Effectively unlimited: far deeper than any real hierarchy
        depth = int(10000000000000000000)
    else:
        depth = int(depth)

    if depth < 0:
        return

    if path is not None:
        with h5py.File(path, 'r') as f:
            ## Opening the file is not a hierarchical level, so depth is
            ## forwarded unchanged.
            show_item_tree(
                hObj=f,
                path=None,
                depth=depth,
                show_metadata=show_metadata,
                print_metadata=print_metadata,
                indent_level=indent_level,
            )
        return

    indent = f' '*indent_level

    if show_metadata and hasattr(hObj, 'attrs'):
        for key in list(hObj.attrs.keys()):
            attr = hObj.attrs[key]
            if print_metadata:
                print(f'{indent}METADATA: {key}: {attr}')
            elif hasattr(attr, 'shape') and hasattr(attr, 'dtype'):
                print(f'{indent}METADATA: {key}: shape={attr.shape} , dtype={attr.dtype}')
            else:
                ## Attributes such as str have no shape/dtype; report the type instead
                print(f'{indent}METADATA: {key}: type={type(attr)}')

    for ii, val in enumerate(list(iter(hObj))):
        ## Look the item up once; each h5py lookup builds a new object
        item = hObj[val]
        if isinstance(item, dict) or isinstance(item, h5py.Group):
            print(f'{indent}{ii+1}. {val}:----------------')
            show_item_tree(
                item,
                depth=depth-1,
                show_metadata=show_metadata,
                print_metadata=print_metadata,
                indent_level=indent_level+1,
            )
        elif hasattr(item, 'shape') and hasattr(item, 'dtype'):
            print(f'{indent}{ii+1}. {val}: '.ljust(20) + f'shape={item.shape} ,'.ljust(20) + f'dtype={item.dtype}')
        else:
            print(f'{indent}{ii+1}. {val}: '.ljust(20) + f'type={type(item)}')
def make_h5_tree(dict_obj, h5_obj, group_string='', use_compression=False, track_order=True):
    """
    Recursively writes a Python dict into an HDF5 group/dataset tree.
    RH 2021

    Intended to be called by :func:`write_dict_to_h5`; using it directly is
    **not** recommended.

    Args:
        dict_obj (dict):
            Source dictionary whose hierarchy and leaf values become groups
            and datasets, respectively.
        h5_obj (h5py.File):
            Open HDF5 file (or group) into which the tree is written.
        group_string (str):
            Path of the current HDF5 group within ``h5_obj`` during
            recursion. An empty string is treated as the root ``'/'``.
            (Default is ``''``)
        use_compression (bool):
            If ``True``, write each non-scalar dataset with gzip level 9
            compression. Scalar datasets (including strings) are always
            written uncompressed because HDF5 filters require chunked,
            non-scalar storage. (Default is ``False``)
        track_order (bool):
            Value assigned to ``h5py.get_config().track_order`` so that
            groups created here preserve insertion order. Note this is a
            process-wide h5py setting and is not restored afterwards.
            (Default is ``True``)
    """
    ## Set track_order to True to keep track of the order of the items in the dict
    ## This is useful for reading the dict back in from the h5 file
    h5py.get_config().track_order = track_order

    if not dict_obj:
        return

    if group_string == '':
        group_string = '/'
    group = h5_obj[group_string]
    ## Strip the trailing '/' so child paths read '/key' rather than '//key'
    parent_path = group_string.rstrip('/')

    for key, val in dict_obj.items():
        if isinstance(val, dict):
            group.create_group(key)
            ## Forward track_order; otherwise the default (True) would
            ## overwrite the caller's choice on the first nested dict.
            make_h5_tree(
                val,
                h5_obj,
                f'{parent_path}/{key}',
                use_compression=use_compression,
                track_order=track_order,
            )
        else:
            ## cast to 'S' type if string so that it doesn't become '|O' object type in h5 file
            ## (encode explicitly: the implicit str -> bytes cast is ASCII-only)
            if isinstance(val, str):
                val = np.array(val.encode('utf-8'), dtype=np.bytes_)
            ## h5py rejects filter options on scalar datasets
            if use_compression and np.ndim(val) > 0:
                kwargs_compression = {'compression': 'gzip', 'compression_opts': 9}
            else:
                kwargs_compression = {}
            group.create_dataset(key, data=val, **kwargs_compression)
def write_dict_to_h5(
    path_save,
    input_dict,
    use_compression=False,
    track_order=True,
    write_mode='w-',
    show_item_tree_pref=True
):
    """
    Writes a Python dict to an HDF5 file, mirroring its hierarchy and data.
    RH 2021

    Wraps :func:`make_h5_tree` and optionally prints the resulting tree.

    Args:
        path_save (object):
            Full path of the file to write. ``str`` or
            :class:`pathlib.Path`.
        input_dict (dict):
            Dictionary whose leaves are HDF5-writable values (typically
            :class:`numpy.ndarray` or strings).
        use_compression (bool):
            If ``True``, write each dataset with gzip compression.
            (Default is ``False``)
        track_order (bool):
            If ``True``, preserve dict insertion order in the HDF5 file.
            (Default is ``True``)
        write_mode (str):
            File-open mode forwarded to :class:`h5py.File`. Either

            * ``'w'``: Overwrite any existing file.
            * ``'w-'``: Refuse to overwrite an existing file.

            (Default is ``'w-'``)
        show_item_tree_pref (bool):
            If ``True``, print the resulting HDF5 hierarchy after writing.
            (Default is ``True``)
    """
    ## Pass track_order to the File itself: the root group's ordering is fixed
    ## when the file is created, which happens before make_h5_tree gets a
    ## chance to update the global h5py config.
    with h5py.File(path_save, write_mode, track_order=track_order) as hf:
        make_h5_tree(input_dict, hf, '', use_compression=use_compression, track_order=track_order)
        if show_item_tree_pref:
            print(f'==== Successfully wrote h5 file. Displaying h5 hierarchy ====')
            show_item_tree(hf)
def simple_load(filepath, return_dict=True, verbose=False):
    """
    Loads an HDF5 file and returns it as a nested ``dict`` or an open file.
    RH 2023

    Args:
        filepath (object):
            Full path of the file to read. ``str`` or
            :class:`pathlib.Path`.
        return_dict (bool):
            If ``True``, return a nested ``dict`` whose keys are group names
            and whose leaves are the dataset arrays. If ``False``, return the
            open :class:`h5py.File` object instead; the caller is then
            responsible for closing it. (Default is ``True``)
        verbose (bool):
            If ``True``, print the file's hierarchy via
            :func:`show_item_tree` before returning, regardless of
            ``return_dict``. (Default is ``False``)

    Returns:
        (object):
            data (object):
                Either a nested ``dict`` of arrays (when ``return_dict`` is
                ``True``) or an open :class:`h5py.File` handle.
    """
    if not return_dict:
        h5_file = h5py.File(filepath, 'r')
        if verbose:
            try:
                print(f'==== Loading h5 file with hierarchy: ====')
                show_item_tree(h5_file)
            except Exception:
                ## Do not leak the handle if printing the hierarchy fails
                h5_file.close()
                raise
        return h5_file

    result = {}

    def visitor_func(name, node):
        ## Split name by '/' and walk/create the nested dicts down to the parent
        keys = name.split('/')
        sub_dict = result
        for key in keys[:-1]:
            sub_dict = sub_dict.setdefault(key, {})
        if isinstance(node, h5py.Dataset):
            ## Read the whole dataset into memory so it outlives the file
            sub_dict[keys[-1]] = node[...]
        elif isinstance(node, h5py.Group):
            ## Ensure empty groups still appear as empty dicts
            sub_dict.setdefault(keys[-1], {})

    with h5py.File(filepath, 'r') as h5_file:
        if verbose:
            print(f'==== Loading h5 file with hierarchy: ====')
            show_item_tree(h5_file)
        h5_file.visititems(visitor_func)

    return result
def h5Obj_to_dict(hObj):
    """
    Builds a nested Python ``dict`` from an :mod:`h5py` group or file.
    RH 2023

    Args:
        hObj (object):
            An :class:`h5py.File` or :class:`h5py.Group` to traverse.

    Returns:
        (dict):
            h5_dict (dict):
                Nested dictionary mirroring the HDF5 hierarchy. Sub-groups
                become nested dicts; every other entry is read via ``[()]``.
    """
    converted = {}
    ## Snapshot the member names before walking them
    member_names = list(iter(hObj))
    for name in member_names:
        node = hObj[name]
        if not isinstance(node, h5py.Group):
            converted[name] = node[()]
            continue
        converted[name] = h5Obj_to_dict(node)
    return converted
def simple_save(
    dict_to_save,
    path=None,
    use_compression=False,
    track_order=True,
    write_mode='w-',
    verbose=False
):
    """
    Saves a Python dict to an HDF5 file or appends it to an existing one.
    RH 2021

    Thin convenience wrapper: all arguments are forwarded to
    :func:`write_dict_to_h5`, with ``verbose`` passed as its
    ``show_item_tree_pref`` argument.

    Args:
        dict_to_save (dict):
            Dictionary to save to the HDF5 file.
        path (object):
            Full path of the file to write. ``str`` or
            :class:`pathlib.Path`. (Default is ``None``)
        use_compression (bool):
            If ``True``, write each dataset with gzip compression.
            (Default is ``False``)
        track_order (bool):
            If ``True``, preserve dict insertion order in the HDF5 file.
            (Default is ``True``)
        write_mode (str):
            File-open mode forwarded to :class:`h5py.File`. Either

            * ``'w'``: Overwrite any existing file.
            * ``'w-'``: Refuse to overwrite an existing file.
            * ``'a'``: Append a new dataset to an existing file.

            (Default is ``'w-'``)
        verbose (bool):
            If ``True``, print the resulting HDF5 hierarchy after writing.
            (Default is ``False``)
    """
    ## Gather the pass-through options, renaming verbose for the callee
    forwarded_options = {
        'use_compression': use_compression,
        'track_order': track_order,
        'write_mode': write_mode,
        'show_item_tree_pref': verbose,
    }
    write_dict_to_h5(path, dict_to_save, **forwarded_options)
def merge_helper(d, group):
    """
    Recursively merges a dictionary into an open :class:`h5py.Group`.

    Sub-dictionaries map to subgroups; non-dict values are written as
    datasets, replacing any existing entry with the same name. When a
    sub-dictionary's key already exists but is not a group (e.g. it is a
    dataset), the existing entry is deleted and replaced by a new group so
    the merge can proceed.

    Args:
        d (dict):
            Dictionary containing the data to merge.
        group (object):
            Target :class:`h5py.Group` (or :class:`h5py.File`) to merge into.
    """
    for key, value in d.items():
        if isinstance(value, dict):
            if key in group:
                target = group[key]
                ## Only group-like nodes can hold children. Anything else
                ## (e.g. a dataset) must be replaced before recursing.
                if not hasattr(target, 'create_group'):
                    del group[key]
                    target = group.create_group(key)
            else:
                target = group.create_group(key)
            merge_helper(value, target)
        else:
            ## Datasets cannot be overwritten in place: drop any existing entry first
            if key in group:
                del group[key]
            group.create_dataset(key, data=value)
def merge_dict_into_h5_file(d, filepath=None, h5Obj=None,):
    """
    Merges a dictionary into an HDF5 file on disk or an already-open object.

    Delegates the actual work to :func:`merge_helper`, which walks the dict
    recursively and merges each level into the matching HDF5 group. Exactly
    one of ``filepath`` or ``h5Obj`` must be supplied.

    Args:
        d (dict):
            Dictionary containing the data to merge.
        filepath (Optional[str]):
            Path to an HDF5 file to open in append mode. Do not specify when
            ``h5Obj`` is provided. (Default is ``None``)
        h5Obj (object):
            Open :class:`h5py.File` object to merge into. Do not specify when
            ``filepath`` is provided. (Default is ``None``)

    Raises:
        ValueError:
            If neither or both of ``filepath`` and ``h5Obj`` are given.
    """
    have_path = filepath is not None
    have_obj = h5Obj is not None

    ## Guard clauses: exactly one target must be given
    if not (have_path or have_obj):
        raise ValueError('Either filepath or h5Obj must be specified.')
    if have_path and have_obj:
        raise ValueError('Only one of filepath or h5Obj must be specified.')

    if have_obj:
        ## Caller owns the open object; merge into it directly
        merge_helper(d, h5Obj)
        return

    ## Append mode keeps existing file contents
    with h5py.File(filepath, 'a') as file:
        merge_helper(d, file)