Source code for astropy.wcs.wcsapi.fitswcs

# This file includes the definition of a mix-in class that provides the low-
# and high-level WCS API to the astropy.wcs.WCS object. We keep this code
# isolated in this mix-in class to avoid making the main file too
# long.

import warnings

import numpy as np

from astropy import units as u
from astropy.coordinates import SpectralCoord, Galactic, ICRS
from astropy.coordinates.spectral_coordinate import update_differentials_to_match, attach_zero_velocities
from astropy.utils.exceptions import AstropyUserWarning
from astropy.constants import c

from .low_level_api import BaseLowLevelWCS
from .high_level_api import HighLevelWCSMixin
from .wrappers import SlicedLowLevelWCS

__all__ = ['custom_ctype_to_ucd_mapping', 'SlicedFITSWCS', 'FITSWCSAPIMixin']

C_SI =

    'GEOCENT': 'gcrs',
    'BARYCENT': 'icrs',
    'HELIOCENT': 'hcrs',
    'LSRK': 'lsrk',
    'LSRD': 'lsrd'

# The spectra velocity frames below are needed for FITS spectral WCS
#  (see Greisen 06 table 12) but aren't yet defined as real
# astropy.coordinates frames, so we instead define them here as instances
# of existing coordinate frames with offset velocities. In future we should
# make these real frames so that users can more easily recognize these
# velocity frames when used in SpectralCoord.

# This frame is defined as a velocity of 220 km/s in the
# direction of l=90, b=0. The rotation velocity is defined
# in:
#   Kerr and Lynden-Bell 1986, Review of galactic constants.
# NOTE: this may differ from the assumptions of galcen_v_sun
# in the Galactocentric frame - the value used here is
# the one adopted by the WCS standard for spectral
# transformations.

VELOCITY_FRAMES['GALACTOC'] = Galactic(u=0 *, v=0 *, w=0 *,
                                       U=0 * / u.s, V=-220 * / u.s, W=0 * / u.s,

# This frame is defined as a velocity of 300 km/s in the
# direction of l=90, b=0. This is defined in:
#   Transactions of the IAU Vol. XVI B Proceedings of the
#   16th General Assembly, Reports of Meetings of Commissions:
#   Comptes Rendus Des Séances Des Commissions, Commission 28,
#   p201.
# Note that these values differ from those used by CASA
# (308 km/s towards l=105, b=-7) but we use the above values
# since these are the ones defined in Greisen et al (2006).

VELOCITY_FRAMES['LOCALGRP'] = Galactic(u=0 *, v=0 *, w=0 *,
                                       U=0 * / u.s, V=-300 * / u.s, W=0 * / u.s,

# This frame is defined as a velocity of 368 km/s in the
# direction of l=263.85, b=48.25. This is defined in:
#   Bennett et al. (2003), First-Year Wilkinson Microwave
#   Anisotropy Probe (WMAP) Observations: Preliminary Maps
#   and Basic Results
# Note that in that paper, the dipole is expressed as a
# temperature (T=3.346 +/- 0.017mK)

VELOCITY_FRAMES['CMBDIPOL'] = Galactic(l=263.85 * u.deg, b=48.25 * u.deg, distance=0 *,
                                       radial_velocity=-(3.346e-3 / 2.725 * c).to(

# Mapping from CTYPE axis name to UCD1


    # Celestial coordinates
    'RA': 'pos.eq.ra',
    'DEC': 'pos.eq.dec',
    'GLON': 'pos.galactic.lon',
    'GLAT': '',
    'ELON': 'pos.ecliptic.lon',
    'ELAT': '',
    'TLON': 'pos.bodyrc.lon',
    'TLAT': '',
    'HPLT': '',
    'HPLN': 'custom:pos.helioprojective.lon',
    'HPRZ': 'custom:pos.helioprojective.z',
    'HGLN': 'custom:pos.heliographic.stonyhurst.lon',
    'HGLT': '',
    'CRLN': 'custom:pos.heliographic.carrington.lon',
    'CRLT': '',
    'SOLX': 'custom:pos.heliocentric.x',
    'SOLY': 'custom:pos.heliocentric.y',
    'SOLZ': 'custom:pos.heliocentric.z',

    # Spectral coordinates (WCS paper 3)
    'FREQ': 'em.freq',  # Frequency
    'ENER': '',  # Energy
    'WAVN': 'em.wavenumber',  # Wavenumber
    'WAVE': 'em.wl',  # Vacuum wavelength
    'VRAD': '',  # Radio velocity
    'VOPT': 'spect.dopplerVeloc.opt',  # Optical velocity
    'ZOPT': 'src.redshift',  # Redshift
    'AWAV': 'em.wl',  # Air wavelength
    'VELO': 'spect.dopplerVeloc',  # Apparent radial velocity
    'BETA': 'custom:spect.doplerVeloc.beta',  # Beta factor (v/c)
    'STOKES': 'phys.polarization.stokes',  # STOKES parameters

    # Time coordinates (
    'TIME': 'time',
    'TAI': 'time',
    'TT': 'time',
    'TDT': 'time',
    'ET': 'time',
    'IAT': 'time',
    'UT1': 'time',
    'UTC': 'time',
    'GMT': 'time',
    'GPS': 'time',
    'TCG': 'time',
    'TCB': 'time',
    'TDB': 'time',
    'LOCAL': 'time',

    # Distance coordinates
    'DIST': 'pos.distance',
    'DSUN': 'custom:pos.distance.sunToObserver'

    # UT() and TT() are handled separately in world_axis_physical_types


# Keep a list of additional custom mappings that have been registered. This
# is kept as a list in case nested context managers are used

class custom_ctype_to_ucd_mapping:
    A context manager that makes it possible to temporarily add new CTYPE to
    UCD1+ mapping used by :attr:`FITSWCSAPIMixin.world_axis_physical_types`.

    mapping : dict
        A dictionary mapping a CTYPE value to a UCD1+ value


    Consider a WCS with the following CTYPE::

        >>> from astropy.wcs import WCS
        >>> wcs = WCS(naxis=1)
        >>> wcs.wcs.ctype = ['SPAM']

    By default, :attr:`FITSWCSAPIMixin.world_axis_physical_types` returns `None`,
    but this can be overridden::

        >>> wcs.world_axis_physical_types
        >>> with custom_ctype_to_ucd_mapping({'SPAM': 'food.spam'}):
        ...     wcs.world_axis_physical_types

    def __init__(self, mapping):
        CTYPE_TO_UCD1_CUSTOM.insert(0, mapping)
        self.mapping = mapping

    def __enter__(self):

    def __exit__(self, type, value, tb):

class SlicedFITSWCS(SlicedLowLevelWCS, HighLevelWCSMixin):

class FITSWCSAPIMixin(BaseLowLevelWCS, HighLevelWCSMixin):
    A mix-in class that is intended to be inherited by the
    :class:`~astropy.wcs.WCS` class and provides the low- and high-level WCS API

    def pixel_n_dim(self):
        return self.naxis

    def world_n_dim(self):
        return len(self.wcs.ctype)

    def array_shape(self):
        if self.pixel_shape is None:
            return None
            return self.pixel_shape[::-1]

    def array_shape(self, value):
        if value is None:
            self.pixel_shape = None
            self.pixel_shape = value[::-1]

    def pixel_shape(self):
        if self._naxis == [0, 0]:
            return None
            return tuple(self._naxis)

    def pixel_shape(self, value):
        if value is None:
            self._naxis = [0, 0]
            if len(value) != self.naxis:
                raise ValueError("The number of data axes, "
                                 "{}, does not equal the "
                                 "shape {}.".format(self.naxis, len(value)))
            self._naxis = list(value)

    def pixel_bounds(self):
        return self._pixel_bounds

    def pixel_bounds(self, value):
        if value is None:
            self._pixel_bounds = value
            if len(value) != self.naxis:
                raise ValueError("The number of data axes, "
                                 "{}, does not equal the number of "
                                 "pixel bounds {}.".format(self.naxis, len(value)))
            self._pixel_bounds = list(value)

    def world_axis_physical_types(self):
        types = []
        # TODO: need to support e.g. TT(TAI)
        for ctype in self.wcs.ctype:
            if ctype.upper().startswith(('UT(', 'TT(')):
                ctype_name = ctype.split('-')[0]
                for custom_mapping in CTYPE_TO_UCD1_CUSTOM:
                    if ctype_name in custom_mapping:
                    types.append(CTYPE_TO_UCD1.get(ctype_name.upper(), None))
        return types

    def world_axis_units(self):
        units = []
        for unit in self.wcs.cunit:
            if unit is None:
                unit = ''
            elif isinstance(unit, u.Unit):
                unit = unit.to_string(format='vounit')
                    unit = u.Unit(unit).to_string(format='vounit')
                except u.UnitsError:
                    unit = ''
        return units

    def world_axis_names(self):
        return list(self.wcs.cname)

    def axis_correlation_matrix(self):

        # If there are any distortions present, we assume that there may be
        # correlations between all axes. Maybe if some distortions only apply
        # to the image plane we can improve this?
        if self.has_distortion:
            return np.ones((self.world_n_dim, self.pixel_n_dim), dtype=bool)

        # Assuming linear world coordinates along each axis, the correlation
        # matrix would be given by whether or not the PC matrix is zero
        matrix = self.wcs.get_pc() != 0

        # We now need to check specifically for celestial coordinates since
        # these can assume correlations because of spherical distortions. For
        # each celestial coordinate we copy over the pixel dependencies from
        # the other celestial coordinates.
        celestial = (self.wcs.axis_types // 1000) % 10 == 2
        celestial_indices = np.nonzero(celestial)[0]
        for world1 in celestial_indices:
            for world2 in celestial_indices:
                if world1 != world2:
                    matrix[world1] |= matrix[world2]
                    matrix[world2] |= matrix[world1]

        return matrix

    def pixel_to_world_values(self, *pixel_arrays):
        world = self.all_pix2world(*pixel_arrays, 0)
        return world[0] if self.world_n_dim == 1 else tuple(world)

    def world_to_pixel_values(self, *world_arrays):
        pixel = self.all_world2pix(*world_arrays, 0)
        return pixel[0] if self.pixel_n_dim == 1 else tuple(pixel)

    def world_axis_object_components(self):
        return self._get_components_and_classes()[0]

    def world_axis_object_classes(self):
        return self._get_components_and_classes()[1]

    def serialized_classes(self):
        return False

    def _get_components_and_classes(self):

        # The aim of this function is to return whatever is needed for
        # world_axis_object_components and world_axis_object_classes. It's easier
        # to figure it out in one go and then return the values and let the
        # properties return part of it.

        # Since this method might get called quite a few times, we need to cache
        # it. We start off by defining a hash based on the attributes of the
        # WCS that matter here (we can't just use the WCS object as a hash since
        # it is mutable)
        wcs_hash = (self.naxis,

        # If the cache is present, we need to check that the 'hash' matches.
        if getattr(self, '_components_and_classes_cache', None) is not None:
            cache = self._components_and_classes_cache
            if cache[0] == wcs_hash:
                return cache[1]
                self._components_and_classes_cache = None

        # Avoid circular imports by importing here
        from astropy.wcs.utils import wcs_to_celestial_frame
        from astropy.coordinates import SkyCoord, EarthLocation
        from astropy.time.formats import FITS_DEPRECATED_SCALES
        from astropy.time import Time, TimeDelta

        components = [None] * self.naxis
        classes = {}

        # Let's start off by checking whether the WCS has a pair of celestial
        # components

        if self.has_celestial:

                celestial_frame = wcs_to_celestial_frame(self)
            except ValueError:
                # Some WCSes, e.g. solar, can be recognized by WCSLIB as being
                # celestial but we don't necessarily have frames for them.
                celestial_frame = None

                kwargs = {}
                kwargs['frame'] = celestial_frame
                kwargs['unit'] = u.deg

                classes['celestial'] = (SkyCoord, (), kwargs)

                components[self.wcs.lng] = ('celestial', 0, '')
                components[] = ('celestial', 1, '')

        # Next, we check for spectral components

        if self.has_spectral:

            # Find index of spectral coordinate
            ispec = self.wcs.spec
            ctype = self.wcs.ctype[ispec][:4]
            ctype = ctype.upper()

            kwargs = {}

            # Determine observer location and velocity

            # TODO: determine how WCS standard would deal with observer on a
            # spacecraft far from earth. For now assume the obsgeo parameters,
            # if present, give the geocentric observer location.

            if np.isnan(self.wcs.obsgeo[0]):
                observer = None

                earth_location = EarthLocation(*self.wcs.obsgeo[:3], unit=u.m)
                obstime = Time(self.wcs.mjdobs, format='mjd', scale='utc',
                observer_location = SkyCoord(earth_location.get_itrs(obstime=obstime))

                if self.wcs.specsys in VELOCITY_FRAMES:
                    frame = VELOCITY_FRAMES[self.wcs.specsys]
                    observer = observer_location.transform_to(frame)
                    if isinstance(frame, str):
                        observer = attach_zero_velocities(observer)
                        observer = update_differentials_to_match(observer_location,
                elif self.wcs.specsys == 'TOPOCENT':
                    observer = attach_zero_velocities(observer_location)
                    raise NotImplementedError(f'SPECSYS={self.wcs.specsys} not yet supported')

            # Determine target

            # This is tricker. In principle the target for each pixel is the
            # celestial coordinates of the pixel, but we then need to be very
            # careful about SSYSOBS which is tricky. For now, we set the
            # target using the reference celestial coordinate in the WCS (if
            # any).

            if self.has_celestial and celestial_frame is not None:

                # NOTE: celestial_frame was defined higher up

                # NOTE: we set the distance explicitly to avoid warnings in SpectralCoord

                target = SkyCoord(self.wcs.crval[self.wcs.lng] * self.wcs.cunit[self.wcs.lng],
                                  self.wcs.crval[] * self.wcs.cunit[],
                                  distance=1000 * u.kpc)

                target = attach_zero_velocities(target)


                target = None

            # SpectralCoord does not work properly if either observer or target
            # are not convertible to ICRS, so if this is the case, we (for now)
            # drop the observer and target from the SpectralCoord and warn the
            # user.

            if observer is not None:
                except Exception:
                    warnings.warn('observer cannot be converted to ICRS, so will '
                                  'not be set on SpectralCoord', AstropyUserWarning)
                    observer = None

            if target is not None:
                except Exception:
                    warnings.warn('target cannot be converted to ICRS, so will '
                                  'not be set on SpectralCoord', AstropyUserWarning)
                    target = None

            # NOTE: below we include Quantity in classes['spectral'] instead
            # of SpectralCoord - this is because we want to also be able to
            # accept plain quantities.

            if ctype == 'ZOPT':

                def spectralcoord_from_redshift(redshift):
                    if isinstance(redshift, SpectralCoord):
                        return redshift
                    return SpectralCoord((redshift + 1) * self.wcs.restwav,
                                         unit=u.m, observer=observer, target=target)

                def redshift_from_spectralcoord(spectralcoord):
                    # TODO: check target is consistent
                    if observer is None:
                        warnings.warn('No observer defined on WCS, SpectralCoord '
                                      'will be converted without any velocity '
                                      'frame change', AstropyUserWarning)
                        return spectralcoord.to_value(u.m) / self.wcs.restwav - 1.
                        return spectralcoord.with_observer_stationary_relative_to(observer).to_value(u.m) / self.wcs.restwav - 1.

                classes['spectral'] = (u.Quantity, (), {}, spectralcoord_from_redshift)
                components[self.wcs.spec] = ('spectral', 0, redshift_from_spectralcoord)

            elif ctype == 'BETA':

                def spectralcoord_from_beta(beta):
                    if isinstance(beta, SpectralCoord):
                        return beta
                    return SpectralCoord(beta * C_SI,
                                         unit=u.m / u.s,
                                         doppler_rest=self.wcs.restwav * u.m,
                                         observer=observer, target=target)

                def beta_from_spectralcoord(spectralcoord):
                    # TODO: check target is consistent
                    doppler_equiv = u.doppler_relativistic(self.wcs.restwav * u.m)
                    if observer is None:
                        warnings.warn('No observer defined on WCS, SpectralCoord '
                                      'will be converted without any velocity '
                                      'frame change', AstropyUserWarning)
                        return spectralcoord.to_value(u.m / u.s, doppler_equiv) / C_SI
                        return spectralcoord.with_observer_stationary_relative_to(observer).to_value(u.m / u.s, doppler_equiv) / C_SI

                classes['spectral'] = (u.Quantity, (), {}, spectralcoord_from_beta)
                components[self.wcs.spec] = ('spectral', 0, beta_from_spectralcoord)


                kwargs['unit'] = self.wcs.cunit[ispec]

                if self.wcs.restfrq > 0:
                    if ctype == 'VELO':
                        kwargs['doppler_convention'] = 'relativistic'
                        kwargs['doppler_rest'] = self.wcs.restfrq * u.Hz
                    elif ctype == 'VRAD':
                        kwargs['doppler_convention'] = 'radio'
                        kwargs['doppler_rest'] = self.wcs.restfrq * u.Hz
                    elif ctype == 'VOPT':
                        kwargs['doppler_convention'] = 'optical'
                        kwargs['doppler_rest'] = self.wcs.restwav * u.m

                def spectralcoord_from_value(value):
                    return SpectralCoord(value, observer=observer, target=target, **kwargs)

                def value_from_spectralcoord(spectralcoord):
                    # TODO: check target is consistent
                    if observer is None:
                        warnings.warn('No observer defined on WCS, SpectralCoord '
                                      'will be converted without any velocity '
                                      'frame change', AstropyUserWarning)
                        return spectralcoord.to_value(**kwargs)
                        return spectralcoord.with_observer_stationary_relative_to(observer).to_value(**kwargs)

                classes['spectral'] = (u.Quantity, (), {}, spectralcoord_from_value)
                components[self.wcs.spec] = ('spectral', 0, value_from_spectralcoord)

        # We can then make sure we correctly return Time objects where appropriate
        # (

        if 'time' in self.world_axis_physical_types:

            multiple_time = self.world_axis_physical_types.count('time') > 1

            for i in range(self.naxis):

                if self.world_axis_physical_types[i] == 'time':

                    if multiple_time:
                        name = f'time.{i}'
                        name = 'time'

                    # Initialize delta
                    reference_time_delta = None

                    # Extract time scale
                    scale = self.wcs.ctype[i].lower()

                    if scale == 'time':
                        if self.wcs.timesys:
                            scale = self.wcs.timesys.lower()
                            scale = 'utc'

                    # Drop sub-scales
                    if '(' in scale:
                        pos = scale.index('(')
                        scale, subscale = scale[:pos], scale[pos+1:-1]
                        warnings.warn(f'Dropping unsupported sub-scale '
                                      f'{subscale.upper()} from scale {scale.upper()}',

                    # TODO: consider having GPS as a scale in Time
                    # For now GPS is not a scale, we approximate this by TAI - 19s
                    if scale == 'gps':
                        reference_time_delta = TimeDelta(19, format='sec')
                        scale = 'tai'

                    elif scale.upper() in FITS_DEPRECATED_SCALES:
                        scale = FITS_DEPRECATED_SCALES[scale.upper()]

                    elif scale not in Time.SCALES:
                        raise ValueError(f'Unrecognized time CTYPE={self.wcs.ctype[i]}')

                    # Determine location
                    trefpos = self.wcs.trefpos.lower()

                    if trefpos.startswith('topocent'):
                        # Note that some headers use TOPOCENT instead of TOPOCENTER
                        if np.any(np.isnan(self.wcs.obsgeo[:3])):
                            warnings.warn('Missing or incomplete observer location '
                                          'information, setting location in Time to None',
                            location = None
                            location = EarthLocation(*self.wcs.obsgeo[:3], unit=u.m)
                    elif trefpos == 'geocenter':
                        location = EarthLocation(0, 0, 0, unit=u.m)
                    elif trefpos == '':
                        location = None
                        # TODO: implement support for more locations when Time supports it
                        warnings.warn(f"Observation location '{trefpos}' is not "
                                       "supported, setting location in Time to None", UserWarning)
                        location = None

                    reference_time = Time(np.nan_to_num(self.wcs.mjdref[0]),
                                          format='mjd', scale=scale,

                    if reference_time_delta is not None:
                        reference_time = reference_time + reference_time_delta

                    def time_from_reference_and_offset(offset):
                        if isinstance(offset, Time):
                            return offset
                        return reference_time + TimeDelta(offset, format='sec')

                    def offset_from_time_and_reference(time):
                        return (time - reference_time).sec

                    classes[name] = (Time, (), {}, time_from_reference_and_offset)
                    components[i] = (name, 0, offset_from_time_and_reference)

        # Fallback: for any remaining components that haven't been identified, just
        # return Quantity as the class to use

        for i in range(self.naxis):
            if components[i] is None:
                name = self.wcs.ctype[i].split('-')[0].lower()
                if name == '':
                    name = 'world'
                while name in classes:
                    name += "_"
                classes[name] = (u.Quantity, (), {'unit': self.wcs.cunit[i]})
                components[i] = (name, 0, 'value')

        # Keep a cached version of result
        self._components_and_classes_cache = wcs_hash, (components, classes)

        return components, classes