Source code for face_rhythm.data_importing

from pathlib import Path
from typing import Union
from typing import List
import multiprocessing as mp

import numpy as np
from tqdm.auto import tqdm
import decord

from .util import FR_Module
from .helpers import VideoReaderWrapper, BufferedVideoReader

## Define Dataset class as a subclass of util.FR_Module
class Dataset_videos(FR_Module):
    """
    Container for one or more videos used as input to the face-rhythm
    pipeline.
    RH 2022

    Either opens every path in ``paths_videos`` with a ``VideoReaderWrapper``
    (``decord`` CPU context) or borrows the readers and metadata of an
    existing :class:`BufferedVideoReader`, then exposes the per-video readers
    along with aggregated metadata (frame counts, frame rate, frame shape,
    channel count). Acts as a sequence of video readers.

    Args:
        bufferedVideoReader (object):
            Pre-built :class:`BufferedVideoReader` whose readers and metadata
            are reused. At least one of ``bufferedVideoReader`` and
            ``paths_videos`` must be provided; if both are given,
            ``bufferedVideoReader`` takes precedence and ``paths_videos`` is
            ignored. (Default is ``None``)
        paths_videos (Union[str, List[str]]):
            Path or non-empty list of paths to the video files to load. Used
            when ``bufferedVideoReader`` is ``None``. (Default is ``None``)
        contiguous (bool):
            If ``True``, videos are meant to be treated as a single contiguous
            stream. This class only stores the flag as ``self.contiguous``.
            (Default is ``False``)
        frame_rate_clamp (float):
            If ``None``, ``self.frame_rate`` is the median of the per-video
            metadata frame rates. Otherwise the value is converted to
            ``float`` and used directly. (Default is ``None``)
        verbose (Union[bool, int]):
            Verbosity level. \n
                * ``0``: Silent.
                * ``1``: Warnings only.
                * ``2``: Warnings, informational messages and progress bars. \n
            (Default is ``1``)

    Attributes:
        videos (List[object]):
            Per-video reader objects (``VideoReaderWrapper`` instances or the
            ``video_readers`` of ``bufferedVideoReader``).
        paths_videos (List[str]):
            Paths to the source video files, converted to ``str``.
        metadata (dict):
            Per-video metadata. When built from ``paths_videos`` it has the
            keys ``'paths_videos'``, ``'num_frames'``, ``'frame_rate'``,
            ``'frame_height_width'`` and ``'num_channels'``. When built from
            ``bufferedVideoReader`` it is that object's ``metadata``.
        num_frames_total (int):
            Total number of frames summed across all videos.
        frame_rate (float):
            Effective frame rate (median of metadata or ``frame_rate_clamp``).
        frame_height_width (List[int]):
            Frame height and width of the first video.
        num_channels (int):
            Number of channels of the first video.
        example_image (np.ndarray):
            The first frame of the first video, materialized as a CPU
            ``numpy`` array.
        contiguous (bool):
            Value of the ``contiguous`` argument.
        config (dict):
            Inputs of this object, for ``FR_Module`` compatibility.
        run_info (dict):
            Derived run-level metadata, for ``FR_Module`` compatibility.
        run_data (dict):
            Heavyweight outputs (currently ``example_image``), for
            ``FR_Module`` compatibility.
    """
    def __init__(
        self,
        bufferedVideoReader: BufferedVideoReader=None,
        paths_videos: Union[str, List[str]]=None,
        contiguous: bool=False,
        frame_rate_clamp: float=None,
        verbose: Union[bool, int]=1,
    ):
        """
        Initializes the dataset, opens each video, and collects metadata.

        Raises:
            AssertionError:
                If neither input is given, if ``paths_videos`` is malformed,
                empty or points to missing files, if a video has no frames,
                if videos differ in frame shape or channel count, or if
                ``bufferedVideoReader`` is not a ``BufferedVideoReader``.
        """
        super().__init__()

        ## Set variables
        self.contiguous = bool(contiguous)
        self.verbose = int(verbose)

        ## Determine whether to use bufferedVideoReader or paths_videos
        ##  (bufferedVideoReader wins when both are given)
        self._videoDataType = 'BufferedVideoReader' if bufferedVideoReader is not None else 'paths_videos'
        ### Assert that either bufferedVideoReader or paths_videos is specified
        assert bufferedVideoReader is not None or paths_videos is not None, "FR ERROR: bufferedVideoReader or paths_videos must be specified"

        ## Populate self.videos, self.paths_videos and self.metadata
        if self._videoDataType == 'paths_videos':
            self._init_from_paths_videos(paths_videos)
        elif self._videoDataType == 'BufferedVideoReader':
            self._init_from_bufferedVideoReader(bufferedVideoReader)

        ## Set frame rate
        if frame_rate_clamp is None:
            frame_rates = self.metadata["frame_rate"]
            ## warn if any video's frame rate is very different from others
            max_diff = float((np.max(frame_rates) - np.min(frame_rates)) / np.mean(frame_rates))
            if (max_diff > 0.1) and (self.verbose > 0):
                print(f"FR WARNING: max frame rate difference is large: {max_diff*100:.2f}%")
            self.frame_rate = float(np.median(frame_rates))
        else:
            self.frame_rate = float(frame_rate_clamp)

        self.num_frames_total = int(np.sum(self.metadata["num_frames"]))
        self.frame_height_width = self.metadata["frame_height_width"][0]
        self.num_channels = self.metadata["num_channels"][0]
        self.paths_videos = [str(path) for path in self.paths_videos]  ## ensure paths are strings

        ## Materialize the example frame as a CPU numpy array. When the
        ##  underlying BufferedVideoReader uses NVDEC (device='cuda'), frames
        ##  come back as CUDA torch tensors, and h5py.create_dataset() later
        ##  fails to call .numpy() on them. Forcing CPU here keeps the on-disk
        ##  representation independent of the decode device.
        _ex = self.videos[0][0]
        if hasattr(_ex, 'detach'):
            _ex = _ex.detach()
        if hasattr(_ex, 'cpu'):
            _ex = _ex.cpu()
        if hasattr(_ex, 'numpy'):
            _ex = _ex.numpy()
        self.example_image = np.asarray(_ex)

        ## For FR_Module compatibility
        self.config = {
            "paths_videos": self.paths_videos,
            "contiguous": contiguous,
            "frame_rate_clamp": frame_rate_clamp,
            "verbose": verbose,
        }
        self.run_info = {
            "frame_rate": self.frame_rate,
            "num_frames_total": self.num_frames_total,
            "frame_height_width": self.frame_height_width,
            "num_channels": self.num_channels,
            "metadata": self.metadata,
        }
        self.run_data = {
            "example_image": self.example_image,
        }

    def _init_from_paths_videos(self, paths_videos):
        """Validates ``paths_videos``, opens a reader per path, and builds ``self.metadata``."""
        msg_type = "FR ERROR: paths_videos must be a string or list of strings to paths of videos"
        ## Assert that paths_videos is either a list of strings or a string
        if isinstance(paths_videos, list):
            assert all([isinstance(path, str) for path in paths_videos]), msg_type
        else:
            assert isinstance(paths_videos, str), msg_type

        ## Normalize to a list (copied so the caller's list is not aliased)
        self.paths_videos = list(paths_videos) if isinstance(paths_videos, list) else [paths_videos]
        ## An empty list would otherwise fail later with an opaque indexing error
        assert len(self.paths_videos) > 0, "FR ERROR: paths_videos must contain at least one path"

        ## Assert that all paths_videos exist
        exists_paths_videos = [bool(Path(path).exists()) for path in self.paths_videos]
        paths_missing = [path for path, exists in zip(self.paths_videos, exists_paths_videos) if not exists]
        assert all(exists_paths_videos), f"FR ERROR: paths_videos must exist. The following paths do not exist: {paths_missing}"

        ## Progress bars are only shown at the informational verbosity level
        hide_progress = (self.verbose < 2)

        ## Load videos
        if self.verbose > 1:
            print("FR: Loading lazy video reader objects...")
        decord.bridge.set_bridge('torch')
        num_threads = mp.cpu_count()  ## loop-invariant, so computed once
        self.videos = [
            VideoReaderWrapper(path_video, ctx=decord.cpu(0), num_threads=num_threads)
            for path_video in tqdm(self.paths_videos, disable=hide_progress)
        ]

        ## Collect per-video metadata
        if self.verbose > 1:
            print("FR: Collecting video metadata...")
        num_frames, frame_rate, frame_height_width, num_channels = [], [], [], []
        for v in tqdm(self.videos, disable=hide_progress):
            n_frames = int(len(v))
            ## Checked before decoding frame 0, which cannot exist in an empty video
            assert n_frames > 0, "FR ERROR: All videos must have at least one frame"
            frame_tmp = v[0]
            num_frames.append(n_frames)
            frame_rate.append(float(v.get_avg_fps()))
            frame_height_width.append([int(n) for n in frame_tmp.shape[:2]])
            num_channels.append(int(frame_tmp.shape[2]))

        self.num_frames = num_frames
        self.metadata = {
            "paths_videos": self.paths_videos,
            "num_frames": num_frames,
            "frame_rate": frame_rate,
            "frame_height_width": frame_height_width,
            "num_channels": num_channels,
        }

        ## Assert that all videos must have the same shape
        assert all([n == frame_height_width[0] for n in frame_height_width]), "FR ERROR: All videos must have the same shape"
        ## Assert that all videos must have the same number of channels
        assert all([n == num_channels[0] for n in num_channels]), "FR ERROR: All videos must have the same number of channels"

    def _init_from_bufferedVideoReader(self, bufferedVideoReader):
        """Borrows readers, paths and metadata from an existing ``BufferedVideoReader``."""
        ## Assert that bufferedVideoReader is a BufferedVideoReader object
        if self.verbose > 1:
            ## line needed sometimes for next assert to work
            print('printing this line helps the bufferedVideoReader object load properly. ', type(bufferedVideoReader), type(BufferedVideoReader), BufferedVideoReader.__class__, isinstance(bufferedVideoReader, BufferedVideoReader))
        assert isinstance(bufferedVideoReader, BufferedVideoReader), "FR ERROR: bufferedVideoReader must be a BufferedVideoReader object"

        self.videos = bufferedVideoReader.video_readers
        self.paths_videos = bufferedVideoReader.paths_videos
        self.metadata = bufferedVideoReader.metadata

    def __repr__(self):
        """Returns a one-line summary of dataset shape, frame rate, and channel count."""
        return f"Dataset_videos, num_videos={len(self.paths_videos)}, num_frames_total={self.num_frames_total}, frame_rate={self.frame_rate}, frame_height_width={self.frame_height_width}, num_channels={self.num_channels}"

    ## Define methods for loading and handling videos
    def __getitem__(self, index):
        """Returns the video reader at position ``index``."""
        return self.videos[index]

    def __len__(self):
        """Returns the number of videos in the dataset."""
        return len(self.videos)

    def __iter__(self):
        """Returns a fresh iterator over the per-video readers."""
        return iter(self.videos)

    def __next__(self):
        """
        Returns the next per-video reader from a persistent internal iterator.

        The iterator is created on first use (``self.videos`` itself may be a
        plain list, which ``next()`` cannot consume directly) and is discarded
        once exhausted, so a later call starts again from the first video.

        Raises:
            StopIteration: When all readers have been returned.
        """
        if getattr(self, "_videos_iterator", None) is None:
            self._videos_iterator = iter(self.videos)
        try:
            return next(self._videos_iterator)
        except StopIteration:
            self._videos_iterator = None
            raise