# This file includes the definition of a mix-in class that provides the low-
# and high-level WCS API to the astropy.wcs.WCS object. We keep this code
# isolated in this mix-in class to avoid making the main wcs.py file too
# long.
import warnings
import numpy as np
from astropy import units as u
from astropy.coordinates import SpectralCoord, Galactic, ICRS
from astropy.coordinates.spectral_coordinate import update_differentials_to_match, attach_zero_velocities
from astropy.utils.exceptions import AstropyUserWarning
from astropy.constants import c
from .low_level_api import BaseLowLevelWCS
from .high_level_api import HighLevelWCSMixin
from .wrappers import SlicedLowLevelWCS
__all__ = ['custom_ctype_to_ucd_mapping', 'SlicedFITSWCS', 'FITSWCSAPIMixin']
C_SI = c.si.value
VELOCITY_FRAMES = {
'GEOCENT': 'gcrs',
'BARYCENT': 'icrs',
'HELIOCENT': 'hcrs',
'LSRK': 'lsrk',
'LSRD': 'lsrd'
}
# The spectra velocity frames below are needed for FITS spectral WCS
# (see Greisen 06 table 12) but aren't yet defined as real
# astropy.coordinates frames, so we instead define them here as instances
# of existing coordinate frames with offset velocities. In future we should
# make these real frames so that users can more easily recognize these
# velocity frames when used in SpectralCoord.
# This frame is defined as a velocity of 220 km/s in the
# direction of l=90, b=0. The rotation velocity is defined
# in:
#
# Kerr and Lynden-Bell 1986, Review of galactic constants.
#
# NOTE: this may differ from the assumptions of galcen_v_sun
# in the Galactocentric frame - the value used here is
# the one adopted by the WCS standard for spectral
# transformations.
VELOCITY_FRAMES['GALACTOC'] = Galactic(u=0 * u.km, v=0 * u.km, w=0 * u.km,
U=0 * u.km / u.s, V=-220 * u.km / u.s, W=0 * u.km / u.s,
representation_type='cartesian',
differential_type='cartesian')
# This frame is defined as a velocity of 300 km/s in the
# direction of l=90, b=0. This is defined in:
#
# Transactions of the IAU Vol. XVI B Proceedings of the
# 16th General Assembly, Reports of Meetings of Commissions:
# Comptes Rendus Des Séances Des Commissions, Commission 28,
# p201.
#
# Note that these values differ from those used by CASA
# (308 km/s towards l=105, b=-7) but we use the above values
# since these are the ones defined in Greisen et al (2006).
VELOCITY_FRAMES['LOCALGRP'] = Galactic(u=0 * u.km, v=0 * u.km, w=0 * u.km,
U=0 * u.km / u.s, V=-300 * u.km / u.s, W=0 * u.km / u.s,
representation_type='cartesian',
differential_type='cartesian')
# This frame is defined as a velocity of 368 km/s in the
# direction of l=263.85, b=48.25. This is defined in:
#
# Bennett et al. (2003), First-Year Wilkinson Microwave
# Anisotropy Probe (WMAP) Observations: Preliminary Maps
# and Basic Results
#
# Note that in that paper, the dipole is expressed as a
# temperature (T=3.346 +/- 0.017mK)
VELOCITY_FRAMES['CMBDIPOL'] = Galactic(l=263.85 * u.deg, b=48.25 * u.deg, distance=0 * u.km,
radial_velocity=-(3.346e-3 / 2.725 * c).to(u.km/u.s))
# Mapping from CTYPE axis name to UCD1
CTYPE_TO_UCD1 = {
# Celestial coordinates
'RA': 'pos.eq.ra',
'DEC': 'pos.eq.dec',
'GLON': 'pos.galactic.lon',
'GLAT': 'pos.galactic.lat',
'ELON': 'pos.ecliptic.lon',
'ELAT': 'pos.ecliptic.lat',
'TLON': 'pos.bodyrc.lon',
'TLAT': 'pos.bodyrc.lat',
'HPLT': 'custom:pos.helioprojective.lat',
'HPLN': 'custom:pos.helioprojective.lon',
'HPRZ': 'custom:pos.helioprojective.z',
'HGLN': 'custom:pos.heliographic.stonyhurst.lon',
'HGLT': 'custom:pos.heliographic.stonyhurst.lat',
'CRLN': 'custom:pos.heliographic.carrington.lon',
'CRLT': 'custom:pos.heliographic.carrington.lat',
'SOLX': 'custom:pos.heliocentric.x',
'SOLY': 'custom:pos.heliocentric.y',
'SOLZ': 'custom:pos.heliocentric.z',
# Spectral coordinates (WCS paper 3)
'FREQ': 'em.freq', # Frequency
'ENER': 'em.energy', # Energy
'WAVN': 'em.wavenumber', # Wavenumber
'WAVE': 'em.wl', # Vacuum wavelength
'VRAD': 'spect.dopplerVeloc.radio', # Radio velocity
'VOPT': 'spect.dopplerVeloc.opt', # Optical velocity
'ZOPT': 'src.redshift', # Redshift
'AWAV': 'em.wl', # Air wavelength
'VELO': 'spect.dopplerVeloc', # Apparent radial velocity
'BETA': 'custom:spect.doplerVeloc.beta', # Beta factor (v/c)
'STOKES': 'phys.polarization.stokes', # STOKES parameters
# Time coordinates (https://www.aanda.org/articles/aa/pdf/2015/02/aa24653-14.pdf)
'TIME': 'time',
'TAI': 'time',
'TT': 'time',
'TDT': 'time',
'ET': 'time',
'IAT': 'time',
'UT1': 'time',
'UTC': 'time',
'GMT': 'time',
'GPS': 'time',
'TCG': 'time',
'TCB': 'time',
'TDB': 'time',
'LOCAL': 'time',
# Distance coordinates
'DIST': 'pos.distance',
'DSUN': 'custom:pos.distance.sunToObserver'
# UT() and TT() are handled separately in world_axis_physical_types
}
# Keep a list of additional custom mappings that have been registered. This
# is kept as a list in case nested context managers are used
CTYPE_TO_UCD1_CUSTOM = []
class custom_ctype_to_ucd_mapping:
"""
A context manager that makes it possible to temporarily add new CTYPE to
UCD1+ mapping used by :attr:`FITSWCSAPIMixin.world_axis_physical_types`.
Parameters
----------
mapping : dict
A dictionary mapping a CTYPE value to a UCD1+ value
Examples
--------
Consider a WCS with the following CTYPE::
>>> from astropy.wcs import WCS
>>> wcs = WCS(naxis=1)
>>> wcs.wcs.ctype = ['SPAM']
By default, :attr:`FITSWCSAPIMixin.world_axis_physical_types` returns `None`,
but this can be overridden::
>>> wcs.world_axis_physical_types
[None]
>>> with custom_ctype_to_ucd_mapping({'SPAM': 'food.spam'}):
... wcs.world_axis_physical_types
['food.spam']
"""
def __init__(self, mapping):
CTYPE_TO_UCD1_CUSTOM.insert(0, mapping)
self.mapping = mapping
def __enter__(self):
pass
def __exit__(self, type, value, tb):
CTYPE_TO_UCD1_CUSTOM.remove(self.mapping)
class SlicedFITSWCS(SlicedLowLevelWCS, HighLevelWCSMixin):
pass
class FITSWCSAPIMixin(BaseLowLevelWCS, HighLevelWCSMixin):
"""
A mix-in class that is intended to be inherited by the
:class:`~astropy.wcs.WCS` class and provides the low- and high-level WCS API
"""
@property
def pixel_n_dim(self):
return self.naxis
@property
def world_n_dim(self):
return len(self.wcs.ctype)
@property
def array_shape(self):
if self.pixel_shape is None:
return None
else:
return self.pixel_shape[::-1]
@array_shape.setter
def array_shape(self, value):
if value is None:
self.pixel_shape = None
else:
self.pixel_shape = value[::-1]
@property
def pixel_shape(self):
if self._naxis == [0, 0]:
return None
else:
return tuple(self._naxis)
@pixel_shape.setter
def pixel_shape(self, value):
if value is None:
self._naxis = [0, 0]
else:
if len(value) != self.naxis:
raise ValueError("The number of data axes, "
"{}, does not equal the "
"shape {}.".format(self.naxis, len(value)))
self._naxis = list(value)
@property
def pixel_bounds(self):
return self._pixel_bounds
@pixel_bounds.setter
def pixel_bounds(self, value):
if value is None:
self._pixel_bounds = value
else:
if len(value) != self.naxis:
raise ValueError("The number of data axes, "
"{}, does not equal the number of "
"pixel bounds {}.".format(self.naxis, len(value)))
self._pixel_bounds = list(value)
@property
def world_axis_physical_types(self):
types = []
# TODO: need to support e.g. TT(TAI)
for ctype in self.wcs.ctype:
if ctype.upper().startswith(('UT(', 'TT(')):
types.append('time')
else:
ctype_name = ctype.split('-')[0]
for custom_mapping in CTYPE_TO_UCD1_CUSTOM:
if ctype_name in custom_mapping:
types.append(custom_mapping[ctype_name])
break
else:
types.append(CTYPE_TO_UCD1.get(ctype_name.upper(), None))
return types
@property
def world_axis_units(self):
units = []
for unit in self.wcs.cunit:
if unit is None:
unit = ''
elif isinstance(unit, u.Unit):
unit = unit.to_string(format='vounit')
else:
try:
unit = u.Unit(unit).to_string(format='vounit')
except u.UnitsError:
unit = ''
units.append(unit)
return units
@property
def world_axis_names(self):
return list(self.wcs.cname)
@property
def axis_correlation_matrix(self):
# If there are any distortions present, we assume that there may be
# correlations between all axes. Maybe if some distortions only apply
# to the image plane we can improve this?
if self.has_distortion:
return np.ones((self.world_n_dim, self.pixel_n_dim), dtype=bool)
# Assuming linear world coordinates along each axis, the correlation
# matrix would be given by whether or not the PC matrix is zero
matrix = self.wcs.get_pc() != 0
# We now need to check specifically for celestial coordinates since
# these can assume correlations because of spherical distortions. For
# each celestial coordinate we copy over the pixel dependencies from
# the other celestial coordinates.
celestial = (self.wcs.axis_types // 1000) % 10 == 2
celestial_indices = np.nonzero(celestial)[0]
for world1 in celestial_indices:
for world2 in celestial_indices:
if world1 != world2:
matrix[world1] |= matrix[world2]
matrix[world2] |= matrix[world1]
return matrix
def pixel_to_world_values(self, *pixel_arrays):
world = self.all_pix2world(*pixel_arrays, 0)
return world[0] if self.world_n_dim == 1 else tuple(world)
def world_to_pixel_values(self, *world_arrays):
pixel = self.all_world2pix(*world_arrays, 0)
return pixel[0] if self.pixel_n_dim == 1 else tuple(pixel)
@property
def world_axis_object_components(self):
return self._get_components_and_classes()[0]
@property
def world_axis_object_classes(self):
return self._get_components_and_classes()[1]
@property
def serialized_classes(self):
return False
def _get_components_and_classes(self):
# The aim of this function is to return whatever is needed for
# world_axis_object_components and world_axis_object_classes. It's easier
# to figure it out in one go and then return the values and let the
# properties return part of it.
# Since this method might get called quite a few times, we need to cache
# it. We start off by defining a hash based on the attributes of the
# WCS that matter here (we can't just use the WCS object as a hash since
# it is mutable)
wcs_hash = (self.naxis,
list(self.wcs.ctype),
list(self.wcs.cunit),
self.wcs.radesys,
self.wcs.specsys,
self.wcs.equinox,
self.wcs.dateobs,
self.wcs.lng,
self.wcs.lat)
# If the cache is present, we need to check that the 'hash' matches.
if getattr(self, '_components_and_classes_cache', None) is not None:
cache = self._components_and_classes_cache
if cache[0] == wcs_hash:
return cache[1]
else:
self._components_and_classes_cache = None
# Avoid circular imports by importing here
from astropy.wcs.utils import wcs_to_celestial_frame
from astropy.coordinates import SkyCoord, EarthLocation
from astropy.time.formats import FITS_DEPRECATED_SCALES
from astropy.time import Time, TimeDelta
components = [None] * self.naxis
classes = {}
# Let's start off by checking whether the WCS has a pair of celestial
# components
if self.has_celestial:
try:
celestial_frame = wcs_to_celestial_frame(self)
except ValueError:
# Some WCSes, e.g. solar, can be recognized by WCSLIB as being
# celestial but we don't necessarily have frames for them.
celestial_frame = None
else:
kwargs = {}
kwargs['frame'] = celestial_frame
kwargs['unit'] = u.deg
classes['celestial'] = (SkyCoord, (), kwargs)
components[self.wcs.lng] = ('celestial', 0, 'spherical.lon.degree')
components[self.wcs.lat] = ('celestial', 1, 'spherical.lat.degree')
# Next, we check for spectral components
if self.has_spectral:
# Find index of spectral coordinate
ispec = self.wcs.spec
ctype = self.wcs.ctype[ispec][:4]
ctype = ctype.upper()
kwargs = {}
# Determine observer location and velocity
# TODO: determine how WCS standard would deal with observer on a
# spacecraft far from earth. For now assume the obsgeo parameters,
# if present, give the geocentric observer location.
if np.isnan(self.wcs.obsgeo[0]):
observer = None
else:
earth_location = EarthLocation(*self.wcs.obsgeo[:3], unit=u.m)
obstime = Time(self.wcs.mjdobs, format='mjd', scale='utc',
location=earth_location)
observer_location = SkyCoord(earth_location.get_itrs(obstime=obstime))
if self.wcs.specsys in VELOCITY_FRAMES:
frame = VELOCITY_FRAMES[self.wcs.specsys]
observer = observer_location.transform_to(frame)
if isinstance(frame, str):
observer = attach_zero_velocities(observer)
else:
observer = update_differentials_to_match(observer_location,
VELOCITY_FRAMES[self.wcs.specsys],
preserve_observer_frame=True)
elif self.wcs.specsys == 'TOPOCENT':
observer = attach_zero_velocities(observer_location)
else:
raise NotImplementedError(f'SPECSYS={self.wcs.specsys} not yet supported')
# Determine target
# This is tricker. In principle the target for each pixel is the
# celestial coordinates of the pixel, but we then need to be very
# careful about SSYSOBS which is tricky. For now, we set the
# target using the reference celestial coordinate in the WCS (if
# any).
if self.has_celestial and celestial_frame is not None:
# NOTE: celestial_frame was defined higher up
# NOTE: we set the distance explicitly to avoid warnings in SpectralCoord
target = SkyCoord(self.wcs.crval[self.wcs.lng] * self.wcs.cunit[self.wcs.lng],
self.wcs.crval[self.wcs.lat] * self.wcs.cunit[self.wcs.lat],
frame=celestial_frame,
distance=1000 * u.kpc)
target = attach_zero_velocities(target)
else:
target = None
# SpectralCoord does not work properly if either observer or target
# are not convertible to ICRS, so if this is the case, we (for now)
# drop the observer and target from the SpectralCoord and warn the
# user.
if observer is not None:
try:
observer.transform_to(ICRS())
except Exception:
warnings.warn('observer cannot be converted to ICRS, so will '
'not be set on SpectralCoord', AstropyUserWarning)
observer = None
if target is not None:
try:
target.transform_to(ICRS())
except Exception:
warnings.warn('target cannot be converted to ICRS, so will '
'not be set on SpectralCoord', AstropyUserWarning)
target = None
# NOTE: below we include Quantity in classes['spectral'] instead
# of SpectralCoord - this is because we want to also be able to
# accept plain quantities.
if ctype == 'ZOPT':
def spectralcoord_from_redshift(redshift):
if isinstance(redshift, SpectralCoord):
return redshift
return SpectralCoord((redshift + 1) * self.wcs.restwav,
unit=u.m, observer=observer, target=target)
def redshift_from_spectralcoord(spectralcoord):
# TODO: check target is consistent
if observer is None:
warnings.warn('No observer defined on WCS, SpectralCoord '
'will be converted without any velocity '
'frame change', AstropyUserWarning)
return spectralcoord.to_value(u.m) / self.wcs.restwav - 1.
else:
return spectralcoord.with_observer_stationary_relative_to(observer).to_value(u.m) / self.wcs.restwav - 1.
classes['spectral'] = (u.Quantity, (), {}, spectralcoord_from_redshift)
components[self.wcs.spec] = ('spectral', 0, redshift_from_spectralcoord)
elif ctype == 'BETA':
def spectralcoord_from_beta(beta):
if isinstance(beta, SpectralCoord):
return beta
return SpectralCoord(beta * C_SI,
unit=u.m / u.s,
doppler_convention='relativistic',
doppler_rest=self.wcs.restwav * u.m,
observer=observer, target=target)
def beta_from_spectralcoord(spectralcoord):
# TODO: check target is consistent
doppler_equiv = u.doppler_relativistic(self.wcs.restwav * u.m)
if observer is None:
warnings.warn('No observer defined on WCS, SpectralCoord '
'will be converted without any velocity '
'frame change', AstropyUserWarning)
return spectralcoord.to_value(u.m / u.s, doppler_equiv) / C_SI
else:
return spectralcoord.with_observer_stationary_relative_to(observer).to_value(u.m / u.s, doppler_equiv) / C_SI
classes['spectral'] = (u.Quantity, (), {}, spectralcoord_from_beta)
components[self.wcs.spec] = ('spectral', 0, beta_from_spectralcoord)
else:
kwargs['unit'] = self.wcs.cunit[ispec]
if self.wcs.restfrq > 0:
if ctype == 'VELO':
kwargs['doppler_convention'] = 'relativistic'
kwargs['doppler_rest'] = self.wcs.restfrq * u.Hz
elif ctype == 'VRAD':
kwargs['doppler_convention'] = 'radio'
kwargs['doppler_rest'] = self.wcs.restfrq * u.Hz
elif ctype == 'VOPT':
kwargs['doppler_convention'] = 'optical'
kwargs['doppler_rest'] = self.wcs.restwav * u.m
def spectralcoord_from_value(value):
return SpectralCoord(value, observer=observer, target=target, **kwargs)
def value_from_spectralcoord(spectralcoord):
# TODO: check target is consistent
if observer is None:
warnings.warn('No observer defined on WCS, SpectralCoord '
'will be converted without any velocity '
'frame change', AstropyUserWarning)
return spectralcoord.to_value(**kwargs)
else:
return spectralcoord.with_observer_stationary_relative_to(observer).to_value(**kwargs)
classes['spectral'] = (u.Quantity, (), {}, spectralcoord_from_value)
components[self.wcs.spec] = ('spectral', 0, value_from_spectralcoord)
# We can then make sure we correctly return Time objects where appropriate
# (https://www.aanda.org/articles/aa/pdf/2015/02/aa24653-14.pdf)
if 'time' in self.world_axis_physical_types:
multiple_time = self.world_axis_physical_types.count('time') > 1
for i in range(self.naxis):
if self.world_axis_physical_types[i] == 'time':
if multiple_time:
name = f'time.{i}'
else:
name = 'time'
# Initialize delta
reference_time_delta = None
# Extract time scale
scale = self.wcs.ctype[i].lower()
if scale == 'time':
if self.wcs.timesys:
scale = self.wcs.timesys.lower()
else:
scale = 'utc'
# Drop sub-scales
if '(' in scale:
pos = scale.index('(')
scale, subscale = scale[:pos], scale[pos+1:-1]
warnings.warn(f'Dropping unsupported sub-scale '
f'{subscale.upper()} from scale {scale.upper()}',
UserWarning)
# TODO: consider having GPS as a scale in Time
# For now GPS is not a scale, we approximate this by TAI - 19s
if scale == 'gps':
reference_time_delta = TimeDelta(19, format='sec')
scale = 'tai'
elif scale.upper() in FITS_DEPRECATED_SCALES:
scale = FITS_DEPRECATED_SCALES[scale.upper()]
elif scale not in Time.SCALES:
raise ValueError(f'Unrecognized time CTYPE={self.wcs.ctype[i]}')
# Determine location
trefpos = self.wcs.trefpos.lower()
if trefpos.startswith('topocent'):
# Note that some headers use TOPOCENT instead of TOPOCENTER
if np.any(np.isnan(self.wcs.obsgeo[:3])):
warnings.warn('Missing or incomplete observer location '
'information, setting location in Time to None',
UserWarning)
location = None
else:
location = EarthLocation(*self.wcs.obsgeo[:3], unit=u.m)
elif trefpos == 'geocenter':
location = EarthLocation(0, 0, 0, unit=u.m)
elif trefpos == '':
location = None
else:
# TODO: implement support for more locations when Time supports it
warnings.warn(f"Observation location '{trefpos}' is not "
"supported, setting location in Time to None", UserWarning)
location = None
reference_time = Time(np.nan_to_num(self.wcs.mjdref[0]),
np.nan_to_num(self.wcs.mjdref[1]),
format='mjd', scale=scale,
location=location)
if reference_time_delta is not None:
reference_time = reference_time + reference_time_delta
def time_from_reference_and_offset(offset):
if isinstance(offset, Time):
return offset
return reference_time + TimeDelta(offset, format='sec')
def offset_from_time_and_reference(time):
return (time - reference_time).sec
classes[name] = (Time, (), {}, time_from_reference_and_offset)
components[i] = (name, 0, offset_from_time_and_reference)
# Fallback: for any remaining components that haven't been identified, just
# return Quantity as the class to use
for i in range(self.naxis):
if components[i] is None:
name = self.wcs.ctype[i].split('-')[0].lower()
if name == '':
name = 'world'
while name in classes:
name += "_"
classes[name] = (u.Quantity, (), {'unit': self.wcs.cunit[i]})
components[i] = (name, 0, 'value')
# Keep a cached version of result
self._components_and_classes_cache = wcs_hash, (components, classes)
return components, classes