"""
Model Evaluation Module
Provides classes for model evaluation, model comparison, and result analysis
"""
import os
import json
from typing import Dict, List, Tuple, Union, Any, Optional, Literal
import numpy as np
import pandas as pd
from .metrics import calculate_metrics, calculate_metrics_youden
from ..visualization.plotting import Plotter
from ..statistics.delong_test import delong_roc_test, delong_roc_ci
from habit.utils.log_utils import get_module_logger
class ModelEvaluator:
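    """
    Evaluate trained models on one or more datasets: compute metrics, plot
    evaluation curves (ROC, DCA, calibration, PR), and compare models with the
    DeLong test.
    """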
def __init__(self, output_dir: str):
"""
Initialize the model evaluator.
Args:
output_dir (str): Directory where evaluation results and plots will be saved
"""
self.output_dir = output_dir
self.plotter = Plotter(self.output_dir)
self.logger = get_module_logger('evaluation.model')
def evaluate(self, model: Any, X: pd.DataFrame, y: pd.Series,
dataset_name: str = "test") -> Dict[str, Any]:
"""
Evaluate a single model on a single dataset.
Args:
model (Any): Trained model with predict and predict_proba methods
X (pd.DataFrame): Feature data
y (pd.Series): Label data
dataset_name (str): Name of the dataset
Returns:
Dict[str, Any]: Dictionary containing evaluation results
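        Example (minimal sketch; assumes a fitted scikit-learn-style classifier
        `clf` and a held-out `X_test`/`y_test` split):
            >>> evaluator = ModelEvaluator(output_dir="results")
            >>> res = evaluator.evaluate(clf, X_test, y_test, dataset_name="test")
            >>> sorted(res.keys())
            ['metrics', 'y_pred', 'y_pred_proba', 'y_true']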
"""
# Prediction
y_pred = model.predict(X)
y_pred_proba = model.predict_proba(X)
# Handle potential 2D probability results
if isinstance(y_pred_proba, np.ndarray) and y_pred_proba.ndim == 2:
y_pred_proba = y_pred_proba[:, 1]
# Convert data format
y_values = y.values if hasattr(y, 'values') else y
# Calculate metrics
metrics = calculate_metrics(y_values, y_pred, y_pred_proba)
# Return results (same format as before)
return {
'metrics': metrics,
'y_true': y_values.tolist() if hasattr(y_values, 'tolist') else list(y_values),
'y_pred': y_pred.tolist() if hasattr(y_pred, 'tolist') else list(y_pred),
'y_pred_proba': y_pred_proba.tolist() if hasattr(y_pred_proba, 'tolist') else list(y_pred_proba)
}
def plot_curves(
self,
model_data: Dict[str, Dict[str, Dict[str, List]]],
curve_type: Literal['roc', 'dca', 'calibration', 'pr', 'all'] = 'all',
title: str = 'evaluation',
output_dir: Optional[str] = None,
prefix: str = '',
n_bins: int = 10
) -> Dict[str, str]:
"""
Plot various evaluation curves using methods from plotting.py
Args:
model_data (Dict): Dictionary containing model evaluation results
Format: {dataset_name: {model_name: {'y_true': [...], 'y_pred_proba': [...]}}}
curve_type (Literal): Type of curve to plot, can be 'roc', 'dca', 'calibration', 'pr', or 'all'
title (str): Keyword for chart title
output_dir (Optional[str]): Output directory, defaults to self.output_dir
prefix (str): Prefix for output filenames
n_bins (int): Number of bins for calibration curve
Returns:
Dict[str, str]: Dictionary containing paths to generated chart files
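        Example (illustrative sketch; `res_train` and `res_test` are assumed to be
        outputs of evaluate() for a model named "rf"):
            >>> model_data = {
            ...     'train': {'rf': {'y_true': res_train['y_true'],
            ...                      'y_pred_proba': res_train['y_pred_proba']}},
            ...     'test': {'rf': {'y_true': res_test['y_true'],
            ...                     'y_pred_proba': res_test['y_pred_proba']}},
            ... }
            >>> files = evaluator.plot_curves(model_data, curve_type='roc')
        The returned dictionary maps keys such as 'roc_train' and 'roc_test' to
        the generated PDF paths.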
"""
# Ensure output directory exists
if output_dir is None:
output_dir = self.output_dir
os.makedirs(output_dir, exist_ok=True)
# Create or use Plotter instance
plotter = self.plotter
if output_dir != self.output_dir:
plotter = Plotter(output_dir)
# Used to store paths to generated chart files
result_files = {}
# Generate charts for each dataset (e.g., train or test)
for dataset, models in model_data.items():
# Convert current dataset data to plotting format
plotting_data = {}
for model_name, data in models.items():
y_true = np.array(data.get('y_true', []))
y_pred_proba = np.array(data.get('y_pred_proba', []))
plotting_data[model_name] = (y_true, y_pred_proba)
# Generate file name prefix for current dataset
dataset_prefix = f"{prefix}{dataset}_" if prefix else f"{dataset}_"
dataset_title = f"{title}_{dataset}"
# Plot ROC curve
if curve_type in ['roc', 'all']:
roc_filename = f'{dataset_prefix}roc_curve.pdf'
plotter.plot_roc_v2(plotting_data, save_name=roc_filename, title=dataset_title)
result_files[f'roc_{dataset}'] = os.path.join(output_dir, roc_filename)
# Plot decision curve
if curve_type in ['dca', 'all']:
dca_filename = f'{dataset_prefix}decision_curve.pdf'
plotter.plot_dca_v2(plotting_data, save_name=dca_filename, title=dataset_title)
result_files[f'dca_{dataset}'] = os.path.join(output_dir, dca_filename)
# Plot calibration curve
if curve_type in ['calibration', 'all']:
calibration_filename = f'{dataset_prefix}calibration_curve.pdf'
plotter.plot_calibration_v2(plotting_data, save_name=calibration_filename, title=dataset_title, n_bins=n_bins)
result_files[f'calibration_{dataset}'] = os.path.join(output_dir, calibration_filename)
# Plot precision-recall curve
if curve_type in ['pr', 'all']:
pr_filename = f'{dataset_prefix}precision_recall_curve.pdf'
plotter.plot_pr_curve(plotting_data, save_name=pr_filename, title=dataset_title)
result_files[f'pr_{dataset}'] = os.path.join(output_dir, pr_filename)
return result_files
def compare_models(self, test_data: Dict[str, Tuple[List, List]]) -> None:
"""
Compare the performance of multiple models (using DeLong test)
Args:
test_data (Dict[str, Tuple[List, List]]): Test data dictionary, keys are model names,
values are (y_true, y_pred_proba) tuples
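        Example (illustrative sketch; `y_true`, `proba_lr`, and `proba_rf` are
        placeholder lists of labels and predicted probabilities):
            >>> evaluator.compare_models({
            ...     'logistic': (y_true, proba_lr),
            ...     'random_forest': (y_true, proba_rf),
            ... })
        Pairwise AUCs (with 95% CIs) and DeLong p-values are logged, and the
        comparison is written to delong_comparison.json in the output directory.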
"""
self.logger.info("="*80)
self.logger.info("Model AUC Comparison (DeLong test)")
self.logger.info("="*80)
# Get list of model names
model_names = list(test_data.keys())
# Used to store comparison results
comparison_results = []
# Compare each pair of models
for i in range(len(model_names)):
for j in range(i+1, len(model_names)):
model1 = model_names[i]
model2 = model_names[j]
y_true = test_data[model1][0]
y_pred1 = test_data[model1][1]
y_pred2 = test_data[model2][1]
# Ensure all data are numpy array format
y_true = np.array(y_true)
y_pred1 = np.array(y_pred1)
y_pred2 = np.array(y_pred2)
# Perform DeLong test
p_value = delong_roc_test(y_true, y_pred1, y_pred2)
p_value = p_value[0][0]
# Calculate AUC and confidence interval for each model
auc1, ci1 = delong_roc_ci(y_true, y_pred1)
auc2, ci2 = delong_roc_ci(y_true, y_pred2)
# Store results
comparison_result = {
'comparison': f"{model1} vs {model2}",
f'{model1}_auc': float(auc1),
f'{model1}_ci_lower': float(ci1[0]),
f'{model1}_ci_upper': float(ci1[1]),
f'{model2}_auc': float(auc2),
f'{model2}_ci_lower': float(ci2[0]),
f'{model2}_ci_upper': float(ci2[1]),
'p_value': float(p_value),
'significant_difference': bool(p_value < 0.05),
'conclusion': f"{model1} and {model2} AUC exists significant difference (p<0.05)" if p_value < 0.05 else f"{model1} and {model2} AUC no significant difference (p≥0.05)"
}
comparison_results.append(comparison_result)
# Log results
self.logger.info(f"Comparison: {model1} vs {model2}")
self.logger.info(f"{model1} AUC: {auc1:.4f} (95% CI: {ci1[0]:.4f}-{ci1[1]:.4f})")
self.logger.info(f"{model2} AUC: {auc2:.4f} (95% CI: {ci2[0]:.4f}-{ci2[1]:.4f})")
self.logger.info(f"DeLong test p-value: {p_value:.4f}")
if p_value < 0.05:
self.logger.info(f"Conclusion: Significant difference in AUC between {model1} and {model2} (p<0.05)")
else:
self.logger.info(f"Conclusion: No significant difference in AUC between {model1} and {model2} (p≥0.05)")
self.logger.info("-"*80)
# Save comparison results
comparison_file = os.path.join(self.output_dir, 'delong_comparison.json')
with open(comparison_file, 'w', encoding='utf-8') as f:
json.dump(comparison_results, f, ensure_ascii=False, indent=4)
def _print_performance_table(self, results: Dict[str, Any]) -> None:
"""
Print model performance table using dynamically registered metrics.
"""
from .metrics import METRIC_REGISTRY
self.logger.info("="*80)
self.logger.info("Model Performance Evaluation Table")
self.logger.info("="*80)
# Get available model names
available_models = set()
if 'train' in results: available_models.update(results['train'].keys())
if 'test' in results: available_models.update(results['test'].keys())
if not available_models:
self.logger.warning("No model results available")
return
header = ["Metric"]
for model_name in sorted(available_models):
if 'train' in results: header.append(f"{model_name} (Train)")
if 'test' in results: header.append(f"{model_name} (Test)")
self.logger.info(" | ".join([f"{h:^15}" for h in header]))
self.logger.info("-"*80)
# Dynamically iterate through all registered metrics
for m_id, info in METRIC_REGISTRY.items():
row = [info['display_name']]
for model_name in sorted(available_models):
if 'train' in results:
val = results['train'].get(model_name, {}).get('metrics', {}).get(m_id, 'N/A')
row.append(f"{val:.4f}" if isinstance(val, (int, float)) and not np.isnan(val) else str(val))
if 'test' in results:
val = results['test'].get(model_name, {}).get('metrics', {}).get(m_id, 'N/A')
row.append(f"{val:.4f}" if isinstance(val, (int, float)) and not np.isnan(val) else str(val))
self.logger.info(" | ".join([f"{cell:^15}" for cell in row]))
self.logger.info("="*80)
def _save_performance_table(self, results: Dict[str, Any], filename: str = "performance_table.csv") -> None:
"""
Save model performance table to CSV file using registered metrics.
"""
from .metrics import METRIC_REGISTRY
# Get available model names
available_models = set()
if 'train' in results: available_models.update(results['train'].keys())
if 'test' in results: available_models.update(results['test'].keys())
if not available_models:
self.logger.warning("No model results available for saving")
return
performance_data = []
for m_id, info in METRIC_REGISTRY.items():
row_data = {
'Metric': info['display_name'],
'Metric_Code': m_id
}
for model_name in sorted(available_models):
if 'train' in results:
row_data[f"{model_name}_Train"] = results['train'].get(model_name, {}).get('metrics', {}).get(m_id, np.nan)
if 'test' in results:
row_data[f"{model_name}_Test"] = results['test'].get(model_name, {}).get('metrics', {}).get(m_id, np.nan)
performance_data.append(row_data)
df = pd.DataFrame(performance_data)
output_path = os.path.join(self.output_dir, filename)
df.to_csv(output_path, index=False)
self.logger.info(f"Performance table saved to: {output_path}")
# Also save a detailed summary with additional statistics
detailed_filename = filename.replace('.csv', '_detailed.csv')
self._save_detailed_performance_summary(results, detailed_filename)
def _save_detailed_performance_summary(self, results: Dict[str, Any], filename: str = "performance_detailed.csv") -> None:
"""
Save detailed performance summary with dynamic metrics.
"""
from .metrics import METRIC_REGISTRY
available_models = set()
if 'train' in results: available_models.update(results['train'].keys())
if 'test' in results: available_models.update(results['test'].keys())
if not available_models: return
detailed_data = []
for dataset_type in ['train', 'test']:
if dataset_type not in results: continue
for model_name in sorted(available_models):
if model_name not in results[dataset_type]: continue
metrics = results[dataset_type][model_name].get('metrics', {})
row = {'Model': model_name, 'Dataset': dataset_type.capitalize()}
# Add all registered metrics
for m_id, info in METRIC_REGISTRY.items():
row[info['display_name'].replace(' ', '_')] = metrics.get(m_id, np.nan)
# Add specialized interpretations
hl_p = metrics.get('hosmer_lemeshow_p_value', np.nan)
if not np.isnan(hl_p):
row['Hosmer_Lemeshow_Interpretation'] = "Good calibration (p≥0.05)" if hl_p >= 0.05 else "Poor calibration (p<0.05)"
sp_p = metrics.get('spiegelhalter_z_p_value', np.nan)
if not np.isnan(sp_p):
row['Spiegelhalter_Z_Interpretation'] = "Good calibration (p≥0.05)" if sp_p >= 0.05 else "Poor calibration (p<0.05)"
detailed_data.append(row)
df_detailed = pd.DataFrame(detailed_data)
output_path = os.path.join(self.output_dir, filename)
df_detailed.to_csv(output_path, index=False)
self.logger.info(f"Detailed performance summary saved to: {output_path}")
class MultifileEvaluator:
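    """
    Merge per-model prediction files on a shared subject ID and compare the
    models (evaluation curves and DeLong tests) without retraining them.
    """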
def __init__(self, output_dir: str) -> None:
"""
        Initialize the multi-file evaluator.
        Args:
            output_dir (str): Directory where charts and outputs will be saved
"""
self.output_dir = output_dir
self.plotter = Plotter(output_dir)
self.data = None
self.models_data = {}
self.label_col = None
self.subject_id_col = None
self.logger = get_module_logger('evaluation.multifile')
def read_prediction_files(self, files_config: List[Dict]) -> 'MultifileEvaluator':
"""
        Read prediction results from multiple files.
        Args:
            files_config (List[Dict]): List of file configurations, each containing:
                - path: file path
                - model_name: model name
                - subject_id_col: subject ID column name
                - label_col: ground-truth label column name
                - prob_col: predicted probability column name
                - pred_col: predicted label column name (optional)
        Returns:
            MultifileEvaluator: self, enabling method chaining
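        Example (hypothetical file paths and column names; the keys follow the
        ComparisonFileConfig fields listed above):
            >>> evaluator = MultifileEvaluator(output_dir="comparison")
            >>> evaluator.read_prediction_files([
            ...     {'path': 'model_a_preds.csv', 'model_name': 'model_a',
            ...      'subject_id_col': 'subject_id', 'label_col': 'label',
            ...      'prob_col': 'prob', 'pred_col': 'pred'},
            ...     {'path': 'model_b_preds.csv', 'model_name': 'model_b',
            ...      'subject_id_col': 'subject_id', 'label_col': 'label',
            ...      'prob_col': 'prob'},
            ... ]).save_merged_data()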
"""
# Simple and clear data fusion approach
self.logger.info(f"Reading data from multiple files: {len(files_config)} files")
# Step 1: Read and standardize all files
standardized_dfs = []
original_subject_id_col = None
original_label_col = None
used_model_names = set()
for idx, file_config in enumerate(files_config):
# Convert dict to ComparisonFileConfig to trigger validators
# This ensures name -> model_name mapping and proper defaults
from ..config_schemas import ComparisonFileConfig
if not isinstance(file_config, ComparisonFileConfig):
config_obj = ComparisonFileConfig(**file_config)
else:
config_obj = file_config
file_path = config_obj.path
# After validation, model_name is guaranteed to be set
model_name = config_obj.model_name
model_name = self._ensure_unique_model_name(model_name, used_model_names)
subject_id_col = config_obj.subject_id_col
label_col = config_obj.label_col
prob_col = config_obj.prob_col
pred_col = config_obj.pred_col
# Check required parameters
if not all([subject_id_col, label_col, prob_col]):
raise ValueError(f"Missing required columns for file {file_path}")
# Store original column names from first file
if idx == 0:
original_subject_id_col = subject_id_col
original_label_col = label_col
self.logger.info(f"Reading file: {file_path}")
self.logger.info(f"Model name: {model_name}, Subject ID: {subject_id_col}, Label: {label_col}, Prob: {prob_col}")
# Read file and check columns exist
df = pd.read_csv(file_path)
required_cols = [subject_id_col, label_col, prob_col]
missing_cols = [col for col in required_cols if col not in df.columns]
if missing_cols:
raise ValueError(f"Missing columns {missing_cols} in file {file_path}")
# Standardize dataframe: use consistent column names for merging
standardized_df = pd.DataFrame({
'subject_id': df[subject_id_col].astype(str),
'label': df[label_col],
f'{model_name}_prob': df[prob_col]
})
# Add prediction column if specified and exists
if pred_col and pred_col in df.columns:
standardized_df[f'{model_name}_pred'] = df[pred_col]
standardized_dfs.append((model_name, standardized_df))
# Step 2: Merge all dataframes using standardized column names
self.logger.info("Merging all datasets...")
merged_df = None
for model_name, df in standardized_dfs:
if merged_df is None:
# First dataframe becomes the base, set original column names
merged_df = df.copy()
self.subject_id_col = original_subject_id_col
self.label_col = original_label_col
else:
# Merge subsequent dataframes on standardized column names
merge_cols = ['subject_id', 'label']
prob_cols = [col for col in df.columns if col.endswith('_prob') or col.endswith('_pred')]
df_to_merge = df[merge_cols + prob_cols]
merged_df = merged_df.merge(df_to_merge, on=merge_cols, how='outer')
# Step 3: Set subject_id as index and save data
merged_df.set_index('subject_id', inplace=True)
# Reorder columns: label first, then all model columns
label_col_list = ['label']
model_cols = [col for col in merged_df.columns if col != 'label']
merged_df = merged_df[label_col_list + model_cols]
self.data = merged_df
# Prepare models_data dictionary for plotting module
# Keys are model names, values are (y_true, y_pred_proba, y_pred) tuples
for model_name, _ in standardized_dfs:
prob_column_name = f"{model_name}_prob"
pred_column_name = f"{model_name}_pred"
if prob_column_name in self.data.columns:
y_true = self.data['label'].values
y_pred_proba = self.data[prob_column_name].values
# Check if prediction column exists
if pred_column_name in self.data.columns:
y_pred = self.data[pred_column_name].values
self.models_data[model_name] = (y_true, y_pred_proba, y_pred)
else:
# Fallback to tuple without y_pred
self.models_data[model_name] = (y_true, y_pred_proba)
return self
def _ensure_unique_model_name(self, model_name: str, used_names: set) -> str:
"""
Ensure model names are unique to avoid column collisions in merged data.
Args:
model_name: Original model name from config
used_names: Set of already used model names
Returns:
Unique model name
"""
base_name = str(model_name).strip()
if not base_name:
base_name = "model"
unique_name = base_name
suffix = 2
while unique_name in used_names:
unique_name = f"{base_name}_{suffix}"
suffix += 1
if unique_name != base_name:
self.logger.warning(
f"Duplicate model name '{base_name}' detected. Using '{unique_name}' instead."
)
used_names.add(unique_name)
return unique_name
def save_merged_data(self, filename: str = "merged_predictions.csv") -> None:
"""
        Save the merged data to a CSV file.
        Args:
            filename (str): Output file name
"""
if self.data is not None:
            # Create a copy that includes the index
            output_df = self.data.copy()
            output_df.reset_index(inplace=True)  # turn the index into a regular column
            # Log the number of NaN values per column
nan_counts = output_df.isna().sum()
self.logger.info("NaN value statistics:")
for col, count in nan_counts.items():
if count > 0:
total = len(output_df)
percent = (count / total) * 100
self.logger.info(f" {col}: {count}/{total} ({percent:.2f}%)")
output_path = os.path.join(self.output_dir, filename)
output_df.to_csv(output_path, index=False)
self.logger.info(f"Merged data saved to {output_path}")
else:
self.logger.warning("No data to save. Please read prediction files first.")
def plot_roc(self, save_name: str = "ROC.pdf", title: str = "evaluation") -> None:
"""
        Plot ROC curves for all models.
        Args:
            save_name (str): Output file name
            title (str): Chart title
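        Example (sketch; call after read_prediction_files has populated the model data):
            >>> evaluator.plot_roc(save_name="ROC.pdf", title="external validation")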
"""
if not self.models_data:
self.logger.warning("No models data available. Please read prediction files first.")
return
self.plotter.plot_roc_v2(self.models_data, save_name=save_name, title=title)
self.logger.info(f"ROC curve saved to {os.path.join(self.output_dir, save_name)}")
def plot_dca(self, save_name: str = "DCA.pdf", title: str = "evaluation") -> None:
"""
        Plot decision curve analysis (DCA) for all models.
        Args:
            save_name (str): Output file name
            title (str): Chart title
"""
if not self.models_data:
self.logger.warning("No models data available. Please read prediction files first.")
return
self.plotter.plot_dca_v2(self.models_data, save_name=save_name, title=title)
self.logger.info(f"DCA curve saved to {os.path.join(self.output_dir, save_name)}")
def plot_calibration(self, save_name: str = "Calibration.pdf", n_bins: int = 5, title: str = "evaluation") -> None:
"""
        Plot calibration curves for all models.
        Args:
            save_name (str): Output file name
            n_bins (int): Number of bins for the calibration curve
            title (str): Chart title
"""
if not self.models_data:
self.logger.warning("No models data available. Please read prediction files first.")
return
self.plotter.plot_calibration_v2(self.models_data, save_name=save_name, n_bins=n_bins, title=title)
self.logger.info(f"Calibration curve saved to {os.path.join(self.output_dir, save_name)}")
def plot_pr_curve(self, save_name: str = "PR_curve.pdf", title: str = "evaluation") -> None:
"""
        Plot precision-recall curves for all models.
        Args:
            save_name (str): Output file name
            title (str): Chart title
"""
if not self.models_data:
self.logger.warning("No models data available. Please read prediction files first.")
return
self.plotter.plot_pr_curve(self.models_data, save_name=save_name, title=title)
self.logger.info(f"PR curve saved to {os.path.join(self.output_dir, save_name)}")
def run_delong_test(self, output_json: Optional[str] = "delong_test_results.json") -> List[Dict]:
"""
        Run the DeLong test for every pair of models.
        Args:
            output_json (Optional[str]): Output JSON file name; set to None to skip saving
        Returns:
            List[Dict]: List of DeLong test results
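        Example (sketch; assumes predictions for at least two models have already
        been loaded via read_prediction_files):
            >>> results = evaluator.run_delong_test(output_json="delong_test_results.json")
            >>> p = results[0]['p_value']  # DeLong p-value for the first model pair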
"""
if not self.models_data or len(self.models_data) < 2:
self.logger.warning("Need at least two models for DeLong test.")
return []
results = []
model_names = list(self.models_data.keys())
        # Get the ground-truth labels
        y_true = self.data['label'].values
        # Perform pairwise comparisons
for i in range(len(model_names)):
for j in range(i + 1, len(model_names)):
model1 = model_names[i]
model2 = model_names[j]
                # Build a DataFrame restricted to samples valid for both models
temp_df = pd.DataFrame({
'y_true': y_true,
f'{model1}_prob': self.data[f"{model1}_prob"].values,
f'{model2}_prob': self.data[f"{model2}_prob"].values
})
                # Drop any rows containing NaN
temp_df = temp_df.dropna()
if len(temp_df) == 0:
self.logger.warning(f"{model1} 和 {model2} 没有足够的共同有效样本进行DeLong检验")
continue
self.logger.info(f"执行DeLong检验: {model1} vs {model2},有效样本数: {len(temp_df)}")
# 获取清理后的数据
clean_y_true = temp_df['y_true'].values
clean_y_pred1 = temp_df[f'{model1}_prob'].values
clean_y_pred2 = temp_df[f'{model2}_prob'].values
                # Compute AUC and confidence intervals for each model
auc1, ci1 = delong_roc_ci(clean_y_true, clean_y_pred1)
auc2, ci2 = delong_roc_ci(clean_y_true, clean_y_pred2)
                # Compute the p-value (delong_roc_test returns a 2D array; extract the scalar as in compare_models)
                p_value = delong_roc_test(clean_y_true, clean_y_pred1, clean_y_pred2)
                p_value = p_value[0][0]
                # Build the comparison result
comparison_result = {
'comparison': f"{model1} vs {model2}",
f'{model1}_auc': float(auc1),
f'{model1}_ci_lower': float(ci1[0]),
f'{model1}_ci_upper': float(ci1[1]),
f'{model2}_auc': float(auc2),
f'{model2}_ci_lower': float(ci2[0]),
f'{model2}_ci_upper': float(ci2[1]),
'p_value': float(p_value),
'significant_difference': bool(p_value < 0.05),
'conclusion': f"{model1} and {model2} have significantly different AUCs (p<0.05)" if p_value < 0.05 else f"{model1} and {model2} do not have significantly different AUCs (p≥0.05)"
}
results.append(comparison_result)
        # Log results
self.logger.info("DeLong Test Results:")
self.logger.info("=" * 50)
for result in results:
self.logger.info(f"{result['comparison']}")
self.logger.info(f"P-value: {result['p_value']:.4f}")
self.logger.info(f"Conclusion: {result['conclusion']}")
self.logger.info("AUCs with 95% CI:")
model1, model2 = result['comparison'].split(" vs ")
self.logger.info(f"{model1}: {result[f'{model1}_auc']:.3f} ({result[f'{model1}_ci_lower']:.3f}-{result[f'{model1}_ci_upper']:.3f})")
self.logger.info(f"{model2}: {result[f'{model2}_auc']:.3f} ({result[f'{model2}_ci_lower']:.3f}-{result[f'{model2}_ci_upper']:.3f})")
        # Save results (json is already imported at module level)
        if output_json:
output_path = os.path.join(self.output_dir, output_json)
with open(output_path, 'w') as f:
json.dump(results, f, indent=4)
self.logger.info(f"Results saved to {output_path}")
return results