Source code for zea.data.convert.picmus

"""
Script to convert the PICMUS database to the zea format.

Example usage:
```bash
python zea/data/convert/picmus.py \
--src_dir /mnt/data/PICMUS \
--output_dir converted_PICMUS_dir
```
"""

import argparse
import logging
import os
from pathlib import Path

import h5py
import numpy as np

from zea import log
from zea.beamform.delays import compute_t0_delays_planewave
from zea.data.data_format import generate_zea_dataset


[docs] def convert_picmus(source_path, output_path, overwrite=False): """Converts the PICMUS database to the zea format. Args: source_path (str, pathlike): The path to the original PICMUS file. output_path (str, pathlike): The path to the output file. overwrite (bool, optional): Set to True to overwrite existing file. Defaults to False. """ # Check if output file already exists and remove if os.path.exists(output_path): if overwrite: os.remove(output_path) else: logging.warning("Output file already exists. Skipping conversion.") return # Open the file file = h5py.File(source_path, "r") # Get the group containing the dataset file = file["US"]["US_DATASET0000"] if "data" not in file: raise ValueError("The file does not contain the data group.") # Extract I- and Q-data (shape (tx, el, ax)) i_data = file["data"]["real"][:] q_data = file["data"]["imag"][:] if np.abs(np.sum(q_data)) < 0.1: # Use only the I-data, add dummy dimension (shape (tx, el, ax, ch=1)) raw_data = i_data[..., None] else: # Stack I- and Q-data (shape (tx, el, ax, 2)) raw_data = np.stack([i_data, q_data], axis=-1) # Add dummy frame dimension (shape (frame=1, tx, el, ax, ch=1)) raw_data = raw_data[None] raw_data = np.transpose(raw_data, (0, 1, 3, 2, 4)) _, n_tx, _, n_el, _ = raw_data.shape center_frequency = int(file["modulation_frequency"][:][0]) # Fix a mistake in one of the PICMUS files if center_frequency == 0: center_frequency = 5.208e6 sampling_frequency = int(file["sampling_frequency"][:][0]) probe_geometry = np.transpose(file["probe_geometry"][:], (1, 0)) sound_speed = float(file["sound_speed"][:][0]) focus_distances = np.zeros((n_tx,), dtype=np.float32) polar_angles = file["angles"][:] azimuth_angles = np.zeros((n_tx,), dtype=np.float32) t0_delays = np.zeros((n_tx, n_el), dtype=np.float32) tx_apodizations = np.ones((n_tx, n_el), dtype=np.float32) initial_times = np.zeros((n_tx,)) for n in range(n_tx): v = np.array([np.sin(polar_angles[n]), 0, np.cos(0)]) initial_times[n] = -np.min(np.sum(probe_geometry * v[None], axis=1)) / sound_speed t0_delays[n] = compute_t0_delays_planewave( probe_geometry=probe_geometry, polar_angles=polar_angles[n], sound_speed=sound_speed, ) # This line changes the data format to work with the old beamformer, # which is not in accordance with the new zea format generate_zea_dataset( path=output_path, raw_data=raw_data, center_frequency=center_frequency, sampling_frequency=sampling_frequency, probe_geometry=probe_geometry, initial_times=initial_times, sound_speed=sound_speed, t0_delays=t0_delays, focus_distances=focus_distances, polar_angles=polar_angles, azimuth_angles=azimuth_angles, tx_apodizations=tx_apodizations, probe_name="verasonics_l11_4v", description="PICMUS dataset converted to zea format", )
[docs] def get_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser( description=( "Converts the PICMUS database to the zea format. The " "src_dir is scanned for hdf5 files ending in iq or rf. These files are" "converted and stored in output_dir under the same relative path as " "they came from in src_dir." ) ) parser.add_argument( "--src_dir", type=str, help="Source directory where the original PICMUS data is stored.", ) parser.add_argument("--output_dir", type=str, help="Output directory of the converted database") return parser.parse_args()
if __name__ == "__main__": # Parse the arguments args = get_args() # Get the source and output directories base_dir = Path(args.src_dir) output_dir = Path(args.output_dir) # Check if the source directory exists and create the output directory assert base_dir.exists(), f"Source directory {base_dir} does not exist." output_dir.mkdir(parents=True, exist_ok=False) # Traverse the source directory and convert all files for file in base_dir.rglob("*.hdf5"): str_file = str(file) # Select only the data files that actually contain rf or iq data # (There are also files containing the geometry of the phantoms or # images) if ( not str_file.endswith("iq.hdf5") or not str_file.endswith("rf.hdf5") ) and "img" in str_file: log.info("Skipping %s", file.name) continue log.info("Converting %s", file.name) # Find the folder relative to the base directory to retain the # folder structure in the output directory output_file = output_dir / file.relative_to(base_dir) # Define the output path # NOTE: I added output_file.stem to put each file in its own # folder. This makes it possible to use it as a dataset because # it ensures there are never different types of data file in # the same folder. output_file = output_file.parent / output_file.stem / f"{output_file.stem}.hdf5" # Convert the file try: # Create the output directory if it does not exist already output_file.parent.mkdir(parents=True, exist_ok=True) convert_picmus(file, output_file, overwrite=True) except Exception: output_file.parent.rmdir() log.error("Failed to convert %s", str_file) continue