"""Unified model classes for UVHMR and HOD calculations.
This module provides a class-based interface for all galaxy population
modeling calculations, combining UV-halo mass relations with halo
occupation distributions.
"""
import numpy as np
from scipy import interpolate, integrate
from scipy.special import erf
from scipy.stats import norm
from .config import BARYON_FRACTION
from .luminosity import MUV_from_SFR
from .cosmology import get_halo_mass_function, get_halo_bias
try:
import halomod
import halomod.hod
HALOMOD_AVAILABLE = True
except ImportError:
HALOMOD_AVAILABLE = False
[docs]
class UVHMRModel:
"""Base class for UV-Halo Mass Relation models.
This class handles the relationship between halo mass and UV luminosity
through star formation. It serves as the foundation for more complex
models that include occupation distributions.
Parameters
----------
z : float
Redshift
eps0 : float, optional
Star formation efficiency normalization. Default from config.
Mc : float, optional
Characteristic halo mass in M_sun. Default from config.
a : float, optional
Low-mass power-law slope. Default from config.
b : float, optional
High-mass power-law slope. Default from config.
add_dust : bool, optional
Whether to include dust attenuation. Default is True.
Attributes
----------
z, eps0, Mc, a, b : float
Model parameters
add_dust : bool
Whether dust is included
Examples
--------
>>> # Use all defaults (fitted to Bouwens+2021)
>>> model = UVHMRModel(z=6.0)
>>>
>>> # Override specific parameters
>>> model = UVHMRModel(z=6.0, eps0=0.2, Mc=10**12)
"""
def __init__(self, z, eps0=None, Mc=None, a=None, b=None, add_dust=True):
"""Initialize UVHMR model."""
from .config import DEFAULT_HOD_PARAMS
self.z = z
self.eps0 = eps0 if eps0 is not None else DEFAULT_HOD_PARAMS['eps0']
self.Mc = Mc if Mc is not None else 10**DEFAULT_HOD_PARAMS['logMc']
self.a = a if a is not None else DEFAULT_HOD_PARAMS['a']
self.b = b if b is not None else DEFAULT_HOD_PARAMS['b']
self.add_dust = add_dust
[docs]
def halo_accretion_rate(self, Mh):
"""Compute halo mass accretion rate.
Parameters
----------
Mh : float or array_like
Halo mass in M_sun
Returns
-------
dMh_dt : float or ndarray
Halo mass accretion rate in M_sun/yr
"""
Mh = np.atleast_1d(Mh)
return 0.03e-9 * Mh * (Mh / 1e12)**0.14 * (1 + self.z)**2.5
[docs]
def sfr(self, Mh):
"""Star formation rate from halo mass.
Parameters
----------
Mh : float or array_like
Halo mass in M_sun
Returns
-------
sfr : float or ndarray
Star formation rate in M_sun/yr
"""
f_star = self.star_formation_efficiency(Mh)
return BARYON_FRACTION * f_star * self.halo_accretion_rate(Mh)
[docs]
def MUV(self, Mh):
"""UV absolute magnitude from halo mass.
Parameters
----------
Mh : float or array_like
Halo mass in M_sun
Returns
-------
MUV : float or ndarray
UV absolute magnitude
"""
sfr = self.sfr(Mh)
return MUV_from_SFR(sfr, self.z, add_dust=self.add_dust)
[docs]
def Mhalo(self, MUV, M_min=1e6, M_max=1e16, num_points=1000):
"""Inverse: halo mass from UV magnitude.
Parameters
----------
MUV : float or array_like
UV absolute magnitude
M_min, M_max : float, optional
Halo mass range for interpolation
num_points : int, optional
Number of points for interpolation
Returns
-------
Mh : float or ndarray
Halo mass in M_sun
"""
# Create forward relation
Mhalo_grid = np.logspace(np.log10(M_min), np.log10(M_max), num_points)
MUV_grid = self.MUV(Mhalo_grid)
# Invert using interpolation
interp_func = interpolate.interp1d(MUV_grid, Mhalo_grid,
fill_value='extrapolate')
return interp_func(MUV)
[docs]
def update_parameters(self, **kwargs):
"""Update model parameters.
Parameters
----------
**kwargs : dict
Parameters to update: z, eps0, Mc, a, b, add_dust
"""
if 'z' in kwargs:
self.z = kwargs['z']
if 'eps0' in kwargs:
self.eps0 = kwargs['eps0']
if 'Mc' in kwargs:
self.Mc = kwargs['Mc']
if 'a' in kwargs:
self.a = kwargs['a']
if 'b' in kwargs:
self.b = kwargs['b']
if 'add_dust' in kwargs:
self.add_dust = kwargs['add_dust']
def __repr__(self):
return (f"UVHMRModel(z={self.z}, eps0={self.eps0:.3f}, "
f"Mc={self.Mc:.2e}, a={self.a:.2f}, b={self.b:.2f})")
[docs]
class HODModel(UVHMRModel):
"""Halo Occupation Distribution model.
This class extends UVHMRModel to include central and satellite
galaxy occupation functions.
Parameters
----------
z : float
Redshift
eps0 : float, optional
Star formation efficiency normalization. Default from config.
Mc : float, optional
Characteristic halo mass in M_sun. Default from config.
a : float, optional
Low-mass power-law slope. Default from config.
b : float, optional
High-mass power-law slope. Default from config.
sigma_UV : float, optional
UV magnitude scatter. Default from config.
Mcut : float, optional
Satellite cutoff mass in M_sun. Default from config.
Msat : float, optional
Satellite normalization mass in M_sun. Default from config.
asat : float, optional
Satellite power-law slope. Default from config.
add_dust : bool, optional
Whether to include dust attenuation. Default is True.
Attributes
----------
sigma_UV, Mcut, Msat, asat : float
HOD parameters
Examples
--------
>>> # Use all defaults
>>> model = HODModel(z=6.0)
>>>
>>> # Compute occupation numbers
>>> import numpy as np
>>> Mh = np.logspace(10, 13, 100)
>>> N_cen = model.Ncen(Mh, MUV_thresh=-19.0)
>>> N_sat = model.Nsat(Mh, MUV_thresh=-19.0)
"""
def __init__(self, z, eps0=None, Mc=None, a=None, b=None,
sigma_UV=None, Mcut=None, Msat=None, asat=None,
add_dust=True):
"""Initialize HOD model."""
from .config import DEFAULT_HOD_PARAMS
# Initialize parent class
super().__init__(z, eps0, Mc, a, b, add_dust)
# Set HOD parameters
self.sigma_UV = sigma_UV if sigma_UV is not None else DEFAULT_HOD_PARAMS['sigma_UV']
self.Mcut = Mcut if Mcut is not None else 10**DEFAULT_HOD_PARAMS['Mcut']
self.Msat = Msat if Msat is not None else 10**DEFAULT_HOD_PARAMS['Msat']
self.asat = asat if asat is not None else DEFAULT_HOD_PARAMS['asat']
[docs]
def Ncen(self, Mh, MUV_thresh):
"""Central galaxy occupation function.
Average number of central galaxies with M_UV < MUV_thresh.
Parameters
----------
Mh : float or array_like
Halo mass in M_sun
MUV_thresh : float
UV magnitude threshold (brighter than)
Returns
-------
N_cen : float or ndarray
Average number of central galaxies
"""
MUV_mean = self.MUV(Mh)
return 0.5 * (1 - erf((MUV_mean - MUV_thresh) /
(np.sqrt(2) * self.sigma_UV)))
[docs]
def Nsat(self, Mh, MUV_thresh):
"""Satellite galaxy occupation function.
Average number of satellite galaxies with M_UV < MUV_thresh.
Parameters
----------
Mh : float or array_like
Halo mass in M_sun
MUV_thresh : float
UV magnitude threshold
Returns
-------
N_sat : float or ndarray
Average number of satellite galaxies
"""
Mh = np.atleast_1d(Mh)
N_cen = self.Ncen(Mh, MUV_thresh)
# Satellites only exist above cutoff mass
N_sat = np.zeros_like(Mh)
mask = Mh > self.Mcut
N_sat[mask] = ((Mh[mask] - self.Mcut) / self.Msat)**self.asat
return N_sat * N_cen
[docs]
def Ngal(self, Mh, MUV_thresh):
"""Total galaxy occupation (central + satellite).
Parameters
----------
Mh : float or array_like
Halo mass in M_sun
MUV_thresh : float
UV magnitude threshold
Returns
-------
N_gal : float or ndarray
Average total number of galaxies
"""
return self.Ncen(Mh, MUV_thresh) + self.Nsat(Mh, MUV_thresh)
[docs]
def update_parameters(self, **kwargs):
"""Update model parameters.
Parameters
----------
**kwargs : dict
Parameters to update: z, eps0, Mc, a, b, sigma_UV,
Mcut, Msat, asat, add_dust
"""
super().update_parameters(**kwargs)
if 'sigma_UV' in kwargs:
self.sigma_UV = kwargs['sigma_UV']
if 'Mcut' in kwargs:
self.Mcut = kwargs['Mcut']
if 'Msat' in kwargs:
self.Msat = kwargs['Msat']
if 'asat' in kwargs:
self.asat = kwargs['asat']
def __repr__(self):
return (f"HODModel(z={self.z}, eps0={self.eps0:.3f}, "
f"Mc={self.Mc:.2e}, a={self.a:.2f}, b={self.b:.2f}, "
f"sigma_UV={self.sigma_UV:.2f})")
def __str__(self):
s = f"HOD Model at z={self.z}\n"
s += "="*50 + "\n"
s += "UVHMR Parameters:\n"
s += f" eps0 = {self.eps0:.3f}\n"
s += f" Mc = {self.Mc:.2e} M_sun\n"
s += f" a = {self.a:.2f}\n"
s += f" b = {self.b:.2f}\n"
s += "\nHOD Parameters:\n"
s += f" sigma_UV = {self.sigma_UV:.2f} mag\n"
s += f" Mcut = {self.Mcut:.2e} M_sun\n"
s += f" Msat = {self.Msat:.2e} M_sun\n"
s += f" asat = {self.asat:.2f}\n"
s += "\nSettings:\n"
s += f" add_dust = {self.add_dust}\n"
s += "="*50
return s
[docs]
class Observables:
"""Compute observables from an HOD model.
This class provides methods to compute various observables like
luminosity functions, galaxy bias, and correlation functions from
an HOD model.
All observable methods accept optional ``**params`` keyword arguments
(eps0, Mc, a, b, sigma_UV, Mcut, Msat, asat) to update the underlying
HOD model before computing. This enables efficient MCMC loops without
recreating any objects.
For correlation functions specifically, use the initialize/update pattern:
1. Call initialize_correlation_model() once
2. Call update_correlation_model() in your MCMC loop
Parameters
----------
hod_model : HODModel
The HOD model to use for computations
Attributes
----------
hod_model : HODModel
The underlying HOD model
Examples
--------
>>> # Basic usage
>>> model = HODModel(z=6.0)
>>> obs = Observables(model)
>>> MUV = np.linspace(-22, -16, 20)
>>> phi = obs.luminosity_function(MUV)
>>> # MCMC fitting - all observables support inline parameter updates
>>> obs = Observables(HODModel(z=6.0))
>>> for eps0, sigma_UV in mcmc_samples:
... phi = obs.luminosity_function(MUV, eps0=eps0, sigma_UV=sigma_UV)
... bg = obs.galaxy_bias(MUV, eps0=eps0, sigma_UV=sigma_UV)
>>> # Correlation function MCMC (uses halomod caching)
>>> obs.initialize_correlation_model(MUV_thresh1=-19.1)
>>> for eps0, sigma_UV in mcmc_samples:
... result = obs.update_correlation_model(eps0=eps0, sigma_UV=sigma_UV)
... w_theta = result['correlation']
"""
# HOD parameter names accepted by update_parameters
_HOD_PARAM_NAMES = frozenset([
'z', 'eps0', 'Mc', 'a', 'b', 'sigma_UV', 'Mcut', 'Msat', 'asat', 'add_dust'
])
def __init__(self, hod_model):
"""Initialize observables calculator.
Parameters
----------
hod_model : HODModel
The HOD model to use
"""
self.hod_model = hod_model
self._hmf_cache = {}
# For efficient correlation function updates
self._halomod_model = None
self._halomod_config = {}
self._hod_wrapper = None
def _apply_params(self, params):
"""Apply parameter updates to the underlying HOD model.
Extracts recognized HOD parameters from ``params``, applies them
to ``self.hod_model``, and returns any remaining kwargs.
Parameters
----------
params : dict
Keyword arguments, possibly containing HOD parameter overrides.
Returns
-------
remaining : dict
Any kwargs that are not HOD parameters (passed through).
"""
hod_updates = {k: v for k, v in params.items() if k in self._HOD_PARAM_NAMES}
remaining = {k: v for k, v in params.items() if k not in self._HOD_PARAM_NAMES}
if hod_updates:
self.hod_model.update_parameters(**hod_updates)
return remaining
def _get_hmf(self, recalculate=False):
"""Get HMF - from halomod if available, otherwise from Colossus.
Use halomod HMF when available for
consistency with correlation functions, otherwise use Colossus.
Parameters
----------
recalculate : bool, optional
Force recalculation. Default is False.
Returns
-------
log10_mass : ndarray
Array of log10(M/M_sun) in physical units
hmf : ndarray
Halo mass function dn/dln(M) in physical units of Mpc^-3
"""
# If halomod model exists, use its HMF for consistency
if self._halomod_model is not None:
# Extract from halomod
M_h = self._halomod_model.m # M_sun/h
dndlnm_h = self._halomod_model.dndlnm # (h/Mpc)^3
# Convert to physical units
h = self._halomod_config['h']
M_physical = M_h / h # M_sun
hmf_physical = dndlnm_h * h**3 # Mpc^-3
return np.log10(M_physical), hmf_physical
# Otherwise use Colossus (backward compatible)
cache_key = self.hod_model.z
if recalculate or cache_key not in self._hmf_cache:
log10_Mh, hmf = get_halo_mass_function(
self.hod_model.z, M_min=6, M_max=15, num_points=2048
)
self._hmf_cache[cache_key] = (log10_Mh, hmf)
return self._hmf_cache[cache_key]
[docs]
def luminosity_function(self, MUV_arr, dMUV=0.05, **params):
"""Compute UV luminosity function.
Parameters
----------
MUV_arr : array_like
Array of UV absolute magnitudes
dMUV : float, optional
Magnitude bin width. Default is 0.05.
**params : dict, optional
HOD parameter overrides (eps0, Mc, a, b, sigma_UV, Mcut,
Msat, asat, add_dust). Applied to the model before computing.
Returns
-------
phi : ndarray
Luminosity function Φ(M_UV) in Mpc^-3 mag^-1
Examples
--------
>>> obs.luminosity_function(MUV) # current params
>>> obs.luminosity_function(MUV, eps0=0.3, sigma_UV=0.5) # override
"""
self._apply_params(params)
MUV_arr = np.atleast_1d(MUV_arr)
log10_Mh, hmf = self._get_hmf()
Mh = 10**log10_Mh
phi = np.zeros_like(MUV_arr)
for i, muv in enumerate(MUV_arr):
# Number in magnitude bin
N_cen_bin = (self.hod_model.Ncen(Mh, muv + dMUV/2) -
self.hod_model.Ncen(Mh, muv - dMUV/2))
N_sat_bin = (self.hod_model.Nsat(Mh, muv + dMUV/2) -
self.hod_model.Nsat(Mh, muv - dMUV/2))
integrand = (N_cen_bin + N_sat_bin) * hmf
phi[i] = integrate.simpson(integrand, x=log10_Mh) / dMUV
return phi
[docs]
def galaxy_bias(self, MUV_arr, dMUV=0.05, **params):
"""Compute galaxy bias as a function of UV magnitude.
Parameters
----------
MUV_arr : array_like
Array of UV absolute magnitudes
dMUV : float, optional
Magnitude bin width
**params : dict, optional
HOD parameter overrides (eps0, Mc, a, b, sigma_UV, Mcut,
Msat, asat, add_dust). Applied to the model before computing.
Returns
-------
bias : ndarray
Galaxy bias
"""
self._apply_params(params)
MUV_arr = np.atleast_1d(MUV_arr)
log10_Mh, hmf = self._get_hmf()
Mh = 10**log10_Mh
bias_halo = get_halo_bias(Mh, self.hod_model.z, mdef='vir', model='tinker10')
bg = np.zeros_like(MUV_arr)
for i, muv in enumerate(MUV_arr):
N_cen_bin = (self.hod_model.Ncen(Mh, muv + dMUV/2) -
self.hod_model.Ncen(Mh, muv - dMUV/2))
N_sat_bin = (self.hod_model.Nsat(Mh, muv + dMUV/2) -
self.hod_model.Nsat(Mh, muv - dMUV/2))
bg_integrand = (N_cen_bin + N_sat_bin) * hmf * bias_halo
ngal_integrand = (N_cen_bin + N_sat_bin) * hmf
ngal = integrate.simpson(ngal_integrand, x=log10_Mh)
if ngal > 0:
bg[i] = integrate.simpson(bg_integrand, x=log10_Mh) / ngal
else:
bg[i] = 0.0
return bg
[docs]
def mean_halo_mass(self, MUV_thresh, **params):
"""Compute mean halo mass for galaxies brighter than threshold.
Parameters
----------
MUV_thresh : float
UV magnitude threshold
**params : dict, optional
HOD parameter overrides.
Returns
-------
log10_Mh_mean : float
Mean log10 halo mass
"""
self._apply_params(params)
log10_Mh, hmf = self._get_hmf()
Mh = 10**log10_Mh
N_gal = self.hod_model.Ngal(Mh, MUV_thresh)
mh_integrand = N_gal * hmf * Mh
ngal_integrand = N_gal * hmf
ngal = integrate.simpson(ngal_integrand, x=log10_Mh)
if ngal > 0:
mh_mean = integrate.simpson(mh_integrand, x=log10_Mh) / ngal
return np.log10(mh_mean)
else:
return np.nan
[docs]
def mean_bias(self, MUV_thresh, **params):
"""Compute mean galaxy bias for galaxies brighter than threshold.
Parameters
----------
MUV_thresh : float
UV magnitude threshold
**params : dict, optional
HOD parameter overrides.
Returns
-------
bias_mean : float
Mean galaxy bias
"""
self._apply_params(params)
log10_Mh, hmf = self._get_hmf()
Mh = 10**log10_Mh
bias_halo = get_halo_bias(Mh, self.hod_model.z, mdef='vir', model='tinker10')
N_gal = self.hod_model.Ngal(Mh, MUV_thresh)
bg_integrand = N_gal * hmf * bias_halo
ngal_integrand = N_gal * hmf
ngal = integrate.simpson(ngal_integrand, x=log10_Mh)
if ngal > 0:
return integrate.simpson(bg_integrand, x=log10_Mh) / ngal
else:
return np.nan
[docs]
def number_density(self, MUV_thresh, **params):
"""Compute cumulative galaxy number density brighter than threshold.
Parameters
----------
MUV_thresh : float
UV magnitude threshold
**params : dict, optional
HOD parameter overrides.
Returns
-------
n_gal : float
Galaxy number density in Mpc^-3
"""
self._apply_params(params)
log10_Mh, hmf = self._get_hmf()
Mh = 10**log10_Mh
N_gal = self.hod_model.Ngal(Mh, MUV_thresh)
integrand = N_gal * hmf
return integrate.simpson(integrand, x=log10_Mh)
[docs]
def initialize_correlation_model(self, MUV_thresh1, MUV_thresh2=0,
cosmo_model=None, correlation_type='angular',
zmin=None, zmax=None, znum=150,
p1=None,
theta_min=1.0, theta_max=7200.0, theta_num=100,
rmin=1e-3, rmax=200.0, rnum=200,
rp_min=0.1, rp_max=100.0, rp_num=100,
pi_max=100.0,
hmf_model='Watson', bias_model='Tinker10',
concentration_model='Duffy08',
halo_profile='NFW',
exclusion_model='DblSphere_',
sd_bias_model=None,
**kwargs):
"""Initialize halomod model for efficient parameter updates.
Call this once before MCMC fitting. Then use update_correlation_model()
to efficiently update HOD parameters.
This creates and caches the halomod model, allowing fast parameter
updates via halomod's update() method.
Parameters are the same as compute_correlation_function().
Returns
-------
result : dict
Dictionary with 'model', 'separation', 'correlation', 'type'
Examples
--------
>>> # Initialize once
>>> obs = Observables(model)
>>> result = obs.initialize_correlation_model(
... MUV_thresh1=-19.1, correlation_type='angular'
... )
>>>
>>> # Update efficiently in MCMC loop
>>> for eps0 in mcmc_chain:
... result = obs.update_correlation_model(eps0=eps0)
... chi2 = compute_chi2(result['correlation'], data)
"""
if not HALOMOD_AVAILABLE:
raise ImportError("halomod package required")
# Set up redshift range
z_center = self.hod_model.z
if zmin is None:
zmin = max(0.1, z_center - 1.0)
if zmax is None:
zmax = z_center + 1.0
# Set up cosmology and extract h
if cosmo_model is None:
from .config import DEFAULT_COSMO_PARAMS
cosmo_params = {
'Om0': DEFAULT_COSMO_PARAMS['Om0'],
'Ob0': DEFAULT_COSMO_PARAMS['Ob0'],
'H0': DEFAULT_COSMO_PARAMS['H0'],
}
h = DEFAULT_COSMO_PARAMS['H0'] / 100.0
elif isinstance(cosmo_model, dict):
cosmo_params = cosmo_model
if 'H0' not in cosmo_params:
raise ValueError("cosmo_model dict must include 'H0'")
h = cosmo_params['H0'] / 100.0
else:
cosmo_params = cosmo_model
h = cosmo_model.H0.value / 100.0
# Store configuration for updates
self._halomod_config = {
'MUV_thresh1': MUV_thresh1,
'MUV_thresh2': MUV_thresh2,
'correlation_type': correlation_type,
'h': h,
'z_center': z_center,
'zmin': zmin,
'zmax': zmax,
'znum': znum,
'cosmo_params': cosmo_params,
}
# Halomod parameters
halomod_params = {
'cosmo_params' : cosmo_params,
'z': z_center,
'rmin': rmin * h,
'rmax': rmax * h,
'rnum': rnum,
'Mmin': kwargs.pop('Mmin', 5),
'Mmax': kwargs.pop('Mmax', 16),
'dlog10m': kwargs.pop('dlog10m', 0.1),
'dlnk': kwargs.pop('dlnk', 0.2),
'lnk_min': kwargs.pop('lnk_min', -6),
'lnk_max': kwargs.pop('lnk_max', 7),
'hmf_model': hmf_model,
'tracer_concentration_model': concentration_model,
'halo_profile_model': halo_profile,
'hc_spectrum': kwargs.pop('hc_spectrum', 'nonlinear'),
'exclusion_model': exclusion_model,
'bias_model': bias_model,
'sd_bias_model': sd_bias_model,
'hod_model' : HalomodHOD,
'hod_params' : {
'z' : z_center,
'MUV_thresh1' : MUV_thresh1,
'MUV_thresh2' : MUV_thresh2,
'eps0' : self.hod_model.eps0,
'Mc' : self.hod_model.Mc,
'a' : self.hod_model.a,
'b' : self.hod_model.b,
'sigma_UV' : self.hod_model.sigma_UV,
'Mcut' : self.hod_model.Mcut,
'Msat' : self.hod_model.Msat,
'asat' : self.hod_model.asat,
'add_dust' : self.hod_model.add_dust,
'h' : h,
'M_min': kwargs.pop('M_min', 9.0)}
}
halomod_params.update(kwargs)
if correlation_type.lower() == 'angular':
theta_min_rad = theta_min / 3600.0 * (np.pi / 180.0)
theta_max_rad = theta_max / 3600.0 * (np.pi / 180.0)
if p1 is None:
# Create a normalized callable directly
def p1(z):
return norm.pdf(z, loc=z_center, scale=0.5)
self._halomod_model = halomod.AngularCF(
zmin=zmin,
zmax=zmax,
znum=znum,
p1=p1,
theta_min=theta_min_rad,
theta_max=theta_max_rad,
theta_num=theta_num,
logu_min=kwargs.pop('logu_min', -3),
logu_max=kwargs.pop('logu_max', 2.0),
unum=kwargs.pop('unum', 100),
dr_table=kwargs.pop('dr_table', 0.03),
**halomod_params)
separation = self._halomod_model.theta * (180.0 / np.pi) * 3600.0
correlation = self._halomod_model.angular_corr_gal
elif correlation_type.lower() == 'real':
self._halomod_model = halomod.TracerHaloModel(
**halomod_params
)
separation = self._halomod_model.r / h
correlation = self._halomod_model.corr_auto_tracer
elif correlation_type.lower() == 'projected':
self._halomod_model = halomod.ProjectedCF(
rp_min=rp_min * h,
rp_max=rp_max * h,
rp_num=rp_num,
proj_limit=pi_max * h,
**halomod_params
)
separation = self._halomod_model.r / h
correlation = self._halomod_model.projected_corr_gal
else:
raise ValueError(f"Unknown correlation_type: {correlation_type}")
return {
'model': self._halomod_model,
'separation': separation,
'correlation': correlation,
'type': correlation_type,
}
[docs]
def update_correlation_model(self, z_center=None, p_of_z=None, MUV_thresh1=None, MUV_thresh2=None,
eps0=None, Mc=None, a=None, b=None,
sigma_UV=None, Mcut=None, Msat=None, asat=None):
"""Update HOD parameters and recompute correlation function efficiently.
This uses halomod's update() method to efficiently recompute the
correlation function with new parameters, without recreating the
entire model.
Must call initialize_correlation_model() first.
Parameters
----------
z_center : float, optional
Update the redshift center
p_of_z : callable, optional
Update the redshift distribution function
MUV_thresh1 : float, optional
Update UV magnitude threshold
MUV_thresh2 : float, optional
Update faint-end threshold
eps0 : float, optional
Update star formation efficiency
Mc : float, optional
Update characteristic mass
a : float, optional
Update low-mass slope
b : float, optional
Update high-mass slope
sigma_UV : float, optional
Update UV scatter
Mcut : float, optional
Update satellite cutoff mass
Msat : float, optional
Update satellite normalization mass
asat : float, optional
Update satellite slope
Returns
-------
result : dict
Dictionary with 'model', 'separation', 'correlation', 'type'
Examples
--------
>>> # Initialize once
>>> result = obs.initialize_correlation_model(MUV_thresh1=-19.1)
>>>
>>> # Update in MCMC loop
>>> for eps0, sigma_UV in zip(eps0_chain, sigma_UV_chain):
... result = obs.update_correlation_model(
... eps0=eps0, sigma_UV=sigma_UV
... )
... likelihood = compute_likelihood(result['correlation'])
"""
if self._halomod_model is None:
raise ValueError(
"Must call initialize_correlation_model() before update_correlation_model(). "
"Or use compute_correlation_function() for one-off calculations."
)
# Update HODModel parameters
update_dict = {}
if eps0 is not None:
update_dict['eps0'] = eps0
if Mc is not None:
update_dict['Mc'] = Mc
if a is not None:
update_dict['a'] = a
if b is not None:
update_dict['b'] = b
if sigma_UV is not None:
update_dict['sigma_UV'] = sigma_UV
if Mcut is not None:
update_dict['Mcut'] = Mcut
if Msat is not None:
update_dict['Msat'] = Msat
if asat is not None:
update_dict['asat'] = asat
if update_dict:
self.hod_model.update_parameters(**update_dict)
# Update magnitude thresholds if provided
if MUV_thresh1 is not None:
self._halomod_config['MUV_thresh1'] = MUV_thresh1
if MUV_thresh2 is not None:
self._halomod_config['MUV_thresh2'] = MUV_thresh2
# Update z_center if provided
if z_center is not None:
self._halomod_config['z_center'] = z_center
# Build HOD params update dict (always include current values)
hod_params_update = {
'z': self._halomod_config['z_center'],
'MUV_thresh1': self._halomod_config['MUV_thresh1'],
'MUV_thresh2': self._halomod_config['MUV_thresh2'],
'eps0': self.hod_model.eps0,
'Mc': self.hod_model.Mc,
'a': self.hod_model.a,
'b': self.hod_model.b,
'sigma_UV': self.hod_model.sigma_UV,
'Mcut': self.hod_model.Mcut,
'Msat': self.hod_model.Msat,
'asat': self.hod_model.asat,
'add_dust': self.hod_model.add_dust,
}
# Build halomod update dict (only include non-None values)
halomod_update = {'hod_params': hod_params_update}
if z_center is not None:
halomod_update['z'] = z_center
if p_of_z is not None:
halomod_update['p1'] = p_of_z
# Update halomod model efficiently
self._halomod_model.update(**halomod_update)
# Extract updated results
correlation_type = self._halomod_config['correlation_type']
h = self._halomod_config['h']
if correlation_type.lower() == 'angular':
separation = self._halomod_model.theta * (180.0 / np.pi) * 3600.0
correlation = self._halomod_model.angular_corr_gal
elif correlation_type.lower() == 'real':
separation = self._halomod_model.r / h
correlation = self._halomod_model.corr_auto_tracer
elif correlation_type.lower() == 'projected':
separation = self._halomod_model.rp / h
correlation = self._halomod_model.projected_corr_gal
return {
'model': self._halomod_model,
'separation': separation,
'correlation': correlation,
'type': correlation_type,
}
[docs]
def compute_correlation_function(self, MUV_thresh1, MUV_thresh2=0, **kwargs):
"""Compute correlation function (convenience method).
This creates a new halomod model each time. For MCMC fitting,
use initialize_correlation_model() + update_correlation_model() instead.
Parameters are the same as initialize_correlation_model().
Returns
-------
result : dict
Dictionary with 'model', 'separation', 'correlation', 'type'
"""
return self.initialize_correlation_model(
MUV_thresh1, MUV_thresh2, **kwargs
)
[docs]
class HalomodHOD(halomod.hod.HODPoisson if HALOMOD_AVAILABLE else object):
"""Halomod-compatible HOD class wrapping the HODModel.
This class bridges our HODModel with halomod's expectations.
IMPORTANT: Handles unit conversions between halomod (M_sun/h) and
HODModel (M_sun).
Parameters
----------
z : float
Redshift
MUV_thresh1 : float, optional
Bright-end UV magnitude threshold
MUV_thresh2 : float, optional
Faint-end UV magnitude threshold
eps0, Mc, a, b, sigma_UV, Mcut, Msat, asat, add_dust : optional
HOD parameters. Default from config.
h : float, optional
Hubble parameter H0/100
Examples
--------
>>> hod = HalomodHOD(z=5.0, MUV_thresh1=-19.1, h=0.7)
"""
_defaults = {
"z": 5.0,
"MUV_thresh1": -16.0,
"MUV_thresh2": 0.0,
"eps0": None,
"Mc": None,
"a": None,
"b": None,
"sigma_UV": None,
"Mcut": None,
"Msat": None,
"asat": None,
"add_dust": True,
"M_min": 8.0,
}
def __init__(self, z=None, MUV_thresh1=None, MUV_thresh2=None,
eps0=None, Mc=None, a=None, b=None,
sigma_UV=None, Mcut=None, Msat=None, asat=None,
add_dust=None, h=None, M_min=None, **kwargs):
"""Initialize halomod HOD wrapper."""
if not HALOMOD_AVAILABLE:
raise ImportError("halomod package required")
super().__init__(**kwargs)
from .config import DEFAULT_HOD_PARAMS, DEFAULT_COSMO_PARAMS
# Set parameters
self.z = z if z is not None else self._defaults['z']
self.MUV_thresh1 = MUV_thresh1 if MUV_thresh1 is not None else self._defaults['MUV_thresh1']
self.MUV_thresh2 = MUV_thresh2 if MUV_thresh2 is not None else self._defaults['MUV_thresh2']
self.eps0 = eps0 if eps0 is not None else DEFAULT_HOD_PARAMS['eps0']
self.Mc = Mc if Mc is not None else 10**DEFAULT_HOD_PARAMS['logMc']
self.a = a if a is not None else DEFAULT_HOD_PARAMS['a']
self.b = b if b is not None else DEFAULT_HOD_PARAMS['b']
self.sigma_UV = sigma_UV if sigma_UV is not None else DEFAULT_HOD_PARAMS['sigma_UV']
self.Mcut = Mcut if Mcut is not None else 10**DEFAULT_HOD_PARAMS['Mcut']
self.Msat = Msat if Msat is not None else 10**DEFAULT_HOD_PARAMS['Msat']
self.asat = asat if asat is not None else DEFAULT_HOD_PARAMS['asat']
self.add_dust = add_dust if add_dust is not None else DEFAULT_HOD_PARAMS['add_dust']
self.M_min = M_min if M_min is not None else self._defaults['M_min'] # Add this
self.h = h if h is not None else DEFAULT_COSMO_PARAMS['H0'] / 100.0
# Create internal HODModel
self.hod_model = HODModel(
z=self.z, eps0=self.eps0, Mc=self.Mc, a=self.a, b=self.b,
sigma_UV=self.sigma_UV, Mcut=self.Mcut, Msat=self.Msat,
asat=self.asat, add_dust=self.add_dust
)
def _central_occupation(self, M):
"""Central occupation function for halomod.
UNIT CONVERSION: M [M_sun] = M [M_sun/h] / h
"""
M_physical = M / self.h
if self.MUV_thresh2 > 0:
N_c_bright = self.hod_model.Ncen(M_physical, self.MUV_thresh1)
N_c_faint = self.hod_model.Ncen(M_physical, self.MUV_thresh2)
return N_c_bright - N_c_faint
else:
return self.hod_model.Ncen(M_physical, self.MUV_thresh1)
def _satellite_occupation(self, M):
"""Satellite occupation function for halomod.
UNIT CONVERSION: M [M_sun] = M [M_sun/h] / h
"""
M_physical = M / self.h
if self.MUV_thresh2 > 0:
N_s_bright = self.hod_model.Nsat(M_physical, self.MUV_thresh1)
N_s_faint = self.hod_model.Nsat(M_physical, self.MUV_thresh2)
return N_s_bright - N_s_faint
else:
return self.hod_model.Nsat(M_physical, self.MUV_thresh1)