utils Module
This page collects a set of general-purpose utility functions for data processing, file operations, and parallel computing.
Input/Output (I/O)
I/O utilities for habitat analysis
- habit.utils.io_utils.get_image_and_mask_paths(root_folder: str, keyword_of_raw_folder: str = 'images', keyword_of_mask_folder: str = 'masks', auto_select_first_file: bool = True) → tuple [source]
Get paths for all image and mask files.
- Parameters:
root_folder (str) -- Root directory or path to a YAML configuration file
keyword_of_raw_folder (str, optional) -- Name of the images folder (only used when root_folder is a directory)
keyword_of_mask_folder (str, optional) -- Name of the masks folder (only used when root_folder is a directory)
auto_select_first_file (bool, optional) -- If True, automatically select the first file when a path is a directory; if False, keep the directory path as is. Defaults to True.
- Returns:
Dictionary of image paths and dictionary of mask paths
- Return type:
tuple
Note
If root_folder is a YAML file, it should have the following structure:
```yaml
images:
  - subject1:
      image_type1: /path/to/image1
      image_type2: /path/to/image2
  - subject2:
      image_type1: /path/to/image1
masks:
  - subject1:
      image_type1: /path/to/mask1
      image_type2: /path/to/mask2
  - subject2:
      image_type1: /path/to/mask1
# Optional: control whether to automatically select the first file in a directory
auto_select_first_file: true  # or false
```
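For the directory case, the function pairs each subject folder under the images and masks keywords with the files it contains. The stdlib sketch below illustrates the first-file selection behaviour described above; it is a simplified illustration, not the library implementation, and `collect_paths` is a hypothetical name (the real function also handles YAML input and richer per-subject mappings).

```python
from pathlib import Path

def collect_paths(root_folder, raw_keyword="images", mask_keyword="masks"):
    """Map each subject folder under <root>/<keyword>/ to its first file."""
    out = {}
    for keyword in (raw_keyword, mask_keyword):
        subject_map = {}
        for subject_dir in sorted((Path(root_folder) / keyword).iterdir()):
            if not subject_dir.is_dir():
                continue
            files = sorted(f for f in subject_dir.rglob("*") if f.is_file())
            if files:
                # auto_select_first_file=True behaviour: keep only the first file
                subject_map[subject_dir.name] = str(files[0])
        out[keyword] = subject_map
    return out[raw_keyword], out[mask_keyword]
```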
- habit.utils.io_utils.load_timestamp(file_path: str, subjID_column: str = 'Name') → dict [source]
Load scan timestamps from an Excel file.
- habit.utils.io_utils.save_results(out_folder: str, results: DataFrame, config: dict | None = None, file_name: str = 'habitats.csv') → None [source]
Save clustering results.
- habit.utils.io_utils.save_supervoxel_image(subject: str, supervoxel_labels: ndarray, mask_path: str, out_folder: str) → str [source]
Save a supervoxel image.
- habit.utils.io_utils.save_habitat_image(subject: str, habitats_df: DataFrame, supervoxel_path: str, out_folder: str, postprocess_settings: Dict[str, Any] | None = None) → str [source]
Save a habitat image.
- Parameters:
- Returns:
Path to the saved file
- Return type:
str
TODO: 1. When a blob contains only a few voxels, decide whether it should be removed or merged into a similar neighboring blob.
- habit.utils.io_utils.save_csv(df: DataFrame, file_path: str) → None [source]
Save a DataFrame to a CSV file.
- habit.utils.io_utils.detect_image_names(images_paths: dict) → list [source]
Automatically detect image names.
- habit.utils.io_utils.check_data_structure(images_paths: dict, mask_paths: dict, image_names: list, time_dict: dict | None = None) → bool [source]
Validate the data structure.
- Parameters:
- Raises:
ValueError -- If the data structure is invalid
- Returns:
True if the data structure is valid
- Return type:
bool
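The validate-or-raise contract described above can be sketched with plain dictionaries. `check_structure` below is a hypothetical stand-in for `check_data_structure`, showing only the two most basic consistency checks (every subject has a mask; every subject has every expected image name):

```python
def check_structure(images_paths, mask_paths, image_names):
    """Raise ValueError on an invalid structure; return True when consistent."""
    # Every subject with images must also have a mask entry
    missing_masks = set(images_paths) - set(mask_paths)
    if missing_masks:
        raise ValueError(f"Subjects without masks: {sorted(missing_masks)}")
    # Every subject must provide every expected image name
    for subject, imgs in images_paths.items():
        absent = [name for name in image_names if name not in imgs]
        if absent:
            raise ValueError(f"Subject {subject} is missing images: {absent}")
    return True
```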
- habit.utils.io_utils.setup_logging(out_dir: str, debug: bool = False) → Logger [source]
Set up logging configuration using the centralized log system.
NOTE: This is a legacy function kept for backward compatibility. New code should use habit.utils.log_utils.setup_logger() directly.
- Parameters:
- Returns:
Configured logger instance
- Return type:
Logger
DICOM Processing
DICOM utilities for reading and extracting DICOM file information
- habit.utils.dicom_utils.get_dicom_files(input_path: str | Path, recursive: bool = True) → List[Path] [source]
Get all DICOM files from a directory or file path. Uses io_utils functionality to handle YAML config files or directory paths.
- Parameters:
input_path -- Path to a DICOM directory, file, or YAML config file
recursive -- Whether to search subdirectories recursively
- Returns:
List of Path objects pointing to DICOM files
- habit.utils.dicom_utils.get_one_dicom_per_folder(input_path: str | Path, dicom_extensions: set | None = None, include_no_extension: bool = False, num_workers: int | None = None, max_depth: int | None = None) → List[Path] [source]
Fast method to get one DICOM file per folder by traversing directories first. This is much faster than recursively finding all DICOM files when there are hundreds of thousands of files.
Uses multi-threading for parallel I/O operations to significantly speed up scanning.
Two strategies, selected by max_depth:
When max_depth is None (unlimited):
- Traverse all directories and find one DICOM per folder
When max_depth is specified (FAST MODE):
- Quickly locate folders at the target depth (no deep scanning)
- For each target folder, find ONE DICOM file (which may be in a subfolder)
- Stop searching each folder as soon as a DICOM is found
- This is extremely fast because it reads only the minimum required
- Parameters:
input_path -- Path to the root directory to search
dicom_extensions -- Set of valid DICOM extensions (with dot, lowercase). Default: {'.dcm', '.dicom'}
include_no_extension -- If True, also check files without extensions; these are validated by reading the DICOM magic bytes. Default: False
num_workers -- Number of worker threads for parallel processing. Default: min(32, cpu_count + 4) for I/O-bound tasks
max_depth -- Target depth for the folder search. When specified, finds folders at this depth and gets ONE DICOM from each (searching into subfolders if needed, but stopping as soon as one is found). 0 = only search the root directory; 1 = root + immediate subdirectories; 2 = root + 2 levels of subdirectories; None = unlimited depth, one DICOM per folder (default). Example: for the structure root/patient/study/series/*.dcm, use max_depth=2 to get one DICOM per study (faster than scanning all series folders).
- Returns:
List of Path objects, one DICOM file per target folder
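The two strategies above can be sketched with pathlib alone. `one_file_per_folder` below is a single-threaded illustration, not the library code (the real function adds thread pooling and magic-byte validation); it shows the early-stop behaviour that makes FAST MODE cheap:

```python
from pathlib import Path

def one_file_per_folder(root, extensions=frozenset({".dcm", ".dicom"}), max_depth=None):
    """Yield at most one matching file per target folder, stopping each search early."""
    root = Path(root)

    def is_dicom(p):
        return p.is_file() and p.suffix.lower() in extensions

    if max_depth is None:
        # Unlimited: one file from every folder that directly contains DICOMs
        folders = sorted({p.parent for p in root.rglob("*") if is_dicom(p)})
        for folder in folders:
            yield sorted(f for f in folder.iterdir() if is_dicom(f))[0]
    else:
        # FAST MODE: descend exactly max_depth levels to the target folders,
        # then take the first DICOM found anywhere under each and stop at once
        targets = [root]
        for _ in range(max_depth):
            targets = [c for t in targets for c in sorted(t.iterdir()) if c.is_dir()]
        for folder in targets:
            for p in sorted(folder.rglob("*")):
                if is_dicom(p):
                    yield p
                    break
```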
- habit.utils.dicom_utils.read_dicom_tags(dicom_path: str | Path, tags: List[str | int | tuple] | None = None, force: bool = True) → Dict[str, Any] [source]
Read specified DICOM tags from a DICOM file.
- Parameters:
dicom_path -- Path to the DICOM file
tags -- List of tags to read. Each tag can be a tag name (e.g., 'PatientName', 'StudyDate'), a tag number (e.g., 0x00100010), or a tag tuple (e.g., (0x0010, 0x0010)). If None, reads all standard tags.
force -- Whether to force reading even if the file is not a valid DICOM
- Returns:
Dictionary mapping tag names/numbers to values
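The three accepted tag forms can all be reduced to either a keyword string or a (group, element) pair, which is how DICOM tags are conventionally addressed. `normalize_tag` below is a hypothetical helper sketching that conversion; it is not part of the library API:

```python
def normalize_tag(tag):
    """Normalize the three accepted tag forms.

    'PatientName'    -> returned as-is (looked up by keyword)
    0x00100010       -> split into a (group, element) pair
    (0x0010, 0x0010) -> returned unchanged
    """
    if isinstance(tag, str):
        return tag
    if isinstance(tag, int):
        # High 16 bits are the group, low 16 bits the element
        return (tag >> 16) & 0xFFFF, tag & 0xFFFF
    if isinstance(tag, tuple) and len(tag) == 2:
        return tag
    raise TypeError(f"Unsupported tag form: {tag!r}")
```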
- habit.utils.dicom_utils.batch_read_dicom_info(input_path: str | Path, tags: List[str | int | tuple] | None = None, recursive: bool = True, output_file: str | Path | None = None, output_format: str = 'csv', group_by_series: bool = True, one_file_per_folder: bool = False, dicom_extensions: set | None = None, include_no_extension: bool = False, num_workers: int | None = None, max_depth: int | None = None) → DataFrame [source]
Batch-read DICOM information from multiple files. Uses io_utils functionality to handle YAML config files or directory paths, and multi-threading for parallel I/O operations to significantly speed up scanning.
- Parameters:
input_path -- Path to a DICOM directory, file, or YAML config file
tags -- List of tags to read. If None, reads the standard tags
recursive -- Whether to search subdirectories recursively (only used when one_file_per_folder=False)
output_file -- Optional path to save results. If None, results are not saved
output_format -- Format to save results ('csv', 'excel', 'json')
group_by_series -- If True, group files by SeriesInstanceUID and read only one file per series; if False, read all files. Default: True
one_file_per_folder -- If True, take only one DICOM file per folder to speed up scanning. This uses a fast directory traversal instead of listing all files, which is much faster with hundreds of thousands of DICOM files. Useful when each folder contains exactly one series. Note: when enabled, --recursive is ignored (traversal is always recursive, with depth control).
dicom_extensions -- Set of valid DICOM file extensions (e.g., {'.dcm', '.dicom', '.ima'}). Only used when one_file_per_folder=True. Default: {'.dcm', '.dicom'}
include_no_extension -- If True, also check files without extensions by reading the DICOM magic bytes. Only used when one_file_per_folder=True. Useful for medical devices that produce DICOM files without file extensions. Default: False
num_workers -- Number of worker threads for parallel processing. Default: min(32, cpu_count + 4) for I/O-bound tasks. Set to 1 to disable parallel processing.
max_depth -- Maximum recursion depth for directory traversal. Only used when one_file_per_folder=True. 0 = only search the root directory; 1 = root + immediate subdirectories; None = unlimited depth (default). Example: for the typical DICOM structure root/patient/study/series/, set max_depth=3 to search down to the series level.
- Returns:
DataFrame with DICOM information, one row per series (if group_by_series=True) or per file
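The group_by_series behaviour amounts to keeping the first record seen for each SeriesInstanceUID. A minimal sketch of that deduplication (`one_per_series` is a hypothetical name, operating on plain dicts rather than a DataFrame):

```python
def one_per_series(records):
    """Keep the first record for each SeriesInstanceUID, preserving input order."""
    seen = set()
    kept = []
    for rec in records:
        uid = rec.get("SeriesInstanceUID")
        if uid not in seen:
            seen.add(uid)
            kept.append(rec)
    return kept
```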
- habit.utils.dicom_utils.list_available_tags(dicom_path: str | Path, num_samples: int = 1) → List[str] [source]
List all available tags in DICOM file(s).
- Parameters:
dicom_path -- Path to a DICOM file or directory
num_samples -- Number of files to sample (if a directory)
- Returns:
List of available tag names
- class habit.utils.image_converter.ImageConverter [source]
Bases: object
Utility class for converting between different image formats.
- static get_metadata(meta_dict: Dict[str, Any], ndim: int) → Tuple[tuple, tuple, tuple] [source]
Extract and validate metadata from a dictionary.
- static tensor_to_numpy(tensor) → ndarray [source]
Convert a torch tensor to a numpy array.
- Parameters:
tensor -- Input tensor in [C,Z,Y,X] or [C,H,W] format.
- Returns:
Numpy array with the channel dimension removed if single-channel.
- Return type:
np.ndarray
- Raises:
ImportError -- If torch is not installed.
- static numpy_to_tensor(array: ndarray, dtype=None, device=None) [source]
Convert a numpy array to a torch tensor.
- Parameters:
array (np.ndarray) -- Input array in [Z,Y,X] format.
dtype -- Target tensor dtype (requires torch).
device -- Target tensor device (requires torch).
- Returns:
Torch tensor with an added channel dimension [1,Z,Y,X].
- Return type:
torch.Tensor
- Raises:
ImportError -- If torch is not installed.
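The channel-dimension handling the two converters describe can be shown with numpy alone (a torch-free sketch; the function names are illustrative, not part of ImageConverter):

```python
import numpy as np

def add_channel(volume):
    """[Z,Y,X] -> [1,Z,Y,X], mirroring the channel dim numpy_to_tensor adds."""
    return volume[np.newaxis, ...]

def drop_channel(volume):
    """[C,Z,Y,X] -> [Z,Y,X] when single-channel, as tensor_to_numpy does."""
    return volume[0] if volume.shape[0] == 1 else volume
```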
Parallel Processing
Parallel processing utilities for HABIT project.
This module provides a unified interface for parallel and sequential processing, eliminating code duplication across different modules that need multiprocessing.
- class habit.utils.parallel_utils.ProcessingResult(item_id: Any, result: Any | None = None, error: Exception | None = None) [source]
Bases: object
Container for a processing result with error handling.
- item_id
Identifier for the processed item
- Type:
Any
- result
The processing result (None if failed)
- Type:
Any | None
- success
Whether processing was successful
- habit.utils.parallel_utils.parallel_map(func: Callable[[T], R], items: Iterable[T], n_processes: int = 1, desc: str = 'Processing', logger: Logger | None = None, show_progress: bool = True, log_file_path: Path | None = None, log_level: int = 20) → Tuple[List[ProcessingResult], List[Any]] [source]
Apply a function to items in parallel or sequentially through a unified interface.
This function provides:
- Automatic switching between parallel and sequential processing
- Progress bar display
- Error collection without stopping processing
- Logging restoration in child processes (Windows compatibility)
- Parameters:
func -- Function to apply to each item. Should return an (item_id, result) tuple or just a result. If processing fails, it can return (item_id, Exception).
items -- Iterable of items to process
n_processes -- Number of parallel processes (1 = sequential)
desc -- Description for the progress bar
logger -- Logger for status messages
show_progress -- Whether to show a progress bar
log_file_path -- Path to the log file for restoring logging in child processes
log_level -- Logging level for child processes
- Returns:
List of successful ProcessingResult objects
List of failed item IDs
- Return type:
Tuple[List[ProcessingResult], List[Any]]
Examples
>>> def process_subject(subject_id):
...     # Do processing
...     return subject_id, processed_data
>>>
>>> results, failed = parallel_map(
...     process_subject,
...     subject_list,
...     n_processes=4,
...     desc="Processing subjects"
... )
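The error-collecting contract of parallel_map can be sketched with concurrent.futures. The sketch below uses threads for brevity where the real module uses processes, and `Result`/`run_map` are hypothetical stand-ins for ProcessingResult/parallel_map:

```python
from concurrent.futures import ThreadPoolExecutor

class Result:
    """Minimal stand-in for ProcessingResult."""
    def __init__(self, item_id, result=None, error=None):
        self.item_id, self.result, self.error = item_id, result, error
        self.success = error is None

def run_map(func, items, n_workers=1):
    """Apply func to every item, collecting failures instead of raising."""
    def safe(item):
        try:
            return Result(item, result=func(item))
        except Exception as exc:  # record the error, keep processing
            return Result(item, error=exc)

    if n_workers == 1:
        results = [safe(item) for item in items]  # sequential path
    else:
        with ThreadPoolExecutor(max_workers=n_workers) as pool:
            results = list(pool.map(safe, items))
    succeeded = [r for r in results if r.success]
    failed_ids = [r.item_id for r in results if not r.success]
    return succeeded, failed_ids
```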
- habit.utils.parallel_utils.parallel_map_simple(func: Callable[[T], R], items: Iterable[T], n_processes: int = 1, desc: str = 'Processing', show_progress: bool = True) → Generator[R, None, None] [source]
Simplified parallel map that yields results directly.
This is a simpler alternative to parallel_map for when you don't need detailed error tracking. Results are yielded as they complete.
- Parameters:
func -- Function to apply to each item
items -- Iterable of items to process
n_processes -- Number of parallel processes (1 = sequential)
desc -- Description for the progress bar
show_progress -- Whether to show a progress bar
- Yields:
Results from the function (may include exceptions)
Examples
>>> for result in parallel_map_simple(process_fn, items, n_processes=4):
...     if isinstance(result, Exception):
...         handle_error(result)
...     else:
...         handle_success(result)
- class habit.utils.parallel_utils.ParallelProcessor(n_processes: int = 1, logger: Logger | None = None) [source]
Bases: object
Context manager for parallel processing with automatic resource management.
This class provides a cleaner interface for batch parallel processing, with proper resource cleanup and logging configuration.
Examples
>>> with ParallelProcessor(n_processes=4) as processor:
...     results = processor.map(process_fn, items, desc="Processing")
- __init__(n_processes: int = 1, logger: Logger | None = None) [source]
Initialize the parallel processor.
- Parameters:
n_processes -- Number of parallel processes
logger -- Logger for status messages
- __enter__() → ParallelProcessor [source]
Enter the context manager.
- map(func: Callable[[T], R], items: Iterable[T], desc: str = 'Processing', show_progress: bool = True) → Tuple[List[ProcessingResult], List[Any]] [source]
Map a function over items using the processor's configuration.
- Parameters:
func -- Function to apply to each item
items -- Iterable of items to process
desc -- Description for the progress bar
show_progress -- Whether to show a progress bar
- Returns:
Tuple of (successful_results, failed_items)
Logging & Monitoring
Centralized logging utility module for HABIT project.
This module provides a unified logging system with the following features:
- Hierarchical logger management
- Single log file per run (no duplicate logs folders)
- Console and file output with different formats
- Thread-safe logger initialization
- Clear separation between main logs and module logs
Design principles:
1. One log file per application/script run
2. All logs stored in {output_dir}/processing.log (no logs/ subfolder)
3. Hierarchical logger names (habit.preprocessing, habit.habitat, etc.)
4. Console output: simple format for readability
5. File output: detailed format with file location and line numbers
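These design principles map directly onto Python's standard logging module. `setup_root` below is a stdlib sketch of the layout (not the HABIT implementation): one named root logger, a simple console format, a detailed file format with location and line numbers, and handler clearing to avoid duplicates:

```python
import logging
from pathlib import Path

def setup_root(log_file=None, level=logging.INFO, console_level=None):
    """One 'habit' root logger; module loggers inherit via dotted names."""
    root = logging.getLogger("habit")
    root.setLevel(logging.DEBUG)
    root.propagate = False   # keep these logs out of the global root logger
    root.handlers.clear()    # principle: no duplicate handlers on re-setup
    console = logging.StreamHandler()
    console.setLevel(console_level if console_level is not None else level)
    console.setFormatter(logging.Formatter("%(levelname)s %(message)s"))  # simple
    root.addHandler(console)
    if log_file is not None:
        file_handler = logging.FileHandler(Path(log_file), mode="a")
        file_handler.setLevel(level)
        # detailed format with file location and line number
        file_handler.setFormatter(logging.Formatter(
            "%(asctime)s %(name)s %(filename)s:%(lineno)d %(levelname)s %(message)s"))
        root.addHandler(file_handler)
    return root
```

A logger named "habit.preprocessing" then inherits both handlers through the dotted hierarchy without any configuration of its own.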
- class habit.utils.log_utils.LoggerManager [source]
Bases: object
Centralized logger manager for the HABIT project.
This class ensures consistent logging across all modules through:
- A single point of configuration
- No duplicate handlers
- A hierarchical logger structure
- setup_root_logger(log_file: Path | None = None, level: int = 20, console_level: int | None = None, append_mode: bool = False) → Logger [source]
Set up the root logger for the HABIT project.
This should be called once at the start of each application/script. All subsequent module loggers will inherit this configuration.
- Parameters:
log_file -- Path to the log file. If None, only console logging is enabled.
level -- Logging level for file output (default: INFO)
console_level -- Logging level for console output. If None, uses the same value as level.
append_mode -- If True, append to an existing log file instead of overwriting. Used by child processes in multiprocessing to avoid overwriting.
- Returns:
The root logger for the HABIT project
- Return type:
Logger
- habit.utils.log_utils.setup_logger(name: str, output_dir: Path | None = None, log_filename: str = 'processing.log', level: int = 20, console_level: int | None = None) → Logger [source]
Set up a logger for a HABIT module or script.
This is the main entry point for setting up logging in HABIT applications.
- Parameters:
name -- Name of the module/script (e.g., 'preprocessing', 'habitat')
output_dir -- Directory where the log file will be created. If None, only console logging.
log_filename -- Name of the log file (default: 'processing.log')
level -- Logging level for file output (default: INFO)
console_level -- Logging level for the console. If None, uses the same value as level.
- Returns:
Configured logger instance
- Return type:
Logger
Examples
# In a script or CLI command:
logger = setup_logger('preprocessing', output_dir=Path('/output'), level=logging.INFO)
logger.info('Processing started')
# In a module:
logger = get_module_logger(__name__)
logger.debug('Debug information')
- habit.utils.log_utils.get_module_logger(module_name: str) → Logger [source]
Get a logger for a module.
This should be called in modules that don't initialize logging themselves. The module will use the logging configuration set by the main script/CLI command.
- Parameters:
module_name -- The __name__ of the module
- Returns:
Logger instance for the module
- Return type:
Logger
Examples
# At the top of a module file:
from habit.utils.log_utils import get_module_logger
logger = get_module_logger(__name__)
- habit.utils.log_utils.disable_external_loggers() [source]
Disable verbose logging from external libraries.
Many libraries (such as SimpleITK and scikit-learn) can be very verbose. This function sets them to WARNING level to reduce noise.
- habit.utils.log_utils.restore_logging_in_subprocess(log_file_path: Path | None = None, log_level: int = 20) → None [source]
Restore the logging configuration in a child process.
In Windows spawn mode (and forkserver), child processes do not inherit the parent's logging configuration. This function should be called at the beginning of any function that runs in a child process.
- Parameters:
log_file_path -- Path to the log file (should be passed from the parent process)
log_level -- Logging level (should be passed from the parent process)
Examples
# In the parent process, store the log config:
self._log_file_path = LoggerManager().get_log_file()
self._log_level = logging.INFO
# In the child process function:
def process_in_child(self, data):
    restore_logging_in_subprocess(self._log_file_path, self._log_level)
    # ... rest of processing
- habit.utils.log_utils.setup_output_logger(output_dir: Path, name: str, level: int = 20) → Logger [source]
Legacy function kept for backward compatibility.
- Parameters:
output_dir -- Directory where the log file will be created
name -- Name of the logger
level -- Logging level
- Returns:
Configured logger instance
- Return type:
Logger
Progress bar utilities providing a custom progress bar display.
- class habit.utils.progress_utils.CustomTqdm(total: int | None = None, desc: str = 'Progress') [source]
Bases: object
Custom progress bar class, used as an alternative to tqdm.
This class is designed to be safe in multi-processing environments.
Visualization Utils
Visualization utilities for habitat analysis
- habit.utils.visualization.plot_cluster_scores(scores_dict: Dict[str, List[float]], cluster_range: List[int], methods: str | List[str] | None = None, clustering_algorithm: str = 'kmeans', figsize: Tuple[int, int] = (10, 10), outdir: str | None = None, save_path: str | None = None, show: bool = True, dpi: int = 600, best_n_clusters: Dict[str, int] | None = None) [source]
Plot the scoring curves for cluster evaluation.
- Parameters:
scores_dict -- Dictionary of scores, with method names as keys and score lists as values
cluster_range -- Range of cluster numbers to evaluate
methods -- Methods to plot, can be a string or list of strings, None means plot all methods
clustering_algorithm -- Name of the clustering algorithm
figsize -- Size of the figure
outdir -- Directory to save figures, None means do not save
save_path -- Explicit file path to save a single figure (overrides outdir)
show -- Whether to display the figure
dpi -- Image resolution
best_n_clusters -- Precomputed best cluster number per method to mark on the plot
- habit.utils.visualization.plot_elbow_curve(cluster_range, scores, score_type, title=None, save_path=None) [source]
Plot the elbow curve.
- Parameters:
cluster_range -- Range of cluster numbers
scores -- Corresponding scores
score_type -- Type of score for title and y-axis label
title -- Figure title, automatically generated if None
save_path -- Path to save the figure, do not save if None
- habit.utils.visualization.plot_multiple_scores(cluster_range, scores_dict, title=None, save_path=None) [source]
Plot multiple scoring methods on the same graph.
- Parameters:
cluster_range -- Range of cluster numbers
scores_dict -- Dictionary with scoring method names as keys and score lists as values
title -- Figure title, automatically generated if None
save_path -- Path to save the figure, do not save if None
- habit.utils.visualization.plot_cluster_results(X, labels, centers=None, title=None, feature_names=None, save_path=None, show=False, dpi=600, plot_3d=False, explained_variance=None, figsize: Tuple[int, int] | None = None, alpha: float = 0.7, marker_size: int = 20, marker: str = 'o', center_marker: str = 'X', center_size: int = 50, center_color: str = 'red', cmap: str = 'tab10', reduction_method: str = 'pca', show_colorbar: bool = True, show_grid: bool = True, grid_alpha: float = 0.3, max_legend_items: int = 10) [source]
Plot a scatter plot of clustering results (2D or 3D).
- Parameters:
X -- Input data, shape (n_samples, n_features)
labels -- Cluster labels, shape (n_samples,)
centers -- Cluster centers, plotted if not None
title -- Figure title
feature_names -- Feature names for x and y axis labels
save_path -- Path to save the figure, do not save if None
show -- Whether to display the figure (default False for batch processing)
dpi -- Image resolution (default 600)
plot_3d -- Whether to plot 3D scatter plot (default False)
explained_variance -- Explained variance ratio from PCA (for title)
figsize -- Figure size as (width, height), default (6, 5) for 2D, (7, 6) for 3D
alpha -- Transparency of scatter points (0-1, default 0.7)
marker_size -- Size of scatter points (default 20)
marker -- Marker style for data points (default 'o')
center_marker -- Marker style for cluster centers (default 'X')
center_size -- Size of center markers (default 50)
center_color -- Color of center markers (default 'red')
cmap -- Colormap for clusters, 'tab10' is good for discrete categories (default 'tab10')
reduction_method -- Dimensionality reduction method, 'pca' or 'tsne' (default 'pca')
show_colorbar -- Whether to show colorbar (default True)
show_grid -- Whether to show grid (default True)
max_legend_items -- Maximum number of legend items to show, hide legend if exceeded (default 10)
grid_alpha -- Transparency of grid lines (default 0.3)
Math & Calculation
- habit.utils.dice_calculator.compute_dice(mask1_path: str, mask2_path: str, label_id: int = 1) → float [source]
Calculate the Dice coefficient between two masks for a specific label using numpy arrays.
This function handles cases where the masks have different physical-space information (origin, spacing, direction, size) by resampling mask2 to match mask1's physical space.
- Parameters:
mask1_path -- Path to the first mask image
mask2_path -- Path to the second mask image
label_id -- Label ID to calculate Dice for (default: 1)
- Returns:
Dice coefficient value (0.0 to 1.0), or NaN if an error occurs
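Once the masks are aligned, the coefficient itself reduces to Dice = 2|A∩B| / (|A| + |B|) over the voxels equal to label_id. A numpy sketch of that core computation (`dice` is a hypothetical stand-in that skips the file loading and resampling steps):

```python
import numpy as np

def dice(a, b, label=1):
    """Dice = 2|A∩B| / (|A| + |B|) for voxels equal to `label`."""
    a = np.asarray(a) == label
    b = np.asarray(b) == label
    denom = a.sum() + b.sum()
    if denom == 0:
        return float("nan")  # neither mask contains the label
    return 2.0 * np.logical_and(a, b).sum() / denom
```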
- habit.utils.dice_calculator.run_dice_calculation(input1, input2, output, mask_keyword, label_id) [source]
Calculate Dice coefficients between two batches of images (ROI/mask).
This tool compares masks from two sources (directories or config files) and computes the Dice coefficient, matching files by Subject ID and Mask Type (subfolder name).