#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Automatic baseline model selection for EPR data.
This module provides intelligent automatic selection of the best baseline
correction model using statistical criteria (AIC, BIC, R²).
"""
import sys
from io import StringIO
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
from ..logging_config import get_logger
logger = get_logger(__name__)
# Import correction functions from our new modules
from .correction import (
baseline_bi_exponential_1d,
baseline_polynomial_1d,
baseline_stretched_exponential_1d,
)
from .interactive import RegionSelector, is_interactive_available
def _calculate_model_metrics(y_data, y_pred, n_params, n_points):
"""
Calculate model selection metrics (AIC, BIC, R²).
Args:
y_data: Observed data
y_pred: Model predictions
n_params: Number of model parameters
n_points: Number of data points
Returns:
dict: Dictionary with 'aic', 'bic', 'r2', 'rss'
"""
residuals = y_data - y_pred
rss = np.sum(residuals**2)
# Avoid division by zero
if rss == 0:
rss = 1e-10
# R-squared
ss_tot = np.sum((y_data - np.mean(y_data)) ** 2)
r2 = 1 - rss / ss_tot if ss_tot > 0 else 0
# AIC and BIC
try:
aic = 2 * n_params + n_points * np.log(rss / n_points)
bic = n_params * np.log(n_points) + n_points * np.log(rss / n_points)
except (ValueError, OverflowError):
# Handle edge cases
aic = np.inf
bic = np.inf
return {"aic": aic, "bic": bic, "r2": r2, "rss": rss}
def _test_polynomial_models(
x, y, params, common_args, selection_criterion="aic", verbose=True
):
"""Test polynomial models of different orders and return the best one."""
best_result = None
best_criterion = np.inf if selection_criterion in ["aic", "bic"] else -np.inf
for order in [1, 2, 3, 4]:
try:
# Suppress output for cleaner comparison
old_stdout = sys.stdout
sys.stdout = StringIO()
try:
# Polynomial functions have different parameter names
poly_args = {
"manual_regions": common_args.get("manual_regions"),
"region_mode": common_args.get("region_mode"),
"interactive": False,
"exclude_center": (
True if not common_args.get("manual_regions") else False
),
"center_fraction": 0.3,
}
# Remove None values
poly_args = {k: v for k, v in poly_args.items() if v is not None}
corrected, baseline = baseline_polynomial_1d(
x, y, params, order=order, **poly_args
)
finally:
sys.stdout = old_stdout
# Calculate metrics
metrics = _calculate_model_metrics(y, baseline, order + 1, len(y))
criterion_value = metrics[selection_criterion]
# Check if this is the best so far
is_better = (
selection_criterion in ["aic", "bic"]
and criterion_value < best_criterion
) or (selection_criterion == "r2" and criterion_value > best_criterion)
if is_better:
best_criterion = criterion_value
best_result = {
"corrected": corrected,
"baseline": baseline,
"order": order,
"n_params": order + 1,
"metrics": metrics,
}
except Exception as e:
if verbose:
logger.warning(f" Polynomial order {order}: FAILED - {e}")
continue
return best_result
def _test_stretched_exponential_model(
x, y, params, common_args, selection_criterion="aic", verbose=True
):
"""Test stretched exponential model."""
try:
# Suppress output for cleaner comparison
old_stdout = sys.stdout
sys.stdout = StringIO()
try:
corrected, baseline = baseline_stretched_exponential_1d(
x, y, params, **common_args
)
finally:
sys.stdout = old_stdout
# Calculate metrics (4 parameters: A, tau, beta, offset)
metrics = _calculate_model_metrics(y, baseline, 4, len(y))
result = {
"corrected": corrected,
"baseline": baseline,
"n_params": 4,
"metrics": metrics,
}
return result
except Exception as e:
if verbose:
logger.warning(f" Stretched exponential model failed: {e}")
return None
def _test_bi_exponential_model(
x, y, params, common_args, selection_criterion="aic", verbose=True
):
"""Test bi-exponential model."""
try:
# Suppress output for cleaner comparison
old_stdout = sys.stdout
sys.stdout = StringIO()
try:
corrected, baseline = baseline_bi_exponential_1d(
x, y, params, **common_args
)
finally:
sys.stdout = old_stdout
# Calculate metrics (5 parameters: A1, tau1, A2, tau2, offset)
metrics = _calculate_model_metrics(y, baseline, 5, len(y))
result = {
"corrected": corrected,
"baseline": baseline,
"n_params": 5,
"metrics": metrics,
}
return result
except Exception as e:
if verbose:
logger.warning(f" Bi-exponential model failed: {e}")
return None
[docs]
def baseline_auto_1d(
x: Union[np.ndarray, None],
y: np.ndarray,
params: Optional[Dict[str, Any]] = None,
models: Optional[List[str]] = None,
selection_criterion: str = "aic",
use_real_part: bool = True,
exclude_initial: int = 0,
exclude_final: int = 0,
manual_regions: Optional[List[Tuple[float, float]]] = None,
region_mode: str = "exclude",
interactive: bool = False,
verbose: bool = True,
) -> Tuple[np.ndarray, np.ndarray, Dict[str, Any]]:
"""
Automatic baseline model selection and correction for 1D EPR data.
This function tests multiple baseline models and automatically selects
the best one based on information criteria (AIC/BIC) or other metrics.
Args:
x: X-axis data from eprload (can be None)
y: 1D spectral data array from eprload
params: Parameter dictionary from eprload (optional)
models: List of models to test. Options:
'polynomial', 'stretched_exponential',
'bi_exponential'
selection_criterion: 'aic' (Akaike), 'bic'
(Bayesian), or 'r2' (R-squared)
use_real_part: If True, fit only real part of
complex data
exclude_initial: Number of initial points to
exclude from fitting
exclude_final: Number of final points to
exclude from fitting
manual_regions: List of manually selected
regions as [(x1, x2), ...]
region_mode: 'exclude' to exclude
manual_regions, 'include' to use only
manual_regions
interactive: If True, open interactive region
selector
verbose: If True, print detailed model
comparison
Returns:
tuple: (corrected_data, best_baseline,
model_info) where model_info contains:
{'best_model': str, 'criteria': dict,
'parameters': dict}
Examples:
# Automatic model selection
corrected, baseline, info = baseline_auto_1d(
x, y, params)
print(f"Best model: {info['best_model']}")
# Restrict to specific models
corrected, baseline, info = baseline_auto_1d(
x, y, params,
models=['polynomial',
'stretched_exponential'])
# Use BIC for model selection
corrected, baseline, info = baseline_auto_1d(
x, y, params,
selection_criterion='bic')
"""
if models is None:
models = ["polynomial", "stretched_exponential", "bi_exponential"]
if y is None or y.ndim != 1:
raise ValueError("y must be a 1D array")
if selection_criterion not in ["aic", "bic", "r2"]:
raise ValueError("selection_criterion must be 'aic', 'bic', or 'r2'")
n_points = len(y)
# Handle complex data
if np.iscomplexobj(y):
if use_real_part:
y_fit = np.real(y)
if verbose:
logger.info("ℹ Using real part of complex data for fitting")
else:
y_fit = np.abs(y)
if verbose:
logger.info("ℹ Using magnitude of complex data for fitting")
else:
y_fit = y.copy()
# Create x-coordinates
if x is None or len(x) != n_points:
x_coords = np.arange(n_points, dtype=float)
if verbose:
logger.info("ℹ No x-axis provided, using index as x-coordinates")
else:
x_coords = x.astype(float)
# Interactive region selection if requested
selected_regions = manual_regions if manual_regions is not None else []
if interactive:
if not is_interactive_available():
logger.warning("Interactive selection may not" " work in this environment.")
if verbose:
logger.info("🖱️ Interactive region selection enabled...")
selector = RegionSelector()
selected_regions = selector.select_regions_1d(
x_coords,
y_fit,
f"Select regions to {region_mode.upper()} from automatic baseline fitting",
)
if verbose:
logger.info(f"✅ Selected {len(selected_regions)} regions")
# Prepare common arguments for all models
common_args = {
"use_real_part": use_real_part,
"exclude_initial": exclude_initial,
"exclude_final": exclude_final,
"manual_regions": selected_regions,
"region_mode": region_mode,
"interactive": False, # We already did interactive selection above
}
# Test each model
model_results = {}
if verbose:
logger.info(f"\n🧪 Testing {len(models)} baseline models...")
# Test polynomial model
if "polynomial" in models:
if verbose:
logger.info(" Testing polynomial baseline...")
poly_result = _test_polynomial_models(
x_coords, y_fit, params, common_args, selection_criterion, verbose
)
if poly_result is not None:
model_results["polynomial"] = poly_result
if verbose:
metrics = poly_result["metrics"]
crit = selection_criterion
val = metrics[crit]
order = poly_result["order"]
logger.info(
f" Polynomial (order {order}):" f" {crit.upper()}={val:.2f}"
)
# Test stretched exponential model
if "stretched_exponential" in models:
if verbose:
logger.info(" Testing stretched exponential baseline...")
stretch_result = _test_stretched_exponential_model(
x_coords, y_fit, params, common_args, selection_criterion, verbose
)
if stretch_result is not None:
model_results["stretched_exponential"] = stretch_result
if verbose:
metrics = stretch_result["metrics"]
crit = selection_criterion
val = metrics[crit]
logger.info(f" Stretched exponential:" f" {crit.upper()}={val:.2f}")
# Test bi-exponential model
if "bi_exponential" in models:
if verbose:
logger.info(" Testing bi-exponential baseline...")
bi_result = _test_bi_exponential_model(
x_coords, y_fit, params, common_args, selection_criterion, verbose
)
if bi_result is not None:
model_results["bi_exponential"] = bi_result
if verbose:
metrics = bi_result["metrics"]
crit = selection_criterion
val = metrics[crit]
logger.info(f" Bi-exponential:" f" {crit.upper()}={val:.2f}")
# Select best model
if not model_results:
raise ValueError("All baseline models failed. Check your data and parameters.")
# Find the best model based on the selection criterion
best_model = None
best_criterion = np.inf if selection_criterion in ["aic", "bic"] else -np.inf
criteria_dict = {}
for model_name, result in model_results.items():
criterion_value = result["metrics"][selection_criterion]
criteria_dict[model_name] = criterion_value
is_better = (
selection_criterion in ["aic", "bic"] and criterion_value < best_criterion
) or (selection_criterion == "r2" and criterion_value > best_criterion)
if is_better:
best_criterion = criterion_value
best_model = model_name
if best_model is None:
raise ValueError("Could not select best baseline model")
# Get the best result
best_result = model_results[best_model]
# Prepare return information
model_info = {
"best_model": best_model,
"criteria": criteria_dict,
"parameters": best_result["metrics"].copy(),
"n_params": best_result["n_params"],
}
# Add model-specific information
if best_model == "polynomial":
model_info["polynomial_order"] = best_result["order"]
if verbose:
logger.info(f"\n🏆 Best model: {best_model}")
logger.info(f"📊 {selection_criterion.upper()} = {best_criterion:.2f}")
logger.info(f"📊 R² = {best_result['metrics']['r2']:.4f}")
if len(model_results) > 1:
logger.info(f"\n📋 Model comparison ({selection_criterion.upper()}):")
sorted_models = sorted(
criteria_dict.items(),
key=lambda x: x[1],
reverse=(selection_criterion == "r2"),
)
for i, (model, criterion_val) in enumerate(sorted_models):
marker = "🥇" if model == best_model else f"{i+1}. "
logger.info(f" {marker} {model}: {criterion_val:.2f}")
# Return the corrected data in the original format
corrected_data = best_result["corrected"]
baseline = best_result["baseline"]
return corrected_data, baseline, model_info
[docs]
def compare_models_detailed(
x: Union[np.ndarray, None],
y: np.ndarray,
params: Optional[Dict[str, Any]] = None,
models: Optional[List[str]] = None,
**kwargs,
) -> Dict[str, Dict[str, Any]]:
"""Fit every candidate baseline model and return detailed metrics.
Parameters
----------
x : np.ndarray or None
Axis from :func:`epyr.eprload`. Falls back to indices if None.
y : np.ndarray
1D signal.
params : dict, optional
Parameter dictionary from :func:`epyr.eprload`.
models : list of str, optional
Subset of ``['polynomial', 'stretched_exponential', 'bi_exponential']``.
Default: all three.
**kwargs
Passed through to the underlying ``baseline_*`` functions
(``use_real_part``, ``exclude_initial``, ``manual_regions``, etc.).
Returns
-------
dict
``{model_name: {'aic': ..., 'r_squared': ..., 'parameters': ...}}``
for each model that converged.
Examples
--------
>>> from epyr import eprload
>>> from epyr.baseline import compare_models_detailed
>>> path = "examples/data/ESEdecay_2D_rotgon_035_07.3K_h80_9.73687GHz_B3.DSC"
>>> x, y, params, _ = eprload(path)
>>> results = compare_models_detailed(x[0], y[0], params)
>>> set(results) <= {"polynomial", "stretched_exponential", "bi_exponential"}
True
"""
if models is None:
models = ["polynomial", "stretched_exponential", "bi_exponential"]
# Remove verbose and selection_criterion from kwargs for individual model testing
kwargs_clean = kwargs.copy()
kwargs_clean.pop("verbose", None)
kwargs_clean.pop("selection_criterion", None)
results = {}
# Prepare data
y_fit = y
if np.iscomplexobj(y):
if kwargs.get("use_real_part", True):
y_fit = np.real(y)
else:
y_fit = np.abs(y)
n_points = len(y_fit)
x_coords = x if x is not None else np.arange(n_points, dtype=float)
# Common arguments
common_args = {
"use_real_part": kwargs.get("use_real_part", True),
"exclude_initial": kwargs.get("exclude_initial", 0),
"exclude_final": kwargs.get("exclude_final", 0),
"manual_regions": kwargs.get("manual_regions"),
"region_mode": kwargs.get("region_mode", "exclude"),
"interactive": False,
}
# Test each model
if "polynomial" in models:
poly_result = _test_polynomial_models(
x_coords, y_fit, params, common_args, "aic", False
)
if poly_result:
results["polynomial"] = poly_result
if "stretched_exponential" in models:
stretch_result = _test_stretched_exponential_model(
x_coords, y_fit, params, common_args, "aic", False
)
if stretch_result:
results["stretched_exponential"] = stretch_result
if "bi_exponential" in models:
bi_result = _test_bi_exponential_model(
x_coords, y_fit, params, common_args, "aic", False
)
if bi_result:
results["bi_exponential"] = bi_result
return results
[docs]
def get_model_recommendations(
data_type: str = None, experiment_type: str = None
) -> List[str]:
"""Suggest baseline models given the experiment type.
Parameters
----------
data_type : {'cw', 'pulsed'}, optional
Continuous-wave or pulsed EPR.
experiment_type : str, optional
Sub-type when ``data_type='pulsed'``. Known values: ``'t1'``,
``'t2'``, ``'echo'``, ``'rabi'``, ``'nutation'``.
Returns
-------
list of str
Recommended model names, in order of preference.
Examples
--------
>>> from epyr.baseline import get_model_recommendations
>>> get_model_recommendations("cw")
['polynomial']
>>> get_model_recommendations("pulsed", "t2")
['stretched_exponential', 'bi_exponential', 'polynomial']
"""
if data_type == "cw":
return ["polynomial"]
elif data_type == "pulsed":
if experiment_type in ["t1", "t2", "echo"]:
return ["stretched_exponential", "bi_exponential", "polynomial"]
elif experiment_type in ["rabi", "nutation"]:
return ["polynomial"]
else:
return ["stretched_exponential", "polynomial", "bi_exponential"]
else:
# General case - try all models
return ["polynomial", "stretched_exponential", "bi_exponential"]
[docs]
def auto_baseline_with_recommendations(
x: Union[np.ndarray, None],
y: np.ndarray,
params: Optional[Dict[str, Any]] = None,
data_type: str = None,
experiment_type: str = None,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray, Dict[str, Any]]:
"""Automatic baseline correction, restricted to models that fit the experiment.
Wraps :func:`baseline_auto_1d` after pruning the candidate-model list via
:func:`get_model_recommendations`.
Parameters
----------
x : np.ndarray or None
Axis from :func:`epyr.eprload`.
y : np.ndarray
1D signal.
params : dict, optional
Parameter dictionary from :func:`epyr.eprload`.
data_type : {'cw', 'pulsed'}, optional
Used to filter the candidate models.
experiment_type : str, optional
Refines the filter for pulsed experiments.
**kwargs
Passed through to :func:`baseline_auto_1d`.
Returns
-------
corrected : np.ndarray
baseline : np.ndarray
info : dict
Best-model metadata (name, score, parameters, fit metrics).
Examples
--------
>>> from epyr import eprload
>>> from epyr.baseline import auto_baseline_with_recommendations
>>> x, y, params, _ = eprload(
... "examples/data/ESEdecay_2D_rotgon_035_07.3K_h80_9.73687GHz_B3.DSC"
... )
>>> corrected, baseline, info = auto_baseline_with_recommendations(
... x[0], y[0], params, data_type="pulsed", experiment_type="t2", verbose=False,
... )
>>> info["best_model"] in {"polynomial", "stretched_exponential", "bi_exponential"}
True
"""
recommended_models = get_model_recommendations(data_type, experiment_type)
kwargs["models"] = recommended_models
kwargs.setdefault("verbose", True)
if kwargs.get("verbose"):
dt = data_type or "unknown"
et = experiment_type or "data"
logger.info(f"Recommended models for {dt}" f" {et}: {recommended_models}")
return baseline_auto_1d(x, y, params, **kwargs)