Source code for pint.models.troposphere_delay

"""Delay due to Earth's troposphere"""

import astropy.constants as const
import astropy.units as u
import numpy as np
import scipy.interpolate
from astropy.coordinates import AltAz, SkyCoord
from loguru import logger as log

from pint.models.parameter import boolParameter
from pint.models.timing_model import DelayComponent
from pint.observatory import get_observatory
from pint.observatory.topo_obs import TopoObs


[docs]class TroposphereDelay(DelayComponent): """Model for accounting for the troposphere delay for topocentric TOAs. Based on Davis zenith hydrostatic delay (Davis et al., 1985, Appendix A) Niell Mapping Functions (Niell, 1996, Eq 4) additional altitude correction to atmospheric pressure from CRC Handbook Chapter 14 page 19 "US Standard Atmosphere" The Zenith delay is the actual time delay for radio waves arriving directly overhead the observatory. The mapping function is a dimensionless number that scales the zenith delay to recover the correct delay for sources anywhere else in the sky (closer to the horizon). The hydrostatic delay is best described as the relatively non-changing component to the delay, depending primarily on atmospheric pressure. The wet delay represents changes due to dynamical variation in the atmosphere (ie changing water vapor) and is usually around 10% of the hydrostatic delay Parameters supported: .. paramtable:: :class: pint.models.troposphere_delay.TroposphereDelay """ register = True category = "troposphere" # is this the correct category? # zero padding will provide constant within 15degrees of the poles or equator A_AVG = ( np.array([0.0, 1.2769934, 1.2683230, 1.2465397, 1.2196049, 1.2045996, 0.0]) * 1e-3 ) B_AVG = ( np.array([0.0, 2.9153695, 2.9152299, 2.9288445, 2.9022565, 2.9024912, 0.0]) * 1e-3 ) C_AVG = ( np.array([0.0, 62.610505, 62.837393, 63.721774, 63.824265, 64.258455, 0.0]) * 1e-3 ) A_AMP = np.array([0.0, 0.0, 1.2709626, 2.6523662, 3.4000452, 4.1202191, 0.0]) * 1e-5 B_AMP = np.array([0.0, 0.0, 2.1414979, 3.0160779, 7.2562722, 11.723375, 0.0]) * 1e-5 C_AMP = np.array([0.0, 0.0, 9.0128400, 4.3497037, 84.795348, 170.37206, 0.0]) * 1e-5 A_HT = 2.53e-5 B_HT = 5.49e-3 C_HT = 1.14e-3 AW = ( np.array([0.0, 5.8021897, 5.6794847, 5.8118019, 5.9727542, 6.1641693, 0.0]) * 1e-4 ) BW = ( np.array([0.0, 1.4275268, 1.5138625, 1.4572752, 1.5007428, 1.7599082, 0.0]) * 1e-3 ) CW = ( np.array([0.0, 4.3472961, 4.6729510, 4.3908931, 4.4626982, 5.4736038, 0.0]) * 1e-2 ) LAT = np.array([0, 15, 30, 45, 60, 75, 90]) * u.deg # in degrees DOY_OFFSET = -28 # add this into the MJD value to get the right phase EARTH_R = 6356766 * u.m # earth radius at 45 degree latitude @staticmethod def _herring_map(alt, a, b, c): """equation 4 from the Niell mapping function. It is a modification to the plane-parallel atmosphere model (1 / sin(alt)) The coefficients a b and c provide the correction for the correct map near the horizon, while still producing the correct mapping at zenith (1) """ sinAlt = np.sin(alt) return 1 / ( (1 / (1 + a / (1 + b / (1 + c)))) / (1 / (sinAlt + a / (sinAlt + b / (sinAlt + c)))) ) def __init__(self): super().__init__() self.add_param( boolParameter( name="CORRECT_TROPOSPHERE", value="Y", description="Enable Troposphere Delay Model", ) ) self.delay_funcs_component += [self.troposphere_delay] # copy over the arrays to provide constant values within 15 deg # of the poles and equator for array in [ self.A_AVG, self.B_AVG, self.C_AVG, self.A_AMP, self.B_AMP, self.C_AMP, self.AW, self.BW, self.CW, ]: array[0] = array[1] array[-1] = array[-2] def _get_target_altitude(self, obs, grp, radec): """convert the sky coordinates of the target to the angular altitude at each TOA""" transformAltaz = AltAz(location=obs, obstime=grp["mjd"]) alt = radec.transform_to(transformAltaz).alt # * u.deg return alt def _get_target_skycoord(self): """return the sky coordinates for the target, either from equatorial or ecliptic coordinates""" try: radec = SkyCoord( self._parent.RAJ.value * self._parent.RAJ.units, self._parent.DECJ.value * self._parent.DECJ.units, ) # just do this once instead of adjusting over time except AttributeError: radec = SkyCoord( self._parent.ELONG.value * self._parent.ELONG.units, self._parent.ELAT.value * self._parent.ELAT.units, frame="barycentricmeanecliptic", ) return radec
[docs] def troposphere_delay(self, toas, acc_delay=None): """This is the main function for the troposphere delay. Pass in the TOAs and it will calculate the delay for each TOA, accounting for the observatory location, target coordinates, and time of observation """ tbl = toas.table delay = np.zeros(len(tbl)) # if not correcting for troposphere, return the default zero delay if self.CORRECT_TROPOSPHERE.value: radec = self._get_target_skycoord() # the only python for loop is to iterate through the unique observatory locations # all other math is computed through numpy for key, grp in toas.get_obs_groups(): obsobj = get_observatory(key) # exclude non topocentric observations if not isinstance(obsobj, TopoObs): log.debug( f"Skipping Troposphere delay for non Topocentric TOA: {obsobj.name}" ) continue obs = obsobj.earth_location_itrf() alt = self._get_target_altitude(obs, tbl[grp], radec) # now actually calculate the atmospheric delay based on the models delay[grp] = self.delay_model( alt, obs.lat, obs.height, tbl[grp]["tdbld"] ) return delay * u.s
def _validate_altitudes(self, alt, obs=""): """This method checks if any of the TOAs occur at invalid altitudes for example, if the pulsar position is incorrect, it would likely result in negative altitudes. To correct for this is two steps: first make a numpy boolean array to store whether each TOA is valid or not, to let me know for later. The boolean array is returned at the end of the function. Then, to allow for fast numpy math, correct the individual invalid TOAs to make them appear at the zenith, then afterwards make that part of the delay be zero. This altitude correction is applied to the alt numpy array passed in as argument optionally pass obs to list which observatory the invalid altitudes are from This has been tested and it does work, even though it's slightly convoluted """ isPositive = np.greater_equal(alt, 0 * u.deg) isLessThan90 = np.less_equal(alt, 90 * u.deg) isValid = np.logical_and(isPositive, isLessThan90) # now make corrections to alt based on the valid status # if not valid, make them appear at the zenith to make the math sensible if not np.all(isValid): # it's probably helpful to count how many are invalid numInvalid = len(isValid) - np.count_nonzero(isValid) message = "Invalid altitude calculated for %i TOAS" % numInvalid if obs: message += f" from observatory {obs}" log.warning(message) # now correct the values # first make the invalid altitudes zeros alt *= isValid # multiply valids by 1, else make zero alt += ( 90 * u.deg * np.logical_not(isValid) ) # increase the invalid ones to 90 deg (zenith) # this will prevent unexpected behavior from occurring for negative altitudes return isValid
[docs] def delay_model(self, alt, lat, H, mjd): """validate the observed altitudes, then combine dry and wet delays""" # make sure the altitudes are reasonable values, warn if not altIsValid = self._validate_altitudes(alt) delay = self.zenith_delay(lat, H.to(u.km)) * self.mapping_function( alt, lat, H, mjd ) + self.wet_zenith_delay() * self.wet_map(alt, lat) # modify the delay if any of the altitudes are invalid if not np.all(altIsValid): delay *= altIsValid # this will make the invalid delays zero return delay
[docs] def pressure_from_altitude(self, H): """From CRC Handbook Chapter 14 page 19 US Standard Atmosphere""" gph = self.EARTH_R * H / (self.EARTH_R + H) # geopotential height if gph > 11 * u.km: log.warning("Pressure approximation invalid for elevations above 11 km") T = 288.15 - 0.0065 * H.to(u.m).value # temperature lapse return 101.325 * (288.15 / T) ** -5.25575 * u.kPa
[docs] def zenith_delay(self, lat, H): """Calculate the hydrostatic zenith delay""" p = self.pressure_from_altitude(H) return (p / (43.921 * u.kPa)) / ( const.c.value * (1 - 0.00266 * np.cos(2 * lat) - 0.00028 * H.value) )
[docs] def wet_zenith_delay(self): """calculate the wet delay at zenith""" return 0.0 # this method will be updated in the future to
# either allow explicit specification of the wet zenith delay # or approximate it from weather data # default for TEMPO2 is zero wet delay if not specified def _coefficient_func(self, average, amplitudes, yearFraction): """from the Niell mapping function with annual variations""" return average + amplitudes * np.cos(2 * np.pi * yearFraction) def _find_latitude_index(self, lat): """find the index corresponding to the upper bound on latitude for nearest neighbor interpolation in the mapping function """ absLat = np.abs(lat) for lInd in range(1, len(self.LAT)): if absLat <= self.LAT[lInd]: return lInd - 1 # else this is an invalid latitude... huh? raise ValueError(f"Invaid latitude: {lat} must be between -90 and 90 degrees")
[docs] def mapping_function(self, alt, lat, H, mjd): """this implements the Niell mapping function for hydrostatic delays""" yearFraction = self._get_year_fraction_fast(mjd, lat) """ according to Niell, the way to use latitude interpolation is to interpolate between the nearest definite latitude coefficient functions. So that means I need to calculate the function, then I can go back and interpolate between the results. I figure the easiest way to do this will be to calculate the function on the entire array """ # first I need to find the nearest latitude neighbors latIndex = self._find_latitude_index(lat) aNeighbors = np.array( [ self._coefficient_func(self.A_AVG[i], self.A_AMP[i], yearFraction) for i in range(latIndex, latIndex + 2) ] ).transpose() bNeighbors = np.array( [ self._coefficient_func(self.B_AVG[i], self.B_AMP[i], yearFraction) for i in range(latIndex, latIndex + 2) ] ).transpose() cNeighbors = np.array( [ self._coefficient_func(self.C_AVG[i], self.C_AMP[i], yearFraction) for i in range(latIndex, latIndex + 2) ] ).transpose() # now time to interpolate between them latNeighbors = self.LAT[latIndex : latIndex + 2] a = self._interp(np.abs(lat), latNeighbors, aNeighbors) b = self._interp(np.abs(lat), latNeighbors, bNeighbors) c = self._interp(np.abs(lat), latNeighbors, cNeighbors) # the base mapping function baseMap = self._herring_map(alt, a, b, c) # now add in the mapping correction based on height fcorrection = self._herring_map(alt, self.A_HT, self.B_HT, self.C_HT) return baseMap + (1 / np.sin(alt) - fcorrection) * H.to(u.km).value
[docs] def wet_map(self, alt, lat): """This is very similar to the normal mapping function except it uses different coefficients. In addition, there is no height correction. From Niell (1996): "This does not apply to the wet mapping function since the water vapor is not in hydrostatic equilibrium, and the height distribution of the water vapor is not expected to be predictable from the station height" """ latIndex = self._find_latitude_index(lat) # latitude dependent aNeighbors = self.AW[latIndex : latIndex + 2] bNeighbors = self.BW[latIndex : latIndex + 2] cNeighbors = self.CW[latIndex : latIndex + 2] latNeighbors = self.LAT[latIndex : latIndex + 2] a = self._interp(np.abs(lat), latNeighbors, aNeighbors) b = self._interp(np.abs(lat), latNeighbors, bNeighbors) c = self._interp(np.abs(lat), latNeighbors, cNeighbors) return self._herring_map(alt, a, b, c)
@staticmethod def _interp(x, xn, yn): """vectorized 1d interpolation for 2 points only""" # return (x - xn[0]) * (yn[1] - yn[0]) / (xn[1] - xn[0]) + yn[0] f = scipy.interpolate.interp1d(xn, yn) return f(x) def _get_year_fraction_slow(self, mjd, lat): """ use python for loop and astropy to calculate the year fraction but it's more slow because of the looping """ seasonOffset = 0.5 if lat < 0 else 0.0 return np.array( [(i.jyear + seasonOffset + self.DOY_OFFSET / 365.25) % 1.0 for i in mjd] ) def _get_year_fraction_fast(self, tdbld, lat): """ use numpy array arithmetic to calculate the year fraction more quickly """ seasonOffset = 0.5 if lat < 0 else 0.0 return np.mod( 2000.0 + (tdbld - 51544.5 + self.DOY_OFFSET) / (365.25) + seasonOffset, 1.0 )