Source code for pint.models.noise_model

"""Pulsar timing noise models."""

import copy
from typing import Callable, List, Optional, Tuple
import warnings

import astropy.units as u
import numpy as np
from loguru import logger as log

from pint import DMconst, dmu
from pint.models.parameter import Parameter, floatParameter, intParameter, maskParameter
from pint.models.timing_model import Component
from pint.toa import TOAs


[docs]class NoiseComponent(Component): introduces_dm_errors = False def __init__( self, ): super().__init__() self.covariance_matrix_funcs = [] self.scaled_toa_sigma_funcs = [] # Need to move this to a special place. self.scaled_dm_sigma_funcs = [] # TODO This works right now. But if we want to expend noise model, we # need to think about the design now. If we do not define the list # here and calling the same name from other component, it will get # it from the component that hosts it. It has the risk to duplicate # the list elements. self.dm_covariance_matrix_funcs_component = [] self.basis_funcs = [] @property def introduces_correlated_errors(self) -> bool: return isinstance(self, CorrelatedNoiseComponent)
[docs]class WhiteNoiseComponent(NoiseComponent): """Abstract base class for all white noise components.""" pass
[docs]class CorrelatedNoiseComponent(NoiseComponent): """Abstract base class for all correlated noise components.""" is_time_correlated = False def get_noise_basis(self, toas): raise NotImplementedError def get_noise_weights(self, toas): raise NotImplementedError
[docs] def get_dm_noise_basis(self, toas): """The DM part of the basis matrix for wideband datasets. This is non-zero only for DM noise. The output is a numpy array but it has units of dmu/s by convention since the noise amplitudes are defined to have dimensions of time.""" toa_noise_basis = self.get_noise_basis(toas) if self.introduces_dm_errors: freqs = self._parent.barycentric_radio_freq(toas) return (toa_noise_basis * (freqs**2 / DMconst)[:, None]).to_value(dmu / u.s) else: return np.zeros_like(toa_noise_basis)
[docs] def get_wideband_noise_basis(self, toas): """The wideband noise basis including both TOA and DM parts. The TOA part of the matrix is dimensionless but the DM part of the basis has units of dmu/s.""" M_toa = self.get_noise_basis(toas) M_dm = self.get_dm_noise_basis(toas) return np.vstack((M_toa, M_dm))
[docs]class ScaleToaError(WhiteNoiseComponent): """Correct the reported TOA uncertainties. The corrections account for imperfections in the TOA measurement and pulse jitter. Parameters supported: .. paramtable:: :class: pint.models.noise_model.ScaleToaError Note ---- Ref: NANOGrav 11 yrs data """ register = True category = "scale_toa_error" def __init__( self, ): super().__init__() self.add_param( maskParameter( name="EFAC", units="", aliases=["T2EFAC", "TNEF"], description="A multiplication factor on the measured TOA uncertainties,", convert_tcb2tdb=False, ) ) self.add_param( maskParameter( name="EQUAD", units="us", aliases=["T2EQUAD"], description="An error term added in quadrature to the scaled (by EFAC) TOA uncertainty.", convert_tcb2tdb=False, ) ) self.add_param( maskParameter( name="TNEQ", units=u.LogUnit(physical_unit=u.second), description="An error term added in quadrature to the scaled (by EFAC) TOA uncertainty in units of log10(second).", convert_tcb2tdb=False, ) ) self.covariance_matrix_funcs += [self.sigma_scaled_cov_matrix] self.scaled_toa_sigma_funcs += [self.scale_toa_sigma] self.toasigma_deriv_funcs = {}
[docs] def setup(self): super().setup() self.EFACs = {} self.EQUADs = {} self.TNEQs = {} for mask_par in self.get_params_of_type("maskParameter"): if mask_par.startswith("EFAC"): par = getattr(self, mask_par) self.EFACs[mask_par] = (par.key, par.key_value) elif mask_par.startswith("EQUAD"): par = getattr(self, mask_par) self.EQUADs[mask_par] = (par.key, par.key_value) elif mask_par.startswith("TNEQ"): par = getattr(self, mask_par) self.TNEQs[mask_par] = (par.key, par.key_value) else: continue # convert all the TNEQ to EQUAD for tneq, value in self.TNEQs.items(): tneq_par = getattr(self, tneq) if tneq_par.key is None: continue if value in list(self.EQUADs.values()): log.warning( f"'{tneq} {tneq_par.key} {tneq_par.key_value}' is provided by parameter EQUAD, using EQUAD instead. " ) else: EQUAD_name = f"EQUAD{str(tneq_par.index)}" if EQUAD_name not in list(self.EQUADs.keys()): self.add_param( maskParameter( name="EQUAD", units="us", index=tneq_par.index, aliases=["T2EQUAD"], description="An error term added in quadrature to the scaled (by EFAC) TOA uncertainty.", convert_tcb2tdb=False, ) ) EQUAD_par = getattr(self, EQUAD_name) EQUAD_par.quantity = tneq_par.quantity.to(u.us) EQUAD_par.key_value = tneq_par.key_value EQUAD_par.key = tneq_par.key for pp in self.params: if pp.startswith("EQUAD"): par = getattr(self, pp) self.EQUADs[pp] = (par.key, par.key_value) for ef in self.EFACs: self.register_toasigma_deriv_funcs(self.d_toasigma_d_EFAC, ef) for eq in self.EQUADs: self.register_toasigma_deriv_funcs(self.d_toasigma_d_EQUAD, eq)
def register_toasigma_deriv_funcs(self, func: Callable, param: str): pn = self.match_param_aliases(param) if pn not in list(self.toasigma_deriv_funcs.keys()): self.toasigma_deriv_funcs[pn] = [func] elif func in self.toasigma_deriv_funcs[pn]: return else: self.toasigma_deriv_funcs[pn] += [func]
[docs] def validate(self): super().validate() # check duplicate for el in ["EFACs", "EQUADs"]: l = list(getattr(self, el).values()) if [x for x in l if l.count(x) > 1] != []: raise ValueError(f"'{el}' have duplicated keys and key values.")
def scale_toa_sigma(self, toas: TOAs, warn: bool = True) -> u.Quantity: sigma_scaled = toas.table["error"].quantity.copy() for equad_name in self.EQUADs: equad = getattr(self, equad_name) if equad.quantity is None: continue mask = equad.select_toa_mask(toas) if len(mask) > 0: sigma_scaled[mask] = np.hypot(sigma_scaled[mask], equad.quantity) elif warn: warnings.warn(f"EQUAD {equad} has no TOAs") for efac_name in self.EFACs: efac = getattr(self, efac_name) mask = efac.select_toa_mask(toas) if len(mask) > 0: sigma_scaled[mask] *= efac.quantity elif warn: warnings.warn(f"EFAC {efac} has no TOAs") return sigma_scaled def sigma_scaled_cov_matrix(self, toas: TOAs) -> np.ndarray: scaled_sigma = self.scale_toa_sigma(toas).to(u.s).value ** 2 return np.diag(scaled_sigma) def d_toasigma_d_EFAC(self, toas: TOAs, param: str) -> u.Quantity: par = getattr(self, param) mask = par.select_toa_mask(toas) result = np.zeros(len(toas)) << u.s result[mask] = self.scale_toa_sigma(toas[mask], warn=False).to( u.s ) / par.quantity.to(u.dimensionless_unscaled) return result def d_toasigma_d_EQUAD(self, toas: TOAs, param: str) -> u.Quantity: par = getattr(self, param) mask = par.select_toa_mask(toas) toas_mask = toas[mask] result = np.zeros(len(toas)) << u.dimensionless_unscaled sigma_mask = self.scale_toa_sigma(toas_mask, warn=False) sigma2_mask_noefac = toas_mask.get_errors().to(u.s) ** 2 for equad_name in self.EQUADs: equad = getattr(self, equad_name) if equad.quantity is None: continue eqmask = equad.select_toa_mask(toas_mask) if np.any(eqmask): sigma2_mask_noefac[eqmask] += equad.quantity**2 result[mask] = (sigma_mask * par.quantity / sigma2_mask_noefac).to( u.dimensionless_unscaled ) return result
[docs]class ScaleDmError(WhiteNoiseComponent): """Correction for estimated wideband DM measurement uncertainty. Parameters supported: .. paramtable:: :class: pint.models.noise_model.ScaleDmError Note ---- Ref: NANOGrav 12.5 yrs wideband data """ register = True category = "scale_dm_error" introduces_dm_errors = True def __init__( self, ): super().__init__() self.add_param( maskParameter( name="DMEFAC", units="", description="A multiplication factor on the measured DM uncertainties,", convert_tcb2tdb=False, ) ) self.add_param( maskParameter( name="DMEQUAD", units="pc / cm ^ 3", description="An error term added in quadrature to the scaled (by EFAC) TOA uncertainty.", convert_tcb2tdb=False, ) ) self.dm_covariance_matrix_funcs_component = [self.dm_sigma_scaled_cov_matrix] self.scaled_dm_sigma_funcs += [self.scale_dm_sigma] self._paired_DMEFAC_DMEQUAD = None
[docs] def setup(self): super().setup() # Get all the EFAC parameters and EQUAD self.DMEFACs = {} self.DMEQUADs = {} for mask_par in self.get_params_of_type("maskParameter"): if mask_par.startswith("DMEFAC"): par = getattr(self, mask_par) if par.key is not None: self.DMEFACs[mask_par] = (par.key, tuple(par.key_value)) elif mask_par.startswith("DMEQUAD"): par = getattr(self, mask_par) if par.key is not None: self.DMEQUADs[mask_par] = (par.key, tuple(par.key_value)) else: continue
# if len(self.DMEFACs) != len(self.DMEQUADs): # self._match_DMEFAC_DMEQUAD() # else: # self._paired_DMEFAC_DMEQUAD = self.pair_DMEFAC_DMEQUAD()
[docs] def validate(self): super().validate() # check duplicate for el in ["DMEFACs", "DMEQUADs"]: l = list(getattr(self, el).values()) if [x for x in l if l.count(x) > 1] != []: raise ValueError(f"'{el}' have duplicated keys and key values.")
[docs] def scale_dm_sigma(self, toas: TOAs) -> u.Quantity: """ Scale the DM uncertainty. Parameters ---------- toas: `pint.toa.TOAs` object Input DM error object. We assume DM error is stored in the TOA objects. """ sigma_scaled = copy.deepcopy(toas.get_dm_errors()) # Apply DMEQUAD first for dmequad_name in self.DMEQUADs: dmequad = getattr(self, dmequad_name) if dmequad.quantity is None: continue mask = dmequad.select_toa_mask(toas) sigma_scaled[mask] = np.hypot(sigma_scaled[mask], dmequad.quantity) # Then apply the DMEFAC for dmefac_name in self.DMEFACs: dmefac = getattr(self, dmefac_name) sigma_scaled[dmefac.select_toa_mask(toas)] *= dmefac.quantity return sigma_scaled
def dm_sigma_scaled_cov_matrix(self, toas: TOAs) -> np.ndarray: scaled_sigma = self.scale_dm_sigma(toas).to_value(u.pc / u.cm**3) ** 2 return np.diag(scaled_sigma)
[docs]class EcorrNoise(CorrelatedNoiseComponent): """Noise correlated between nearby TOAs. This can occur, for example, if multiple TOAs were taken at different frequencies simultaneously: pulsar intrinsic emission jitters back and forth within the average profile, and this effect is the same for all frequencies. Thus these TOAs have correlated errors. Parameters supported: .. paramtable:: :class: pint.models.noise_model.EcorrNoise Note ---- Ref: NANOGrav 11 yrs data """ register = True category = "ecorr_noise" def __init__( self, ): super().__init__() self.add_param( maskParameter( name="ECORR", units="us", aliases=["TNECORR"], description="An error term that is correlated among all TOAs in an observing epoch.", convert_tcb2tdb=False, ) ) self.covariance_matrix_funcs += [self.ecorr_cov_matrix] self.basis_funcs += [self.ecorr_basis_weight_pair]
[docs] def setup(self): super().setup() # Get all the EFAC parameters and EQUAD self.ECORRs = {} for mask_par in self.get_params_of_type("maskParameter"): if mask_par.startswith("ECORR"): par = getattr(self, mask_par) self.ECORRs[mask_par] = (par.key, par.key_value) else: continue
[docs] def validate(self): super().validate() # check duplicate for el in ["ECORRs"]: l = list(getattr(self, el).values()) if [x for x in l if l.count(x) > 1] != []: raise ValueError(f"'{el}' have duplicated keys and key values.")
def get_ecorrs(self) -> List[Parameter]: return [getattr(self, ecorr) for ecorr in self.ECORRs.keys()]
[docs] def get_noise_basis(self, toas: TOAs) -> np.ndarray: """Return the quantization matrix for ECORR. A quantization matrix maps TOAs to observing epochs. """ tbl = toas.table t = (tbl["tdbld"].quantity * u.day).to(u.s).value ecorrs = self.get_ecorrs() umats = [] for ec in ecorrs: mask = ec.select_toa_mask(toas) if np.any(mask): umats.append(create_ecorr_quantization_matrix(t[mask])) else: warnings.warn(f"ECORR {ec} has no TOAs") umats.append(np.zeros((0, 0))) nc = sum(u.shape[1] for u in umats) umat = np.zeros((len(t), nc)) nctot = 0 for ct, ec in enumerate(ecorrs): mask = ec.select_toa_mask(toas) nn = umats[ct].shape[1] umat[mask, nctot : nn + nctot] = umats[ct] nctot += nn return umat
[docs] def get_noise_weights(self, toas: TOAs, nweights: int = None) -> np.ndarray: """Return the ECORR weights The weights used are the square of the ECORR values. """ ecorrs = self.get_ecorrs() if nweights is None: ts = (toas.table["tdbld"].quantity * u.day).to(u.s).value nweights = [ get_ecorr_nweights(ts[ec.select_toa_mask(toas)]) for ec in ecorrs ] nc = sum(nweights) weights = np.zeros(nc) nctot = 0 for ec, nn in zip(ecorrs, nweights): weights[nctot : nn + nctot] = ec.quantity.to(u.s).value ** 2 nctot += nn return weights
[docs] def ecorr_basis_weight_pair(self, toas: TOAs) -> Tuple[np.ndarray, np.ndarray]: """Return a quantization matrix and ECORR weights. A quantization matrix maps TOAs to observing epochs. The weights used are the square of the ECORR values. """ return (self.get_noise_basis(toas), self.get_noise_weights(toas))
[docs] def ecorr_cov_matrix(self, toas: TOAs) -> np.ndarray: """Full ECORR covariance matrix.""" U, Jvec = self.ecorr_basis_weight_pair(toas) return np.dot(U * Jvec[None, :], U.T)
[docs]class PLDMNoise(CorrelatedNoiseComponent): """Model of DM variations as radio frequency-dependent noise with a power-law spectrum. Variations in DM over time result from both the proper motion of the pulsar and the changing electron number density along the line of sight from the solar wind and ISM. In particular, Kolmogorov turbulence in the ionized ISM will induce stochastic DM variations with a power law spectrum. Timing errors due to unmodelled DM variations can therefore appear very similar to intrinsic red noise, however the amplitude of these variations will scale with the inverse of the square of the (Earth Doppler corrected) radio frequency. Parameters supported: .. paramtable:: :class: pint.models.noise_model.PLDMNoise References ---------- - Lentati et al. 2014, MNRAS 437(3), 3004-3023 [1]_ .. [1] https://ui.adsabs.harvard.edu/abs/2014MNRAS.437.3004L/abstract """ register = True category = "pl_DM_noise" introduces_dm_errors = True is_time_correlated = True def __init__( self, ): super().__init__() self.add_param( floatParameter( name="TNDMAMP", units="", aliases=[], description="Amplitude of powerlaw DM noise in tempo2 format", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.add_param( floatParameter( name="TNDMGAM", units="", aliases=[], description="Spectral index of powerlaw DM noise in tempo2 format", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.add_param( intParameter( name="TNDMC", units="", aliases=[], description="Number of DM noise frequencies.", ) ) self.add_param( intParameter( name="TNDMFLOG", units="", description="Number of logarithmically spaced DM noise frequencies in the basis.", ) ) self.add_param( floatParameter( name="TNDMFLOG_FACTOR", units="", description="Scaling factor for the log-spaced DM frequencies (2 -> [1/8, 1/4, 1/2, ...]).", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.add_param( floatParameter( name="TNDMTSPAN", units="year", description="Time span corresponding to the fundamental frequency of the DM noise Fourier series (data span is used by default).", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.covariance_matrix_funcs += [self.pl_dm_cov_matrix] self.basis_funcs += [self.pl_dm_basis_weight_pair]
[docs] def get_plc_vals(self) -> Tuple[float, float, int, int, float]: """ Retrieve power-law parameters and frequency-basis parameters from the model, substituting defaults if unspecified. """ n_lin = int(self.TNDMC.value) if self.TNDMC.value is not None else 30 n_log = int(self.TNDMFLOG.value) if (self.TNDMFLOG.value is not None) else None dm_log_factor = ( self.TNDMFLOG_FACTOR.value if (self.TNDMFLOG_FACTOR.value is not None) else 2 ) amp, gam = 10**self.TNDMAMP.value, self.TNDMGAM.value f_min_ratio = 1 / (dm_log_factor**n_log) if n_log is not None else 1 return amp, gam, n_lin, n_log, f_min_ratio
[docs] def get_time_frequencies(self, toas: TOAs) -> Tuple[np.ndarray, np.ndarray]: """Return the frequencies of the noise model""" tbl = toas.table t = (tbl["tdbld"].quantity * u.day).to(u.s).value T = ( np.max(t) - np.min(t) if self.TNDMTSPAN.quantity is None else self.TNDMTSPAN.quantity ) (_, _, n_lin, n_log, f_min_ratio) = self.get_plc_vals() f_min = f_min_ratio / T return t, get_rednoise_freqs( t, n_lin, Tspan=T, logmode=0, f_min=f_min, nlog=n_log )
[docs] def get_noise_basis(self, toas: TOAs) -> np.ndarray: """Return a Fourier design matrix for DM noise. See the documentation for pl_dm_basis_weight_pair function for details.""" t, f = self.get_time_frequencies(toas) Fmat = create_fourier_design_matrix(t, f) freqs = self._parent.barycentric_radio_freq(toas).to(u.MHz) fref = 1400 * u.MHz D = (fref.value / freqs.value) ** 2 return Fmat * D[:, None]
[docs] def get_noise_weights(self, toas: TOAs) -> np.ndarray: """Return power law DM noise weights. See the documentation for pl_dm_basis_weight_pair for details.""" (amp, gam, _, _, _) = self.get_plc_vals() _, f = self.get_time_frequencies(toas) df = np.diff(np.concatenate([[0], f])) return powerlaw(f.repeat(2), amp, gam) * df.repeat(2)
[docs] def pl_dm_basis_weight_pair(self, toas: TOAs) -> Tuple[np.ndarray, np.ndarray]: """Return a Fourier design matrix and power law DM noise weights. A Fourier design matrix contains the sine and cosine basis_functions in a Fourier series expansion. Here we scale the design matrix by (fref/f)**2, where fref = 1400 MHz to match the convention used in enterprise. The weights used are the power-law PSD values at frequencies n/T, where n is in [1, TNDMC] and T is the total observing duration of the dataset. """ return (self.get_noise_basis(toas), self.get_noise_weights(toas))
def pl_dm_cov_matrix(self, toas: TOAs) -> np.ndarray: Fmat, phi = self.pl_dm_basis_weight_pair(toas) return np.dot(Fmat * phi[None, :], Fmat.T)
[docs]class PLSWNoise(CorrelatedNoiseComponent): """Model of solar wind DM variations as radio frequency-dependent noise with a power-law spectrum. Commonly used as perturbations on top of a deterministic solar wind model. Parameters supported: .. paramtable:: :class: pint.models.noise_model.PLSWNoise References ---------- - Lentati et al. 2014, MNRAS 437(3), 3004-3023 [1]_ - van Haasteren & Vallisneri, 2014, MNRAS 446(2), 1170-1174 [2]_ - Hazboun et al. 2022, APJ, Volume 929, Issue 1, id.39, 11 pp. [3]_ - Susurla et al. 2024, A&A, Volume 692, id.A18, 18 pp.[4]_ .. [1] https://ui.adsabs.harvard.edu/abs/2014MNRAS.437.3004L/abstract .. [2] https://ui.adsabs.harvard.edu/abs/2015MNRAS.446.1170V/abstract .. [3] https://ui.adsabs.harvard.edu/abs/2022ApJ...929...39H/abstract .. [4] https://ui.adsabs.harvard.edu/abs/2024A%26A...692A..18S/abstract """ register = True category = "pl_SW_noise" introduces_dm_errors = True is_time_correlated = True def __init__( self, ): super().__init__() self.add_param( floatParameter( name="TNSWAMP", units="", aliases=[], description="Amplitude of power-law SW DM noise in tempo2 format", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.add_param( floatParameter( name="TNSWGAM", units="", aliases=[], description="Spectral index of power-law " "SW DM noise in tempo2 format", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.add_param( floatParameter( name="TNSWC", units="", aliases=[], description="Number of SW DM noise frequencies.", convert_tcb2tdb=False, ) ) self.add_param( floatParameter( name="TNSWFLOG", units="", description="Number of logarithmically solar wind frequencies in the basis.", convert_tcb2tdb=False, ) ) self.add_param( floatParameter( name="TNSWFLOG_FACTOR", units="", description="Scaling factor for the log-spaced solar wind frequencies (2 -> [1/8,1/4,1/2,...])", convert_tcb2tdb=False, ) ) self.covariance_matrix_funcs += [self.pl_sw_cov_matrix] self.basis_funcs += [self.pl_sw_basis_weight_pair]
[docs] def get_plc_vals(self) -> Tuple[float, float, int, int, float]: """ Retrieve power-law parameters and frequency-basis parameters from the model, substituting defaults if unspecified. """ n_lin = int(self.TNSWC.value) if self.TNSWC.value is not None else 100 n_log = int(self.TNSWFLOG.value) if (self.TNSWFLOG.value is not None) else None sw_log_factor = ( self.TNSWFLOG_FACTOR.value if (self.TNSWFLOG_FACTOR.value is not None) else 2 ) amp, gam = 10**self.TNSWAMP.value, self.TNSWGAM.value f_min_ratio = 1 / (sw_log_factor**n_log) if n_log is not None else 1 return amp, gam, n_lin, n_log, f_min_ratio
[docs] def get_time_frequencies(self, toas: TOAs) -> np.ndarray: """Return the frequencies of the noise model""" tbl = toas.table t = (tbl["tdbld"].quantity * u.day).to(u.s).value T = np.max(t) - np.min(t) (_, _, n_lin, n_log, f_min_ratio) = self.get_plc_vals() f_min = f_min_ratio / T return t, get_rednoise_freqs( t, n_lin, Tspan=T, logmode=0, f_min=f_min, nlog=n_log )
[docs] def get_noise_basis(self, toas: TOAs) -> np.ndarray: """Return a Fourier design matrix for SW DM noise. See the documentation for pl_sw_basis_weight_pair function for details.""" freqs = self._parent.barycentric_radio_freq(toas).to(u.MHz) # get the achromatic Fourier design matrix t, f = self.get_time_frequencies(toas) Fmat = create_fourier_design_matrix(t, f) # get solar wind geometry from pint.models.solar_wind_dispersion.SolarWindDispersion solar_wind_geometry = self._parent.solar_wind_geometry(toas) # since this is the SW DM value if n_earth = 1 cm^-3. the GP will scale it. dt_DM = (solar_wind_geometry * DMconst / (freqs**2)).value return Fmat * dt_DM[:, None]
[docs] def get_noise_weights(self, toas: TOAs) -> np.ndarray: """Return power law SW noise weights. See the documentation for pl_sw_basis_weight_pair for details.""" (amp, gam, _, _, _) = self.get_plc_vals() _, f = self.get_time_frequencies(toas) df = np.diff(np.concatenate([[0], f])) return powerlaw(f.repeat(2), amp, gam) * df.repeat(2)
[docs] def pl_sw_basis_weight_pair(self, toas: TOAs) -> Tuple[np.ndarray, np.ndarray]: """Return a Fourier design matrix and power law SW noise weights. A Fourier design matrix contains the sine and cosine basis_functions in a Fourier series expansion. Here we scale the design matrix by (fref/f)**2 and a geometric factor where fref = 1400 MHz to match the convention used in enterprise. The weights used are the power-law PSD values at frequencies n/T, where n is in [1, TNSWC] and T is the total observing duration of the dataset. """ return (self.get_noise_basis(toas), self.get_noise_weights(toas))
def pl_sw_cov_matrix(self, toas: TOAs) -> np.ndarray: Fmat, phi = self.pl_sw_basis_weight_pair(toas) return np.dot(Fmat * phi[None, :], Fmat.T)
[docs]class PLChromNoise(CorrelatedNoiseComponent): """Model of a radio frequency-dependent noise with a power-law spectrum and arbitrary chromatic index. Such variations are usually attributed to time-variable scattering in the ISM. Scattering smears/broadens the shape of the pulse profile by convolving it with a transfer function that is determined by the geometry and electron distribution in the scattering screen(s). The scattering timescale is typically a decreasing function of the observing frequency. Scatter broadening causes systematic offsets in the TOA measurements due to the pulse shape mismatch. While this offset need not be a simple function of frequency, it has been often modeled using a delay that is proportional to f^-alpha where alpha is known as the chromatic index. This model should be used in combination with the ChromaticCM model. Parameters supported: .. paramtable:: :class: pint.models.noise_model.PLChromNoise References ---------- - Lentati et al. 2014, MNRAS 437(3), 3004-3023 [1]_ - van Haasteren & Vallisneri, 2014, MNRAS 446(2), 1170-1174 [2]_ .. [1] https://ui.adsabs.harvard.edu/abs/2014MNRAS.437.3004L/abstract .. [2] https://ui.adsabs.harvard.edu/abs/2015MNRAS.446.1170V/abstract """ register = True category = "pl_chrom_noise" is_time_correlated = True def __init__( self, ): super().__init__() self.add_param( floatParameter( name="TNCHROMAMP", units="", aliases=[], description="Amplitude of powerlaw chromatic noise in tempo2 format", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.add_param( floatParameter( name="TNCHROMGAM", units="", aliases=[], description="Spectral index of powerlaw chromatic noise in tempo2 format", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.add_param( intParameter( name="TNCHROMC", units="", aliases=[], description="Number of chromatic noise frequencies.", ) ) self.add_param( intParameter( name="TNCHROMFLOG", units="", description="Number of logarithmically spaced chromatic noise frequencies in the basis.", ) ) self.add_param( floatParameter( name="TNCHROMFLOG_FACTOR", units="", description="Scaling factor for the log-spaced chromatic frequencies (2 -> [1/8,1/4,1/2,...])", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.add_param( floatParameter( name="TNCHROMTSPAN", units="year", description="Time span corresponding to the fundamental frequency of the chromatic noise Fourier series (data span is used by default).", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.covariance_matrix_funcs += [self.pl_chrom_cov_matrix] self.basis_funcs += [self.pl_chrom_basis_weight_pair]
[docs] def get_plc_vals(self) -> Tuple[float, float, int, int, float]: """ Retrieve power-law parameters and frequency-basis parameters from the model, substituting defaults if unspecified. """ n_lin = int(self.TNCHROMC.value) if self.TNCHROMC.value is not None else 30 n_log = ( int(self.TNCHROMFLOG.value) if (self.TNCHROMFLOG.value is not None) else None ) chrom_log_factor = ( self.TNCHROMFLOG_FACTOR.value if (self.TNCHROMFLOG_FACTOR.value is not None) else 2 ) amp, gam = 10**self.TNCHROMAMP.value, self.TNCHROMGAM.value f_min_ratio = 1 / (chrom_log_factor**n_log) if n_log is not None else 1 return amp, gam, n_lin, n_log, f_min_ratio
[docs] def get_time_frequencies(self, toas: TOAs) -> np.ndarray: """Return the frequencies of the noise model""" tbl = toas.table t = (tbl["tdbld"].quantity * u.day).to(u.s).value T = ( np.max(t) - np.min(t) if self.TNCHROMTSPAN.quantity is None else self.TNCHROMTSPAN.quantity ) (_, _, n_lin, n_log, f_min_ratio) = self.get_plc_vals() f_min = f_min_ratio / T return t, get_rednoise_freqs( t, n_lin, Tspan=T, logmode=0, f_min=f_min, nlog=n_log )
[docs] def get_noise_basis(self, toas: TOAs) -> np.ndarray: """Return a Fourier design matrix for chromatic noise. See the documentation for pl_chrom_basis_weight_pair function for details.""" t, f = self.get_time_frequencies(toas) Fmat = create_fourier_design_matrix(t, f) freqs = self._parent.barycentric_radio_freq(toas).to(u.MHz) fref = 1400 * u.MHz alpha = self._parent.TNCHROMIDX.value D = (fref.value / freqs.value) ** alpha return Fmat * D[:, None]
[docs] def get_noise_weights(self, toas: TOAs) -> np.ndarray: """Return power law chromatic noise weights. See the documentation for pl_chrom_basis_weight_pair for details.""" (amp, gam, _, _, _) = self.get_plc_vals() _, f = self.get_time_frequencies(toas) df = np.diff(np.concatenate([[0], f])) return powerlaw(f.repeat(2), amp, gam) * df.repeat(2)
[docs] def pl_chrom_basis_weight_pair(self, toas: TOAs) -> np.ndarray: """Return a Fourier design matrix and power law chromatic noise weights. A Fourier design matrix contains the sine and cosine basis_functions in a Fourier series expansion. Here we scale the design matrix by (fref/f)**2, where fref = 1400 MHz to match the convention used in enterprise. The weights used are the power-law PSD values at frequencies n/T, where n is in [1, TNCHROMC] and T is the total observing duration of the dataset. """ return (self.get_noise_basis(toas), self.get_noise_weights(toas))
def pl_chrom_cov_matrix(self, toas: TOAs) -> np.ndarray: Fmat, phi = self.pl_chrom_basis_weight_pair(toas) return np.dot(Fmat * phi[None, :], Fmat.T)
[docs]class PLRedNoise(CorrelatedNoiseComponent): """Timing noise with a power-law spectrum. Over the long term, pulsars are observed to experience timing noise dominated by low frequencies. This can occur, for example, if the torque on the pulsar varies randomly. If the torque experiences white noise, the phase we observe will experience "red" noise, that is noise dominated by the lowest frequency. This results in errors that are correlated between TOAs over fairly long time spans. Parameters supported: .. paramtable:: :class: pint.models.noise_model.PLRedNoise References ---------- - Lentati et al. 2014, MNRAS 437(3), 3004-3023 [1]_ - van Haasteren & Vallisneri, 2014, MNRAS 446(2), 1170-1174 [2]_ .. [1] https://ui.adsabs.harvard.edu/abs/2014MNRAS.437.3004L/abstract .. [2] https://ui.adsabs.harvard.edu/abs/2015MNRAS.446.1170V/abstract """ register = True category = "pl_red_noise" is_time_correlated = True def __init__( self, ): super().__init__() self.add_param( floatParameter( name="RNAMP", units="", aliases=[], description="Amplitude of powerlaw red noise.", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.add_param( floatParameter( name="RNIDX", units="", aliases=[], description="Spectral index of powerlaw red noise.", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.add_param( floatParameter( name="TNREDAMP", units="", aliases=[], description="Amplitude of powerlaw red noise in tempo2 format", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.add_param( floatParameter( name="TNREDGAM", units="", aliases=[], description="Spectral index of powerlaw red noise in tempo2 format", convert_tcb2tdb=False, ) ) self.add_param( intParameter( name="TNREDC", units="", aliases=[], description="Number of red noise frequencies.", ) ) self.add_param( intParameter( name="TNREDFLOG", units="", description="Number of logarithmically spaced red noise frequencies in the basis.", ) ) self.add_param( floatParameter( name="TNREDFLOG_FACTOR", units="", description="Scaling factor for the log-spaced frequencies (2 -> [1/8,1/4,1/2,...])", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.add_param( floatParameter( name="TNREDTSPAN", units="year", description="Time span corresponding to the fundamental frequency of the achromatic red noise Fourier series (data span is used by default).", convert_tcb2tdb=True, tcb2tdb_scale_factor=1, ) ) self.covariance_matrix_funcs += [self.pl_rn_cov_matrix] self.basis_funcs += [self.pl_rn_basis_weight_pair]
[docs] def get_plc_vals(self) -> Tuple[float, float, int, int, float]: """ Retrieve power-law parameters and frequency-basis parameters from the model, substituting defaults if unspecified. """ n_lin = int(self.TNREDC.value) if self.TNREDC.value is not None else 30 n_log = ( int(self.TNREDFLOG.value) if (self.TNREDFLOG.value is not None) else None ) red_log_factor = ( self.TNREDFLOG_FACTOR.value if (self.TNREDFLOG_FACTOR.value is not None) else 2 ) if self.TNREDAMP.value is not None and self.TNREDGAM.value is not None: amp, gam = 10**self.TNREDAMP.value, self.TNREDGAM.value elif self.RNAMP.value is not None and self.RNIDX is not None: fac = (86400.0 * 365.24 * 1e6) / (2.0 * np.pi * np.sqrt(3.0)) amp, gam = self.RNAMP.value / fac, -1 * self.RNIDX.value f_min_ratio = 1 / (red_log_factor**n_log) if n_log is not None else 1 return amp, gam, n_lin, n_log, f_min_ratio
[docs] def get_time_frequencies(self, toas: TOAs) -> np.ndarray: """Return the frequencies of the noise model""" tbl = toas.table t = (tbl["tdbld"].quantity * u.day).to(u.s).value T = ( np.max(t) - np.min(t) if self.TNREDTSPAN.quantity is None else self.TNREDTSPAN.quantity ) (_, _, n_lin, n_log, f_min_ratio) = self.get_plc_vals() f_min = f_min_ratio / T return t, get_rednoise_freqs( t, n_lin, Tspan=T, logmode=0, f_min=f_min, nlog=n_log )
[docs] def get_noise_basis(self, toas: TOAs) -> np.ndarray: """Return a Fourier design matrix for red noise. See the documentation for pl_rn_basis_weight_pair function for details.""" t, f = self.get_time_frequencies(toas) Fmat = create_fourier_design_matrix(t, f) return Fmat
[docs] def get_noise_weights(self, toas: TOAs) -> np.ndarray: """Return power law red noise weights. See the documentation for pl_rn_basis_weight_pair for details.""" (amp, gam, _, _, _) = self.get_plc_vals() _, f = self.get_time_frequencies(toas) df = np.diff(np.concatenate([[0], f])) return powerlaw(f.repeat(2), amp, gam) * df.repeat(2)
[docs] def pl_rn_basis_weight_pair(self, toas: TOAs) -> Tuple[np.ndarray, np.ndarray]: """Return a Fourier design matrix and power law red noise weights. A Fourier design matrix contains the sine and cosine basis_functions in a Fourier series expansion. The weights used are the power-law PSD values at frequencies n/T, where n is in [1, TNREDC] and T is the total observing duration of the dataset. """ return (self.get_noise_basis(toas), self.get_noise_weights(toas))
def pl_rn_cov_matrix(self, toas: TOAs) -> np.ndarray: Fmat, phi = self.pl_rn_basis_weight_pair(toas) return np.dot(Fmat * phi[None, :], Fmat.T)
[docs]def get_ecorr_epochs(toas_table: np.ndarray, dt: float = 1, nmin: int = 2) -> List[int]: """Find only epochs with more than 1 TOA for applying ECORR.""" if len(toas_table) == 0: return [] isort = np.argsort(toas_table) bucket_ref = [toas_table[isort[0]]] bucket_ind = [[isort[0]]] for i in isort[1:]: if toas_table[i] - bucket_ref[-1] < dt: bucket_ind[-1].append(i) else: bucket_ref.append(toas_table[i]) bucket_ind.append([i]) return [ind for ind in bucket_ind if len(ind) >= nmin]
[docs]def get_ecorr_nweights(toas_table: np.ndarray, dt: float = 1, nmin: int = 2) -> int: """Get the number of epochs associated with each ECORR. This is equal to the number of weights of that ECORR.""" return len(get_ecorr_epochs(toas_table, dt=dt, nmin=nmin))
[docs]def create_ecorr_quantization_matrix( toas_table: np.ndarray, dt: float = 1, nmin: int = 2 ) -> np.ndarray: """Create quantization matrix mapping TOAs to observing epochs. Only epochs with more than 1 TOA are included.""" bucket_ind2 = get_ecorr_epochs(toas_table, dt=dt, nmin=nmin) U = np.zeros((len(toas_table), len(bucket_ind2)), "d") for i, l in enumerate(bucket_ind2): U[l, i] = 1 return U
[docs]def get_rednoise_freqs( t, nmodes: int, Tspan: Optional[u.Quantity] = None, logmode: Optional[int] = None, f_min: Optional[float] = None, nlog: Optional[int] = None, ) -> np.ndarray: """ Compute an array of red-noise frequencies, optionally mixing log- and linearly spaced frequencies. If log-spaced parameters (`logmode`, `f_min`, `nlog`) are provided and valid, this function will prepend `nlog` log-spaced frequencies and then append `nmodes` linearly spaced frequencies. Otherwise, it uses purely linear spacing for `nmodes` frequencies. :param nmodes: int Number of linear frequency modes (if using purely linear spacing). If log-spacing is used, these will be the number of linear modes appended after the log-spaced part. :param Tspan: float, optional Span of the data in seconds. If None, but `t` is provided, it is taken as `max(t) - min(t)`. :param t: array-like, optional Vector of time series (TOAs) in seconds. Only required if `Tspan` is None, so we can calculate `Tspan` internally. :param logmode: int, optional The linear mode index at which to switch to log spacing. If < 0 or None, the function reverts to purely linear spacing. Must be >= 0 for log modes. :param f_min: float, optional Minimum frequency for log spacing, expressed as a fraction of 1/Tspan. Only used if logmode >= 0. :param nlog: int, optional Number of log-spaced frequencies. Only used if logmode >= 0. :return: freqs : ndarray Frequencies array of length either `nmodes` (linear-only) or `(nlog + nmodes)` (log + linear). """ if Tspan is None: if t is None: raise ValueError("Must provide either Tspan or t.") Tspan = np.max(t) - np.min(t) def _get_linear_freqs(n_lin, T): """ Return an array of n_lin linearly spaced frequencies: [1/T, 2/T, ..., n_lin/T]. """ return np.arange(1, n_lin + 1) / T def _get_loglin_freqs(logmode_, f_min_, n_log, n_lin, T): """ Return an array of n_log log-spaced frequencies from f_min_ up to (1+logmode_)/T, then append n_lin linearly spaced frequencies from (1+logmode_)/T onward. """ if logmode_ < 0: raise ValueError("Cannot do log-spacing when logmode < 0.") # Linear portion df_lin = 1.0 / T f_min_lin = (1.0 + logmode_) / T f_lin = np.linspace(f_min_lin, f_min_lin + (n_lin - 1) * df_lin, n_lin) # Log portion f_log = np.logspace( np.log10(f_min_), np.log10((1 + logmode_) / T), n_log, endpoint=False ) # Combine log + linear return np.concatenate((f_log, f_lin)) have_logmode = logmode is not None and logmode >= 0 have_nlog = nlog is not None and nlog > 0 have_fmin = f_min is not None and f_min > 0 use_log = all([have_logmode, have_nlog, have_fmin]) if not use_log and np.any([have_logmode, have_nlog, have_fmin]): log.warning( "Log-spaced parameters are ignored because " "logmode, nlog, and f_min ALL neeed to be set" "Use: logmode > 0 and nlog > 0 and f_min > 0." ) if not use_log: # Purely linear spacing: nmodes frequencies freqs = _get_linear_freqs(nmodes, Tspan) else: # Log + linear: nlog log-freqs + nmodes linear-freqs freqs = _get_loglin_freqs(logmode, f_min, nlog, nmodes, Tspan) return freqs
[docs]def create_fourier_design_matrix(t, f) -> np.ndarray: """ Construct a Fourier design matrix from a given set of frequencies. The matrix alternates sine and cosine columns. :param t: array-like Vector of time series (TOAs) in seconds. :param f: array-like Array of frequencies (e.g., from get_rednoise_freqs). :return: F : ndarray Fourier design matrix of shape (len(t), 2 * len(f)). freqs : ndarray The same frequencies array `f` is returned for convenience. """ t = np.asarray(t) f = np.asarray(f) N = len(t) nfreqs = len(f) # Initialize design matrix F = np.zeros((N, 2 * nfreqs)) # Fill sine (even columns) and cosine (odd columns) F[:, 0::2] = np.sin(2.0 * np.pi * t[:, None] * f) F[:, 1::2] = np.cos(2.0 * np.pi * t[:, None] * f) return F
[docs]def powerlaw( f, A: float = 1e-16, gamma: float = 5.0, f_low_cut: Optional[float] = None ): """Power-law PSD. :param f: Sampling frequencies :param A: Amplitude of red noise [GW units] :param gamma: Spectral index of red noise process :param f_low_cut: Minimum frequency to include [Hz] :return: Power spectral density """ f_low_cut = f_low_cut if f_low_cut is not None else np.min(f) above_fl = np.array(f >= f_low_cut, dtype=float) fyr = (1 / u.year).to_value(u.Hz) return A**2 / 12.0 / np.pi**2 * fyr ** (gamma - 3) * f ** (-gamma) * above_fl