habit.utils.io_utils 源代码

"""
I/O utilities for habitat analysis
"""

import os
import json
import pandas as pd
import SimpleITK as sitk
import numpy as np
from typing import Dict, Any, Optional, List
import yaml
import logging
from habit.core.common.config_loader import load_config, save_config, validate_config
from habit.utils.habitat_postprocess_utils import remove_small_connected_components


def _scan_folder_for_paths(root_folder: str, keyword_of_raw_folder: str = "images", keyword_of_mask_folder: str = "masks") -> tuple:
    """
    Scan folder structure for image and mask paths (internal function)
    
    Args:
        root_folder (str): Root directory
        keyword_of_raw_folder (str, optional): Name of the images folder
        keyword_of_mask_folder (str, optional): Name of the masks folder
    
    Returns:
        tuple: Dictionary of image paths and dictionary of mask paths
    """
    # Get image paths
    images_paths = {}
    images_root = os.path.join(root_folder, keyword_of_raw_folder)
    # Filter out .DS_Store and other hidden files
    subjects = [f for f in os.listdir(images_root) if not f.startswith('.')]
    
    for subj in subjects:
        images_paths[subj] = {}
        subj_path = os.path.join(images_root, subj)
        # Filter out .DS_Store and other hidden files
        img_subfolders = [f for f in os.listdir(subj_path) if not f.startswith('.')]
        
        for img_subfolder in img_subfolders:
            img_subfolder_path = os.path.join(subj_path, img_subfolder)
            if os.path.isdir(img_subfolder_path):
                # Filter out .DS_Store and other hidden files
                img_files = [f for f in os.listdir(img_subfolder_path) if not f.startswith('.')]
                # Check if no files found
                if len(img_files) == 0:
                    print(f"Warning: No image files found in {subj}/{img_subfolder}, skipping")
                    continue
                # Warning if multiple files
                if len(img_files) > 1:
                    print(f"Warning: Multiple image files in {subj}/{img_subfolder}")
                img_file = img_files[0]
                images_paths[subj][img_subfolder] = os.path.join(img_subfolder_path, img_file)
    
    # Get mask paths
    mask_paths = {}
    masks_root = os.path.join(root_folder, keyword_of_mask_folder)

    # if no masks folder, return empty mask_paths
    if not os.path.exists(masks_root):
        return images_paths, {}
    
    # Filter out .DS_Store and other hidden files
    subjects = [f for f in os.listdir(masks_root) if not f.startswith('.')]
    for subj in subjects:
        mask_paths[subj] = {}
        subj_path = os.path.join(masks_root, subj)
        # Filter out .DS_Store and other hidden files
        mask_subfolders = [f for f in os.listdir(subj_path) if not f.startswith('.')]
        
        for mask_subfolder in mask_subfolders:
            mask_subfolder_path = os.path.join(subj_path, mask_subfolder)
            if os.path.isdir(mask_subfolder_path):
                # Filter out .DS_Store and other hidden files
                mask_files = [f for f in os.listdir(mask_subfolder_path) if not f.startswith('.')]
                # Check if no files found
                if len(mask_files) == 0:
                    print(f"Warning: No mask files found in {subj}/{mask_subfolder}, skipping")
                    continue
                # Warning if multiple files
                if len(mask_files) > 1:
                    print(f"Warning: Multiple mask files in {subj}/{mask_subfolder}")
                mask_file = mask_files[0]
                mask_paths[subj][mask_subfolder] = os.path.join(mask_subfolder_path, mask_file)
    
    return images_paths, mask_paths

[文档] def get_image_and_mask_paths(root_folder: str, keyword_of_raw_folder: str = "images", keyword_of_mask_folder: str = "masks", auto_select_first_file: bool = True) -> tuple: """ Get paths for all image and mask files Args: root_folder (str): Root directory or path to YAML configuration file keyword_of_raw_folder (str, optional): Name of the images folder (only used when root_folder is a directory) keyword_of_mask_folder (str, optional): Name of the masks folder (only used when root_folder is a directory) auto_select_first_file (bool, optional): If True, automatically select the first file when path is a directory. If False, keep directory path as is. Defaults to True. Returns: tuple: Dictionary of image paths and dictionary of mask paths Note: If root_folder is a YAML file, it should contain the following structure: ```yaml images: subject1: image_type1: /path/to/image1 image_type2: /path/to/image2 subject2: image_type1: /path/to/image1 masks: subject1: image_type1: /path/to/mask1 image_type2: /path/to/mask2 subject2: image_type1: /path/to/mask1 # Optional: control whether to automatically select first file in directory auto_select_first_file: true # or false ``` """ # Check if input is a YAML configuration file if os.path.isfile(root_folder) and root_folder.lower().endswith(('.yaml', '.yml')): # Load configuration from YAML file # Using load_config from config_utils which handles path resolution config = load_config(root_folder) # Check if auto_select_first_file is specified in config file # Config file takes precedence over function parameter if 'auto_select_first_file' in config: auto_select_first_file = config['auto_select_first_file'] # Extract images and masks paths from config images_paths = config.get('images', {}) mask_paths = config.get('masks', {}) # Validate that all paths exist for subject, img_dict in images_paths.items(): for img_type, img_path in img_dict.items(): if not os.path.exists(img_path): print(f"Warning: Image file not found: {img_path} for {subject}/{img_type}") for subject, mask_dict in mask_paths.items(): for mask_type, mask_path in mask_dict.items(): if not os.path.exists(mask_path): print(f"Warning: Mask file not found: {mask_path} for {subject}/{mask_type}") # if is dir and auto_select_first_file is True, get the first file in the dir if auto_select_first_file: for subject, img_dict in images_paths.items(): for img_type, img_path in img_dict.items(): if os.path.isdir(img_path): files = [f for f in os.listdir(img_path) if not f.startswith('.')] if files: img_dict[img_type] = os.path.join(img_path, files[0]) for subject, mask_dict in mask_paths.items(): for mask_type, mask_path in mask_dict.items(): if os.path.isdir(mask_path): files = [f for f in os.listdir(mask_path) if not f.startswith('.')] if files: mask_dict[mask_type] = os.path.join(mask_path, files[0]) return images_paths, mask_paths # Use folder scanning logic return _scan_folder_for_paths(root_folder, keyword_of_raw_folder, keyword_of_mask_folder)
[文档] def load_timestamp(file_path: str, subjID_column: str = "Name") -> dict: """ Load scan timestamps from Excel file Args: file_path (str): Path to the Excel file subjID_column (str, optional): Name of the subject ID column Returns: dict: Dictionary with subject names as keys and timestamp lists as values """ if not os.path.exists(file_path): raise FileNotFoundError(f"File not found: {file_path}") df = pd.read_excel(file_path, index_col=subjID_column) # convert index to string df.index = df.index.astype(str) return df
[文档] def save_results(out_folder: str, results: pd.DataFrame, config: dict = None, file_name: str = "habitats.csv") -> None: """ Save clustering results Args: out_folder (str): Output directory results (DataFrame): Results DataFrame config (dict, optional): Configuration dictionary, saved as JSON if not None file_name (str, optional): Name of the CSV file to save """ # Create output folder if it doesn't exist if not os.path.exists(out_folder): os.makedirs(out_folder) # Save configuration if config: with open(os.path.join(out_folder, "config.json"), "w") as f: json.dump(config, f, indent=4) # Save CSV results results.to_csv(os.path.join(out_folder, file_name), index=True) print(f"Results saved to {os.path.join(out_folder, file_name)}")
[文档] def save_supervoxel_image(subject: str, supervoxel_labels: np.ndarray, mask_path: str, out_folder: str) -> str: """ Save supervoxel image Args: subject (str): Subject name supervoxel_labels (ndarray): Supervoxel labels mask_path (str): Path to the mask file out_folder (str): Output directory Returns: str: Path to the saved file """ # Load mask mask = sitk.ReadImage(mask_path) mask_array = sitk.GetArrayFromImage(mask) # Create supervoxel image supervoxel_map = np.zeros_like(mask_array) supervoxel_map[mask_array > 0] = supervoxel_labels # Convert to SimpleITK image and save supervoxel_img = sitk.GetImageFromArray(supervoxel_map) supervoxel_img.CopyInformation(mask) output_path = os.path.join(out_folder, f"{subject}_supervoxel.nrrd") sitk.WriteImage(supervoxel_img, output_path) return output_path
[文档] def save_habitat_image( subject: str, habitats_df: pd.DataFrame, supervoxel_path: str, out_folder: str, postprocess_settings: Optional[Dict[str, Any]] = None, ) -> str: """ Save habitat image Args: subject (str): Subject name habitats_df (DataFrame): Habitat DataFrame containing Supervoxel and Habitats columns supervoxel_path (str): Path to the supervoxel image out_folder (str): Output directory Returns: str: Path to the saved file TODO: 1. 某个团块的体素只有很少的几个,是否需要删除,或者归位其他相似的团块中去 """ # Load supervoxel image supervoxel = sitk.ReadImage(supervoxel_path) supervoxel_array = sitk.GetArrayFromImage(supervoxel) # Create habitat image habitats_array = np.zeros_like(supervoxel_array) habitats_subj = habitats_df.loc[subject] n_clusters_supervoxel = habitats_subj.shape[0] for i in range(n_clusters_supervoxel): # Assert that habitats_subj[habitats_subj['Supervoxel'] == i+1]['Habitats'] has exactly one value # assert habitats_subj[habitats_subj['Supervoxel'] == i+1].shape[0] == 1, f"Multiple rows for supervoxel {i+1} in subject {subject}, please check the data table" if (supervoxel_array == i+1).sum() > 0: habitats_array[supervoxel_array == i+1] = habitats_subj[habitats_subj['Supervoxel'] == i+1]['Habitats'].values[0] roi_mask = supervoxel_array > 0 if postprocess_settings and postprocess_settings.get("enabled", False): habitats_array = remove_small_connected_components( label_map=habitats_array.astype(np.int32, copy=False), roi_mask=roi_mask, settings=postprocess_settings ) # Convert to SimpleITK image and save habitats_img = sitk.GetImageFromArray(habitats_array) habitats_img.CopyInformation(supervoxel) output_path = os.path.join(out_folder, f"{subject}_habitats.nrrd") sitk.WriteImage(habitats_img, output_path) return output_path
[文档] def save_json(data: Any, file_path: str) -> None: """Saves data to a JSON file.""" os.makedirs(os.path.dirname(file_path), exist_ok=True) with open(file_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=4)
[文档] def save_csv(df: pd.DataFrame, file_path: str) -> None: """Saves DataFrame to a CSV file.""" os.makedirs(os.path.dirname(file_path), exist_ok=True) df.to_csv(file_path, index=False)
[文档] def detect_image_names(images_paths: dict) -> list: """ Automatically detect image names Args: images_paths (dict): Dictionary of image paths Returns: list: List of all unique image names """ # Collect all image names all_image_names = [] for subj in images_paths: for img_name in images_paths[subj].keys(): all_image_names.append(img_name) # Get unique image names and sort unique_image_names = sorted(list(set(all_image_names))) return unique_image_names
[文档] def check_data_structure(images_paths: dict, mask_paths: dict, image_names: list, time_dict: dict = None) -> bool: """ Validate data structure Args: images_paths (dict): Dictionary of image paths mask_paths (dict): Dictionary of mask paths image_names (list): List of image names time_dict (dict, optional): Dictionary of timestamps, if None, not checked Raises: ValueError: If data structure is invalid Returns: bool: True if data structure is valid """ # Check data structure for each subject for subj in images_paths.keys(): img_names = images_paths[subj].keys() mask_names = mask_paths[subj].keys() # Check if image and mask names match diff_img_mask = set(img_names) - set(mask_names) if len(diff_img_mask) > 0: raise ValueError(f"Image and mask names don't match for {subj}, difference: {diff_img_mask}") # Check if required images exist diff_img = set(image_names) - set(img_names) if len(diff_img) > 0: raise ValueError(f"Image names don't match for {subj}, difference: {diff_img}") return True
[文档] def setup_logging(out_dir: str, debug: bool = False) -> logging.Logger: """ Set up logging configuration using centralized log system. NOTE: This is a legacy function for backward compatibility. New code should use habit.utils.log_utils.setup_logger() directly. Args: out_dir (str): Output directory for log files debug (bool, optional): Whether to enable debug mode. Defaults to False. Returns: logging.Logger: Configured logger instance """ from habit.utils.log_utils import setup_logger log_level = logging.DEBUG if debug else logging.INFO return setup_logger( name='habit', output_dir=out_dir, log_filename='processing.log', level=log_level )
[文档] def export_paths_to_yaml(root_folder: str, output_yaml_path: str, keyword_of_raw_folder: str = "images", keyword_of_mask_folder: str = "masks") -> None: """ Export folder structure to YAML configuration file Args: root_folder (str): Root directory to scan output_yaml_path (str): Path to save the YAML configuration file keyword_of_raw_folder (str, optional): Name of the images folder keyword_of_mask_folder (str, optional): Name of the masks folder """ # Get paths using the folder scanning method images_paths, mask_paths = _scan_folder_for_paths(root_folder, keyword_of_raw_folder, keyword_of_mask_folder) # Create configuration dictionary config = { 'images': images_paths, 'masks': mask_paths } # Save to YAML file save_config(config, output_yaml_path) print(f"Paths configuration exported to {output_yaml_path}")