Source code for zea.data.convert.camus
"""Functionality to convert the camus dataset to the zea format.
Requires SimpleITK to be installed: pip install SimpleITK.
"""
from __future__ import annotations
import argparse
import importlib.util
import logging
import os
import sys
from pathlib import Path
from typing import Any, Dict, Tuple
import numpy as np
import scipy
from skimage.transform import resize
from tqdm import tqdm
# from zea.display import transform_sc_image_to_polar
from zea import log
from zea.data.data_format import generate_zea_dataset
from zea.utils import find_first_nonzero_index, translate
[docs]
def transform_sc_image_to_polar(image_sc, output_size=None, fit_outline=True):
"""
Transform a scan converted input image (cone) into square
using radial stretching and downsampling. Note that it assumes the background to be zero!
Please verify if your results make sense, especially if the image contains black parts
at the edges. This function is not perfect by any means, but it works for most cases.
Args:
image (numpy.ndarray): Input image as a 2D numpy array (height, width).
output_size (tuple, optional): Output size of the image as a tuple.
Defaults to image_sc.shape.
fit_outline (bool, optional): Whether to fit a polynomial the outline of the image.
Defaults to True. If this is set to False, and the ultrasound image contains
some black parts at the edges, weird artifacts can occur, because the jagged outline
is stretched to the desired width.
Returns:
numpy.ndarray: Squared image as a 2D numpy array (height, width).
"""
assert len(image_sc.shape) == 2, "function only allows for 2D data"
# Default output size is the input size
if output_size is None:
output_size = image_sc.shape
# Initialize an empty target array for polar_image
polar_image = np.zeros_like(image_sc)
# Flip along the x axis (such that curve of image_sc is pointing up)
flipped_image = np.flip(image_sc, axis=0)
# Find index of first non zero element along y axis (for every vertical line)
non_zeros_flipped = find_first_nonzero_index(flipped_image, 0)
# Remove any black vertical lines (columns) that do not contain image data
remove_vertical_lines = np.where(non_zeros_flipped == -1)[0]
polar_image = np.delete(polar_image, remove_vertical_lines, axis=1)
non_zeros_flipped = np.delete(non_zeros_flipped, remove_vertical_lines)
if fit_outline:
model_fitted_bottom = np.poly1d(
np.polyfit(range(len(non_zeros_flipped)), non_zeros_flipped, 4)
)
non_zeros_flipped = model_fitted_bottom(range(len(non_zeros_flipped)))
non_zeros_flipped = non_zeros_flipped.round().astype(np.int64)
non_zeros_flipped = np.clip(non_zeros_flipped, 0, None)
non_zeros = polar_image.shape[0] - non_zeros_flipped
# Find the middle of the width of the image
width = polar_image.shape[1]
width_middle = round(width / 2)
# For every vertical line in the image
for x_i in range(width):
# Move the flipped first non-zero element to the bottom of the image
polar_image[non_zeros_flipped[x_i] :, x_i] = image_sc[: non_zeros[x_i], x_i]
# Find indices of first and last non-zero element along x axis (for every horizontal line)
non_zeros_left = find_first_nonzero_index(polar_image, 1)
non_zeros_right = width - find_first_nonzero_index(np.flip(polar_image, 1), 1, width_middle)
# Remove any black horizontal lines (rows) that do not contain image data
remove_horizontal_lines = np.max(np.where(non_zeros_left == -1)) + 1
polar_image = polar_image[remove_horizontal_lines:, :]
non_zeros_left = non_zeros_left[remove_horizontal_lines:]
non_zeros_right = non_zeros_right[remove_horizontal_lines:]
if fit_outline:
model_fitted_left = np.poly1d(np.polyfit(range(len(non_zeros_left)), non_zeros_left, 2))
non_zeros_left = model_fitted_left(range(len(non_zeros_left)))
non_zeros_left = non_zeros_left.round().astype(np.int64)
model_fitted_right = np.poly1d(np.polyfit(range(len(non_zeros_right)), non_zeros_right, 2))
non_zeros_right = model_fitted_right(range(len(non_zeros_right)))
non_zeros_right = non_zeros_right.round().astype(np.int64)
# For every horizontal line in the image
for y_i in range(polar_image.shape[0]):
small_array = polar_image[y_i, non_zeros_left[y_i] : non_zeros_right[y_i]]
if len(small_array) <= 1:
# If the array is too small for interpolation, set it to the middle value.
polar_image[y_i, :] = polar_image[y_i, width_middle]
else:
# Perform linear interpolation to stretch the line to the desired width.
array_interp = scipy.interpolate.interp1d(np.arange(small_array.size), small_array)
polar_image[y_i, :] = array_interp(np.linspace(0, small_array.size - 1, width))
# Resize image to output_size
return resize(polar_image, output_size, preserve_range=True)
[docs]
def sitk_load(filepath: str | Path) -> Tuple[np.ndarray, Dict[str, Any]]:
"""Loads an image using SimpleITK and returns the image and its metadata.
Args:
filepath: Path to the image.
Returns:
- ([N], H, W), Image array.
- Collection of metadata.
"""
# Load image and save info
image = sitk.ReadImage(str(filepath))
all_metadata = {}
for k in image.GetMetaDataKeys():
all_metadata[k] = image.GetMetaData(k)
metadata = {
"origin": image.GetOrigin(),
"ElementSpacing": image.GetSpacing(),
"direction": image.GetDirection(),
"NDims": image.GetDimension(),
"metadata": all_metadata,
}
# Extract numpy array from the SimpleITK image object
im_array = sitk.GetArrayFromImage(image)
return im_array, metadata
[docs]
def convert_camus(source_path, output_path, overwrite=False):
"""Converts the camus database to the zea format.
Args:
source_path (str, pathlike): The path to the original camus file.
output_path (str, pathlike): The path to the output file.
overwrite (bool, optional): Set to True to overwrite existing file.
Defaults to False.
"""
# Check if output file already exists and remove
if os.path.exists(output_path):
if overwrite:
os.remove(output_path)
else:
logging.warning("Output file already exists. Skipping conversion.")
return
# Open the file
image_seq, _ = sitk_load(source_path)
# Convert to polar coordinates
image_seq_polar = []
for image in image_seq:
image_seq_polar.append(transform_sc_image_to_polar(image))
image_seq_polar = np.stack(image_seq_polar, axis=0)
# Change range to [-60, 0] dB
image_seq = translate(image_seq, (0, 255), (-60, 0))
image_seq_polar = translate(image_seq_polar, (0, 255), (-60, 0))
generate_zea_dataset(
path=output_path,
image=image_seq_polar,
image_sc=image_seq,
probe_name="generic",
description="camus dataset converted to zea format",
)
[docs]
def get_args():
"""Parse command line arguments."""
parser = argparse.ArgumentParser()
parser.add_argument(
"--source",
type=str,
# path to CAMUS_public/database_nifti
required=True,
)
parser.add_argument(
"--output",
type=str,
required=True,
)
args = parser.parse_args()
return args
splits = {"train": [1, 401], "val": [401, 451], "test": [451, 501]}
[docs]
def get_split(patient_id: int) -> str:
"""Determine the dataset split for a given patient ID."""
if splits["train"][0] <= patient_id < splits["train"][1]:
return "train"
elif splits["val"][0] <= patient_id < splits["val"][1]:
return "val"
elif splits["test"][0] <= patient_id < splits["test"][1]:
return "test"
else:
raise ValueError(f"Did not find split for patient: {patient_id}")
if __name__ == "__main__":
if importlib.util.find_spec("SimpleITK") is None:
log.error("SimpleITK not installed. Please install SimpleITK: `pip install SimpleITK`")
sys.exit()
import SimpleITK as sitk
args = get_args()
camus_source_folder = Path(args.source)
camus_output_folder = Path(args.output)
# check if output folders already exist
for split in splits:
assert not (camus_output_folder / split).exists(), (
f"Output folder {camus_output_folder / split} exists. Exiting program."
)
# clone folder structure of source to output using pathlib
# and run convert_camus() for every hdf5 found in there
files = list(camus_source_folder.glob("**/*_half_sequence.nii.gz"))
for source_file in tqdm(files):
# check if source file in camus database (ignore other files)
if "database_nifti" not in source_file.parts:
continue
patient = source_file.stem.split("_")[0]
patient_id = int(patient.removeprefix("patient"))
split = get_split(patient_id)
output_file = camus_output_folder / split / source_file.relative_to(camus_source_folder)
# Replace .nii.gz with .hdf5
output_file = output_file.with_suffix("").with_suffix(".hdf5")
# make sure folder exists
output_file.parent.mkdir(parents=True, exist_ok=True)
convert_camus(source_file, output_file, overwrite=False)