# Source code for zea.data.convert.echonet

"""
Script to convert the EchoNet database to .npy and zea formats.
Will segment the images and convert them to polar coordinates.
"""

import os

os.environ["KERAS_BACKEND"] = "numpy"

import argparse
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path

import numpy as np
from scipy.interpolate import griddata
from tqdm import tqdm

from zea.config import Config
from zea.data import generate_zea_dataset
from zea.io_lib import load_video
from zea.utils import translate


def get_args():
    """Parse and return the command line arguments for the conversion script."""
    parser = argparse.ArgumentParser(description="Convert EchoNet to zea format")
    # Path to the EchoNet-Dynamic/Videos directory.
    parser.add_argument("--source", type=str, required=True)
    # Root directory for the zea .hdf5 output.
    parser.add_argument("--output", type=str, required=True)
    # Optional directory with train/val/test yaml files of a previous split.
    parser.add_argument("--splits", type=str, default=None)
    # Optional root directory for intermediate .npy dumps.
    parser.add_argument("--output_numpy", type=str, default=None)
    # Process files sequentially instead of with a process pool.
    parser.add_argument("--no_hyperthreading", action="store_true")
    return parser.parse_args()
def segment(tensor, number_erasing=0, min_clip=0):
    """Erase the background around the EchoNet scan cone, in place.

    The cone boundary was fitted manually on the 112x112 EchoNet frames:
    two straight lines form the upper edges and two short curves form the
    bottom corners. Everything outside the boundary is set to
    ``number_erasing``, creating a hard edge.

    Args:
        tensor (ndarray): batch of scan-converted images, shape (N, 112, 112).
            Modified in place.
        number_erasing (float, optional): fill value for the background.
        min_clip (float, optional): when > 0, pixels on the cone edge are
            clipped to at least this value to make the edge clearly visible.

    Returns:
        ndarray: the same tensor with the background erased.
    """
    # Upper edges: one boundary row per column, a falling line over the left
    # half (columns 0-60) and a rising line over the right half (61-111).
    upper_edge = np.concatenate(
        [np.linspace(67, 7, 61), np.linspace(7, 57, 51)]
    ).astype(np.int32)
    for col, edge_row in enumerate(upper_edge):
        # Rows count downwards, so everything above the edge is background.
        tensor[:, : edge_row - 1, col] = number_erasing
        if min_clip > 0:
            tensor[:, edge_row, col] = np.clip(tensor[:, edge_row, col], min_clip, 1)

    # Bottom corners: manually fitted boundary rows for the leftmost
    # (columns 0-20) and rightmost (columns 89-111) parts of the image.
    bottom_rows = np.concatenate(
        [
            np.array([102, 103, 103, 104, 104, 105, 105, 106, 106, 107, 107,
                      107, 108, 108, 109, 109, 109, 110, 110, 111, 111]),
            np.array([111, 111, 111, 110, 110, 110, 109, 109, 109, 108, 108,
                      107, 107, 107, 106, 106, 105, 105, 104, 104, 103, 103, 102]),
        ]
    )
    bottom_cols = np.concatenate(
        [
            np.linspace(0, 20, 21).astype(np.int32),
            np.linspace(89, 111, 23).astype(np.int32),
        ]
    )
    for edge_row, col in zip(bottom_rows, bottom_cols):
        # Everything at and below the edge is background.
        tensor[:, edge_row:, col] = number_erasing
        if min_clip > 0:
            tensor[:, edge_row - 1, col] = np.clip(tensor[:, edge_row - 1, col], min_clip, 1)

    return tensor
def accept_shape(tensor):
    """Decide whether a segmented frame looks like a valid scan cone.

    Two heuristics inspect the bottom corners of the frame; a frame failing
    either one is rejected.

    Args:
        tensor (ndarray): single scan-converted image, shape (112, 112).

    Returns:
        bool: True when the frame passes both corner checks.
    """
    # Heuristic 1: the bottom-left corner band must contain some signal.
    lower = np.linspace(78, 47, 21).astype(np.int32)
    upper = np.linspace(67, 47, 21).astype(np.int32)
    corner_mass = sum(
        np.sum(tensor[upper[col] : lower[col], col]) for col in range(len(lower))
    )
    if corner_mass < 0.1:
        return False

    # Heuristic 2: the bottom-right corner must contain signal that is not
    # just a handful of bright artifacts.
    cols = np.linspace(70, 111, 42).astype(np.int32)
    start_rows = np.linspace(17, 57, 42).astype(np.int32)
    stop_rows = np.linspace(17, 80, 42).astype(np.int32)
    values = []
    for i, col in enumerate(cols):
        values.extend(float(v) for v in tensor[start_rows[i] : stop_rows[i], col])

    # Sort descending and drop the 100 largest values (likely artifacts);
    # the remaining baseline must still carry enough energy.
    values.sort(reverse=True)
    if sum(values[100:]) < 5:
        return False

    return True
def rotate_coordinates(data_points, degrees):
    """Rotate 2D points counter-clockwise around the origin.

    Args:
        data_points (ndarray): array of shape (N, 2) holding x/y coordinates.
        degrees (int): rotation angle in degrees.

    Returns:
        ndarray: the rotated points, same shape as the input.
    """
    theta = np.radians(degrees)
    cos_t, sin_t = np.cos(theta), np.sin(theta)
    rotation = np.array([[cos_t, -sin_t], [sin_t, cos_t]])
    return (rotation @ data_points.T).T
def cartesian_to_polar_matrix(
    cartesian_matrix, tip=(61, 7), r_max=107, angle=0.79, interpolation="nearest"
):
    """Resample a cartesian scan cone onto a polar (r, theta) grid.

    The polar representation is more compatible with CNNs / action selection.

    Args:
        cartesian_matrix (ndarray): 2D (rows, cols) scan-converted image.
        tip (tuple, optional): pixel coordinates of the cone tip. Defaults to (61, 7).
        r_max (int, optional): expected radius of the cone. Defaults to 107.
        angle (float, optional): expected half-opening angle of the cone,
            used as (-angle, angle). Defaults to 0.79.
        interpolation (str, optional): one of 'nearest', 'linear', 'cubic'.
            Defaults to 'nearest'.

    Returns:
        ndarray: polar conversion of the input, shape (rows, cols).
    """
    rows, cols = cartesian_matrix.shape
    tip_x, tip_y = tip

    # Pixel coordinates of every sample, expressed relative to the cone tip.
    xs = np.linspace(-tip_x, cols - tip_x - 1, cols)
    ys = np.linspace(-tip_y, rows - tip_y - 1, rows)
    grid_x, grid_y = np.meshgrid(xs, ys)
    source_points = np.column_stack((grid_x.ravel(), grid_y.ravel()))

    # Rotate the source grid by -90 degrees (inlined point rotation) so the
    # cone opens along the positive x-axis.
    phi = np.radians(-90)
    rotation = np.array([[np.cos(phi), -np.sin(phi)], [np.sin(phi), np.cos(phi)]])
    source_points = (rotation @ source_points.T).T
    source_values = cartesian_matrix.ravel()

    # Target polar grid. r_max and angle were found manually; r_max differs
    # from the number of rows in EchoNet!
    radii = np.linspace(0, r_max, rows)
    thetas = np.linspace(-angle, angle, cols)
    radii, thetas = np.meshgrid(radii, thetas)
    target_points = np.column_stack(
        ((radii * np.cos(thetas)).ravel(), (radii * np.sin(thetas)).ravel())
    )

    # Scattered-data interpolation onto the polar grid; points outside the
    # convex hull of the data become 0 (for 'linear'/'cubic').
    polar_values = griddata(
        source_points, source_values, target_points, method=interpolation, fill_value=0
    )
    return np.rot90(polar_values.reshape(cols, rows), k=-1)
def find_split_for_file(file_dict, target_file):
    """Return the split whose file list contains ``target_file``.

    Args:
        file_dict (dict): mapping of split name -> container of file names.
        target_file (str): file name to look up.

    Returns:
        str: the matching split name, or "rejected" when no split contains it.
    """
    return next(
        (split for split, files in file_dict.items() if target_file in files),
        "rejected",
    )
class H5Processor:
    """
    Stores a few variables and paths to allow for hyperthreading.

    The instance is used as the callable submitted to a
    ``ProcessPoolExecutor`` (see the ``__main__`` block), which is why all
    configuration is stored up front and ``__call__`` only receives the
    file to process.
    """

    def __init__(
        self,
        path_out_h5,
        path_out=None,
        num_val=500,
        num_test=500,
        range_from=(0, 255),
        range_to=(-60, 0),
        splits=None,
    ):
        """Store output paths, split sizes, value ranges and optional splits.

        Args:
            path_out_h5: root directory for the zea .hdf5 output tree.
            path_out: optional root directory for .npy dumps; ``None``
                disables numpy output entirely.
            num_val: target number of sequences in the "val" split when a
                fresh split is generated.
            num_test: target number of sequences in the "test" split when a
                fresh split is generated.
            range_from: expected value range of the raw video data.
            range_to: value range written into the zea dataset.
            splits: optional mapping of split name -> list of file names,
                used to reproduce a previous split.
        """
        self.path_out_h5 = Path(path_out_h5)
        self.path_out = Path(path_out) if path_out else None
        self.num_val = num_val
        self.num_test = num_test
        self.range_from = range_from
        self.range_to = range_to
        self.splits = splits
        # All intermediate processing happens in the [0, 1] range.
        self._process_range = (0, 1)

        # Ensure train, val, test, rejected paths exist
        for folder in ["train", "val", "test", "rejected"]:
            if self._to_numpy:
                (self.path_out / folder).mkdir(parents=True, exist_ok=True)
            (self.path_out_h5 / folder).mkdir(parents=True, exist_ok=True)

    @property
    def _to_numpy(self):
        # Whether intermediate .npy files should be written as well.
        return self.path_out is not None

    def translate(self, data):
        """Translate the data from the processing range to final range."""
        # NOTE: this method shadows the module-level ``translate`` imported
        # from zea.utils; the call below resolves to that module function.
        return translate(data, self._process_range, self.range_to)

    def get_split(self, hdf5_file: str, sequence):
        """Determine the split for a given file."""
        # Always check acceptance
        accepted = accept_shape(sequence[0])

        # Previous split
        if self.splits is not None:
            split = find_split_for_file(self.splits, hdf5_file)
            # A reproduced split must agree with the acceptance heuristic.
            assert accepted == (split != "rejected"), "Rejection mismatch"
            return split

        # New split
        if not accepted:
            return "rejected"

        # This inefficient counter works with hyperthreading
        # TODO: but it is not reproducible!
        val_counter = len(list((self.path_out_h5 / "val").iterdir()))
        test_counter = len(list((self.path_out_h5 / "test").iterdir()))

        # Determine the split: fill "val" first, then "test", then "train".
        if val_counter < self.num_val:
            return "val"
        elif test_counter < self.num_test:
            return "test"
        else:
            return "train"

    def __call__(self, avi_file):
        """
        Processes a single h5 file using the class variables and the filename given.
        """
        hdf5_file = avi_file.stem + ".hdf5"
        sequence = load_video(avi_file)
        # Sanity-check the raw value range before rescaling.
        assert sequence.min() >= self.range_from[0], f"{sequence.min()} < {self.range_from[0]}"
        assert sequence.max() <= self.range_from[1], f"{sequence.max()} > {self.range_from[1]}"

        # Translate to [0, 1]
        sequence = translate(sequence, self.range_from, self._process_range)
        # Erase the background outside the scan cone.
        sequence = segment(sequence, number_erasing=0, min_clip=0)

        split = self.get_split(hdf5_file, sequence)
        accepted = split != "rejected"
        out_h5 = self.path_out_h5 / split / hdf5_file

        if self._to_numpy:
            out_dir = self.path_out / split / avi_file.stem
            out_dir.mkdir(parents=True, exist_ok=True)

        polar_im_set = []
        for i, im in enumerate(sequence):
            if self._to_numpy:
                np.save(out_dir / f"sc{str(i).zfill(3)}.npy", im)
            # Rejected sequences still get their scan-converted .npy dump,
            # but no polar conversion.
            if not accepted:
                continue
            polar_im = cartesian_to_polar_matrix(im, interpolation="cubic")
            # Cubic interpolation can overshoot; clip back to the range.
            polar_im = np.clip(polar_im, *self._process_range)
            if self._to_numpy:
                np.save(
                    out_dir / f"polar{str(i).zfill(3)}.npy",
                    polar_im,
                )
            polar_im_set.append(polar_im)

        if accepted:
            polar_im_set = np.stack(polar_im_set, axis=0)

        # Check the ranges
        assert sequence.min() >= self._process_range[0], sequence.min()
        assert sequence.max() <= self._process_range[1], sequence.max()

        zea_dataset = {
            "path": out_h5,
            "image_sc": self.translate(sequence),
            "probe_name": "generic",
            "description": "EchoNet dataset converted to zea format",
        }
        # Only accepted sequences carry the polar "image" data.
        if accepted:
            zea_dataset["image"] = self.translate(polar_im_set)

        return generate_zea_dataset(**zea_dataset)
if __name__ == "__main__":
    args = get_args()

    # Optionally reproduce an earlier train/val/test split from yaml files.
    splits = None
    if args.splits is not None:
        yaml_dir = Path(args.splits)
        splits = {}
        for split_name in ["train", "val", "test"]:
            yaml_file = yaml_dir / (split_name + ".yaml")
            assert yaml_file.exists(), f"File {yaml_file} does not exist."
            splits[split_name] = Config.from_yaml(yaml_file)["file_paths"]

    # Stems of files that already have an entry in the output tree.
    files_done = [
        filename.replace(".hdf5", "")
        for _, _, filenames in os.walk(args.output)
        for filename in filenames
    ]

    # All EchoNet videos that have not been processed yet.
    avi_files = [
        file for file in Path(args.source).glob("*.avi") if file.stem not in files_done
    ]
    print(f"Files left to process: {len(avi_files)}")

    processor = H5Processor(
        path_out_h5=args.output, path_out=args.output_numpy, splits=splits
    )

    print("Starting the conversion process.")
    if args.no_hyperthreading:
        # Sequential fallback, mainly for debugging.
        for avi_file in tqdm(avi_files):
            processor(avi_file)
    else:
        # Fan the files out over a process pool; result() re-raises worker errors.
        with ProcessPoolExecutor() as executor:
            futures = {executor.submit(processor, avi_file): avi_file for avi_file in avi_files}
            for future in tqdm(as_completed(futures), total=len(avi_files)):
                future.result()
    print("All tasks are completed.")