# Licensed under a 3-clause BSD style license - see LICENSE.rst
import ctypes
import math
import time
import warnings
import numpy as np
from astropy.io.fits.fitsrec import FITS_rec
from astropy.io.fits.hdu.base import BITPIX2DTYPE, DELAYED
from astropy.io.fits.hdu.compressed._quantization import DITHER_METHODS
from astropy.io.fits.hdu.compressed._tiled_compression import (
_get_compression_setting,
compress_image_data,
)
from astropy.io.fits.hdu.compressed.utils import _tile_shape, _validate_tile_shape
from astropy.io.fits.hdu.image import ImageHDU
from astropy.io.fits.header import Header
from astropy.io.fits.util import _is_int
from astropy.io.fits.verify import _ErrList
from astropy.utils.decorators import lazyproperty
from astropy.utils.exceptions import AstropyUserWarning
from .header import (
_bintable_header_to_image_header,
_image_header_to_empty_bintable,
)
from .section import CompImageSection
from .settings import (
CMTYPE_ALIASES,
COMPRESSION_TYPES,
DEFAULT_COMPRESSION_TYPE,
DEFAULT_DITHER_SEED,
DEFAULT_HCOMP_SCALE,
DEFAULT_HCOMP_SMOOTH,
DEFAULT_QUANTIZE_LEVEL,
DEFAULT_QUANTIZE_METHOD,
DITHER_SEED_CHECKSUM,
DITHER_SEED_CLOCK,
)
__all__ = ["CompImageHDU"]
class CompImageHDU(ImageHDU):
"""
Compressed Image HDU class.
"""
_default_name = "COMPRESSED_IMAGE"
def __init__(
self,
data=None,
header=None,
name=None,
compression_type=DEFAULT_COMPRESSION_TYPE,
tile_shape=None,
hcomp_scale=DEFAULT_HCOMP_SCALE,
hcomp_smooth=DEFAULT_HCOMP_SMOOTH,
quantize_level=DEFAULT_QUANTIZE_LEVEL,
quantize_method=DEFAULT_QUANTIZE_METHOD,
dither_seed=DEFAULT_DITHER_SEED,
do_not_scale_image_data=False,
uint=True,
scale_back=None,
bintable=None,
):
"""
Parameters
----------
data : array, optional
Uncompressed image data
header : `~astropy.io.fits.Header`, optional
Header to be associated with the image; when reading the HDU from a
file (data=DELAYED), the header read from the file
name : str, optional
The ``EXTNAME`` value; if this value is `None`, then the name from
the input image header will be used; if there is no name in the
input image header then the default name ``COMPRESSED_IMAGE`` is
used.
compression_type : str, optional
Compression algorithm: one of
``'RICE_1'``, ``'RICE_ONE'``, ``'PLIO_1'``, ``'GZIP_1'``,
``'GZIP_2'``, ``'HCOMPRESS_1'``, ``'NOCOMPRESS'``
tile_shape : tuple, optional
Compression tile shape, which should be specified using the default
Numpy convention for array shapes (C order). The default is to
treat each row of the image as a tile.
hcomp_scale : float, optional
HCOMPRESS scale parameter
hcomp_smooth : float, optional
HCOMPRESS smooth parameter
quantize_level : float, optional
Floating point quantization level; see note below
quantize_method : int, optional
Floating point quantization dithering method; can be either
``NO_DITHER`` (-1; default), ``SUBTRACTIVE_DITHER_1`` (1), or
``SUBTRACTIVE_DITHER_2`` (2); see note below
dither_seed : int, optional
Random seed to use for dithering; can be either an integer in the
range 1 to 10000 (inclusive), ``DITHER_SEED_CLOCK`` (0; default), or
``DITHER_SEED_CHECKSUM`` (-1); see note below
Notes
-----
The astropy.io.fits package supports 2 methods of image compression:
1) The entire FITS file may be externally compressed with the gzip
or pkzip utility programs, producing a ``*.gz`` or ``*.zip``
file, respectively. When reading compressed files of this type,
Astropy first uncompresses the entire file into a temporary file
before performing the requested read operations. The
astropy.io.fits package does not support writing to these types
of compressed files. This type of compression is supported in
the ``_File`` class, not in the `CompImageHDU` class. The file
compression type is recognized by the ``.gz`` or ``.zip`` file
name extension.
2) The `CompImageHDU` class supports the FITS tiled image
compression convention in which the image is subdivided into a
grid of rectangular tiles, and each tile of pixels is
individually compressed. The details of this FITS compression
convention are described at the `FITS Support Office web site
<https://fits.gsfc.nasa.gov/registry/tilecompression.html>`_.
Basically, the compressed image tiles are stored in rows of a
variable length array column in a FITS binary table. The
astropy.io.fits package recognizes that this binary table extension
contains an image and treats it as if it were an image
extension. Under this tile-compression format, FITS header
keywords remain uncompressed. Sections of the image can be
extracted and decompressed individually, without decompressing the
entire image, via the ``section`` attribute, as sketched below.
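For example, a minimal sketch (``compressed_image.fits`` is a
hypothetical file name)::

    from astropy.io import fits

    # Externally gzipped files (method 1) also open transparently:
    # fits.open("image.fits.gz")

    # A file written with the tiled compression convention (method 2)
    with fits.open("compressed_image.fits") as hdul:
        hdu = hdul[1]                 # behaves like an image HDU
        image = hdu.data              # decompresses the full image
        part = hdu.section[:16, :16]  # decompresses only the needed tiles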
The astropy.io.fits package supports 3 general-purpose compression
algorithms plus one other special-purpose compression technique that is
designed for data masks with positive integer pixel values. The 3
general purpose algorithms are GZIP, Rice, and HCOMPRESS, and the
special-purpose technique is the IRAF pixel list compression technique
(PLIO). The ``compression_type`` parameter defines the compression
algorithm to be used.
The FITS image can be subdivided into any desired rectangular grid of
compression tiles. With the GZIP, Rice, and PLIO algorithms, the
default is to take each row of the image as a tile. The HCOMPRESS
algorithm is inherently 2-dimensional in nature, so the default in this
case is to take 16 rows of the image per tile. In most cases, it makes
little difference what tiling pattern is used, so the default tiles are
usually adequate. In the case of very small images, it could be more
efficient to compress the whole image as a single tile. Note that the
image dimensions are not required to be an integer multiple of the tile
dimensions; if not, then the tiles at the edges of the image will be
smaller than the other tiles. The ``tile_shape`` parameter may be
provided as a list of tile sizes, one for each dimension in the image.
For example, a ``tile_shape`` value of ``(100, 100)`` would divide a
300 x 300 image into nine 100 x 100 tiles.
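A minimal sketch of that example (the random data is purely
illustrative)::

    import numpy as np
    from astropy.io import fits

    data = np.random.random((300, 300)).astype(np.float32)
    # Nine 100 x 100 tiles, each compressed independently
    hdu = fits.CompImageHDU(data=data, tile_shape=(100, 100))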
The 4 supported image compression algorithms are all 'lossless' when
applied to integer FITS images; the pixel values are preserved exactly
with no loss of information during the compression and uncompression
process. In addition, the HCOMPRESS algorithm supports a 'lossy'
compression mode that can produce a much greater amount of compression.
This is achieved by specifying a non-zero value for the ``hcomp_scale``
parameter. Since the amount of compression that is achieved depends
directly on the RMS noise in the image, it is usually more convenient
to specify the ``hcomp_scale`` factor relative to the RMS noise.
Setting ``hcomp_scale = 2.5`` means use a scale factor that is 2.5
times the calculated RMS noise in the image tile. In some cases it may
be desirable to specify the exact scaling to be used, instead of
specifying it relative to the calculated noise value. This may be done
by specifying the negative of the desired scale value (typically in the
range -2 to -100).
Very high compression factors (of 100 or more) can be achieved by using
large ``hcomp_scale`` values, however, this can produce undesirable
'blocky' artifacts in the compressed image. A variation of the
HCOMPRESS algorithm (called HSCOMPRESS) can be used in this case to
apply a small amount of smoothing of the image when it is uncompressed
to help cover up these artifacts. This smoothing is purely cosmetic
and does not cause any significant change to the image pixel values.
Setting the ``hcomp_smooth`` parameter to 1 will engage the smoothing
algorithm.
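As a minimal sketch (the scale and smoothing values are illustrative
only)::

    import numpy as np
    from astropy.io import fits

    data = np.random.random((300, 300)).astype(np.float32)
    # Lossy HCOMPRESS: scale factor of 2.5 times the per-tile RMS noise,
    # with cosmetic smoothing applied when the image is decompressed
    hdu = fits.CompImageHDU(
        data=data,
        compression_type="HCOMPRESS_1",
        hcomp_scale=2.5,
        hcomp_smooth=1,
    )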
Floating point FITS images (which have ``BITPIX`` = -32 or -64) usually
contain too much 'noise' in the least significant bits of the mantissa
of the pixel values to be effectively compressed with any lossless
algorithm. Consequently, floating point images are first quantized
into scaled integer pixel values (and thus throwing away much of the
noise) before being compressed with the specified algorithm (either
GZIP, RICE, or HCOMPRESS). This technique produces much higher
compression factors than simply using the GZIP utility to externally
compress the whole FITS file, but it also means that the original
floating point pixel values are not exactly preserved. When done
properly, this integer scaling technique will only discard the
insignificant noise while still preserving all the real information in
the image. The amount of precision that is retained in the pixel
values is controlled by the ``quantize_level`` parameter. Larger
values will result in compressed images whose pixels more closely match
the floating point pixel values, but at the same time the amount of
compression that is achieved will be reduced. Users should experiment
with different values for this parameter to determine the optimal value
that preserves all the useful information in the image, without
needlessly preserving all the 'noise' which will hurt the compression
efficiency.
The default value for the ``quantize_level`` scale factor is 16, which
means that scaled integer pixel values will be quantized such that the
difference between adjacent integer values will be 1/16th of the noise
level in the image background. An optimized algorithm is used to
accurately estimate the noise in the image. As an example, if the RMS
noise in the background pixels of an image = 32.0, then the spacing
between adjacent scaled integer pixel values will equal 2.0 by default.
Note that the RMS noise is independently calculated for each tile of
the image, so the resulting integer scaling factor may fluctuate
slightly for each tile. In some cases, it may be desirable to specify
the exact quantization level to be used, instead of specifying it
relative to the calculated noise value. This may be done by specifying
the negative of the desired quantization level for the value of
``quantize_level``. In the previous example, one could specify
``quantize_level = -2.0`` so that the quantized integer levels differ
by 2.0. Larger negative values for ``quantize_level`` mean that the
levels are more coarsely spaced, and will produce higher compression
factors.
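A minimal sketch of both conventions (the values are illustrative)::

    import numpy as np
    from astropy.io import fits

    data = np.random.random((300, 300)).astype(np.float32)
    # Relative: adjacent integer levels spaced at 1/32 of the per-tile
    # RMS background noise
    hdu = fits.CompImageHDU(data=data, quantize_level=32.0)

    # Absolute: fix the spacing between integer levels at exactly 2.0
    hdu = fits.CompImageHDU(data=data, quantize_level=-2.0)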
The quantization algorithm can also apply one of two random dithering
methods in order to reduce bias in the measured intensity of background
regions. The first method, specified with the constant
``SUBTRACTIVE_DITHER_1``, adds dithering to the zero-point of the
quantization array itself rather than adding noise to the actual image.
The random noise is added on a pixel-by-pixel basis, so in order to
restore each pixel from its integer value to its floating point value
it is necessary to replay the same sequence of random numbers for each
pixel (see below). The other method, ``SUBTRACTIVE_DITHER_2``, is
exactly like the first except that before dithering any pixel with a
floating point value of ``0.0`` is replaced with the special integer
value ``-2147483647``. When the image is uncompressed, pixels with
this value are restored back to ``0.0`` exactly. Finally, a value of
``NO_DITHER`` disables dithering entirely.
As mentioned above, when using the subtractive dithering algorithm it
is necessary to be able to generate a (pseudo-)random sequence of noise
for each pixel, and replay that same sequence upon decompressing. To
facilitate this, a random seed between 1 and 10000 (inclusive) is used
to seed a random number generator, and that seed is stored in the
``ZDITHER0`` keyword in the header of the compressed HDU. In order to
use that seed to generate the same sequence of random numbers the same
random number generator must be used at compression and decompression
time; for that reason the tiled image convention provides an
implementation of a very simple pseudo-random number generator. The
seed itself can be provided in one of three ways, controllable by the
``dither_seed`` argument: It may be specified manually, or it may be
generated arbitrarily based on the system's clock
(``DITHER_SEED_CLOCK``) or based on a checksum of the pixels in the
image's first tile (``DITHER_SEED_CHECKSUM``). The clock-based method
is the default, and is sufficient to ensure that the value is
reasonably "arbitrary" and that the same seed is unlikely to be
generated sequentially. The checksum method, on the other hand,
ensures that the same seed is used every time for a specific image.
This is particularly useful for software testing as it ensures that the
same image will always use the same seed.
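A minimal sketch combining these options (the integer constants follow
the mapping given above; the data is illustrative)::

    import numpy as np
    from astropy.io import fits

    data = np.random.random((300, 300)).astype(np.float32)
    # Subtractive dithering with a checksum-based seed, so the same
    # image is always quantized with the same, reproducible seed
    hdu = fits.CompImageHDU(
        data=data,
        quantize_level=16.0,
        quantize_method=1,  # SUBTRACTIVE_DITHER_1
        dither_seed=-1,     # DITHER_SEED_CHECKSUM
    )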
"""
compression_type = CMTYPE_ALIASES.get(compression_type, compression_type)
self._bintable = None
if data is DELAYED or bintable is not None:
# NOTE: for now we don't ever read in CompImageHDU directly from
# files, instead we read in BinTableHDU and pass it in here. In
# future if we do want to read CompImageHDU in directly, we can
# use the following code.
# if data is DELAYED:
# # Reading the HDU from a file
# self._bintable = _CompBinTableHDU(data=data, header=header)
# else:
# If bintable is passed in, it should be a BinTableHDU
self._bintable = bintable
self._bintable._load_variable_length_data = False
self._bintable._manages_own_heap = True
self._bintable._new = False
self._bitpix = self._bintable.header["ZBITPIX"]
header = self._bintable_to_image_header()
header._modified = False
for card in header._cards:
card._modified = False
super().__init__(
data=DELAYED,
header=header,
name=name,
do_not_scale_image_data=do_not_scale_image_data,
uint=uint,
scale_back=scale_back,
)
self.compression_type = self._bintable.header.get(
"ZCMPTYPE", DEFAULT_COMPRESSION_TYPE
)
self.tile_shape = tuple(_tile_shape(self._bintable.header))
self.hcomp_scale = int(
_get_compression_setting(bintable.header, "SCALE", DEFAULT_HCOMP_SCALE)
)
self.hcomp_smooth = _get_compression_setting(
bintable.header, "SMOOTH", DEFAULT_HCOMP_SMOOTH
)
self.quantize_level = _get_compression_setting(
bintable.header, "noisebit", DEFAULT_QUANTIZE_LEVEL
)
self.quantize_method = DITHER_METHODS[
bintable.header.get("ZQUANTIZ", "NO_DITHER")
]
self.dither_seed = bintable.header.get("ZDITHER0", DEFAULT_DITHER_SEED)
else:
# Create at least a skeleton HDU that matches the input
# header and data (if any were input)
super().__init__(
data=data,
header=header or Header(),
name=name,
do_not_scale_image_data=do_not_scale_image_data,
uint=uint,
scale_back=scale_back,
)
if header is not None and "SIMPLE" in header:
self.header["SIMPLE"] = header["SIMPLE"]
self.compression_type = compression_type
self.tile_shape = _validate_tile_shape(
tile_shape=tile_shape,
compression_type=self.compression_type,
image_header=self.header,
)
self.hcomp_scale = hcomp_scale
self.hcomp_smooth = hcomp_smooth
self.quantize_level = quantize_level
self.quantize_method = quantize_method
self.dither_seed = dither_seed
# TODO: just for parameter validation, e.g. tile shape - we shouldn't
# ideally need this and should instead validate the values as they are
# set above.
self._get_bintable_without_data()
def _remove_unnecessary_default_extnames(self, header):
"""Remove default EXTNAME values if they are unnecessary.
Some data files (e.g. from CFHT) can have the default EXTNAME and
an explicit value. This method removes the default if a more
specific header exists. It also removes any duplicate default
values.
"""
if "EXTNAME" in header:
indices = header._keyword_indices["EXTNAME"]
# Only continue if there is more than one found
n_extname = len(indices)
if n_extname > 1:
extnames_to_remove = [
index for index in indices if header[index] == self._default_name
]
if len(extnames_to_remove) == n_extname:
# Keep the first (they are all the same)
extnames_to_remove.pop(0)
# Remove them all in reverse order to keep the index unchanged.
for index in sorted(extnames_to_remove, reverse=True):
del header[index]
@property
def compression_type(self):
return self._compression_type
@compression_type.setter
def compression_type(self, value):
value = CMTYPE_ALIASES.get(value, value)
if value in COMPRESSION_TYPES:
self._compression_type = value
else:
warnings.warn(
"Unknown compression type provided (supported are {}). "
"Default ({}) compression will be used.".format(
", ".join(map(repr, COMPRESSION_TYPES)),
DEFAULT_COMPRESSION_TYPE,
),
AstropyUserWarning,
)
self._compression_type = DEFAULT_COMPRESSION_TYPE
def _get_bintable_without_data(self):
"""
Convert the current ImageHDU (excluding the actual data) to a BinTableHDU
with the correct header.
"""
# Determine based on the size of the input data whether to use the Q
# column format to store compressed data or the P format.
# The Q format is used only if the uncompressed data is larger than
# 4 GB. This is not a perfect heuristic, as one can contrive an input
# array for which the binary table representing the compressed data
# exceeds 4 GB even though the uncompressed data does not. That said,
# this is the same heuristic used by CFITSIO, so it should give
# consistent results. The cases where this heuristic is insufficient
# are extreme and almost entirely contrived corner cases, so it will
# do for now.
if self._has_data:
huge_hdu = self.data.nbytes > 2**32
else:
huge_hdu = False
# TODO: test above
# NOTE: for now the function below modifies the compressed binary table
# bintable._header in-place, but this could be refactored in future to
# return the compressed header.
bintable = _image_header_to_empty_bintable(
self.header,
name=self.name,
huge_hdu=huge_hdu,
compression_type=self.compression_type,
tile_shape=self.tile_shape,
hcomp_scale=self.hcomp_scale,
hcomp_smooth=self.hcomp_smooth,
quantize_level=self.quantize_level,
quantize_method=self.quantize_method,
dither_seed=self.dither_seed,
axes=self._axes,
generate_dither_seed=self._generate_dither_seed,
)
return bintable
@property
def _data_loaded(self):
"""
Whether the data is fully decompressed into self.data. Note that this
is a little different from _data_loaded on other HDUs, but it is
conceptually the same idea.
"""
return "data" in self.__dict__ and super().data is not None
@property
def _data_shape(self):
if self._data_loaded:
return self.data.shape
else:
return tuple(reversed(self._axes))
@lazyproperty
def data(self):
"""
The decompressed data array.
Note that accessing this will cause all the tiles to be loaded,
decompressed, and combined into a single data array. If you do
not need to access the whole array, consider instead using the
:attr:`~astropy.io.fits.CompImageHDU.section` property.
"""
# If there is no internal binary table, the HDU was not created from a
# file and therefore the data is just the one on the parent ImageHDU
# class
if self._data_loaded:
return super().data
elif self._bintable is None or len(self._bintable.data) == 0:
return None
# Since .section has general code to load any arbitrary part of the
# data, we can just use this
data = self.section[...]
return data
@data.setter
def data(self, data):
ImageHDU.data.fset(self, data)
if (
data is not None
and hasattr(self, "tile_shape")
and len(self.tile_shape) != data.ndim
):
self.tile_shape = _validate_tile_shape(
tile_shape=[],
compression_type=self.compression_type,
image_header=self.header,
)
@property
def compressed_data(self):
return None if self._bintable is None else self._bintable.data
def _bintable_to_image_header(self):
if self._bintable is None:
raise ValueError("bintable is not set")
# Clean up any possible doubled EXTNAME keywords that use
# the default. Do this on the original header to ensure
# duplicates are removed cleanly.
self._remove_unnecessary_default_extnames(self._bintable.header)
# Convert compressed header to image header and save
# it off to self._image_header so it can be referenced later
# unambiguously
return _bintable_header_to_image_header(self._bintable.header)
def _add_data_to_bintable(self, bintable):
"""
Compress the image data so that it may be written to a file.
"""
if self.data is None:
return
heap = compress_image_data(
self.data, self.compression_type, bintable.header, bintable.columns
)
dtype = bintable.columns.dtype.newbyteorder(">")
buf = np.frombuffer(heap, dtype=np.uint8)
data = (
buf[: bintable._theap]
.view(dtype=dtype, type=np.rec.recarray)
.view(FITS_rec)
)
data._load_variable_length_data = False
data._coldefs = bintable.columns
data._heapoffset = bintable._theap
data._heapsize = len(buf) - bintable._theap
bintable.data = data
def _prewriteto(self, checksum=False, inplace=False):
if (
self._bintable is not None
and not self._has_data
and not self.header._modified
):
self._tmp_bintable = self._bintable
return self._tmp_bintable._prewriteto(checksum=checksum, inplace=inplace)
if self._scale_back:
self._scale_internal(
BITPIX2DTYPE[self._orig_bitpix], blank=self._orig_blank
)
self._tmp_bintable = self._get_bintable_without_data()
self._add_data_to_bintable(self._tmp_bintable)
# If a bintable already exists internally we should update that instead
# of using a whole new BinTableHDU so that mode='update' works.
if self._bintable is not None:
self._bintable.header = self._tmp_bintable.header
self._bintable.data = self._tmp_bintable.data
self._tmp_bintable = self._bintable
return self._tmp_bintable._prewriteto(checksum=checksum, inplace=inplace)
def _writeto(self, fileobj, inplace=False, copy=False):
if self._tmp_bintable is not None:
# Each time we assign the bintable data to the BinTableHDU, some of
# the blank keywords get removed, so at this point, just before
# writing, we should make sure that the number of blank cards in
# the final binary table to be written matches the number of blanks
# in the image header.
image_blanks = self.header._countblanks()
table_blanks = self._tmp_bintable.header._countblanks()
for _ in range(image_blanks - table_blanks):
self._tmp_bintable.header.append()
return self._tmp_bintable._writeto(fileobj, inplace=inplace, copy=copy)
def _postwriteto(self):
self._tmp_bintable = None
def _close(self, closed=True):
if self._bintable is not None:
return self._bintable._close(closed=closed)
def _generate_dither_seed(self, seed):
if not _is_int(seed):
raise TypeError("Seed must be an integer")
if not -1 <= seed <= 10000:
raise ValueError(
"Seed for random dithering must be either between 1 and "
"10000 inclusive, 0 for autogeneration from the system "
"clock, or -1 for autogeneration from a checksum of the first "
f"image tile (got {seed})"
)
if seed == DITHER_SEED_CHECKSUM:
# Determine the tile dimensions from the ZTILEn keywords
tile_dims = self.tile_shape
# Get the first tile by using the tile dimensions as the end
# indices of slices (starting from 0)
first_tile = self.data[tuple(slice(d) for d in tile_dims)]
# The checksum algorithm used is literally just the sum of the bytes
# of the tile data (not its actual floating point values). Integer
# overflow is irrelevant.
csum = first_tile.view(dtype="uint8").sum()
# Since CFITSIO uses an unsigned long (which may be different on
# different platforms) go ahead and truncate the sum to its
# unsigned long value and take the result modulo 10000
return (ctypes.c_ulong(csum).value % 10000) + 1
elif seed == DITHER_SEED_CLOCK:
# This isn't exactly the same algorithm as CFITSIO, but that's okay
# since the result is meant to be arbitrary. The primary difference
# is that CFITSIO incorporates the HDU number into the result in
# the hopes of heading off the possibility of the same seed being
# generated for two HDUs at the same time. Here instead we just
# add in the HDU object's id
return (
(sum(int(x) for x in math.modf(time.time())) + id(self)) % 10000
) + 1
else:
return seed
@property
def section(self):
"""
Efficiently access a section of the image array.
This property can be used to access a section of the data without
loading and decompressing the entire array into memory.
The :class:`~astropy.io.fits.CompImageSection` object returned by this
attribute is not meant to be used directly by itself. Rather, slices of
the section return the appropriate slice of the data, and loads *only*
that section into memory. Any valid basic Numpy index can be used to
slice :class:`~astropy.io.fits.CompImageSection`.
Note that accessing data using :attr:`CompImageHDU.section` will always
load tiles one at a time from disk, and therefore when accessing a large
fraction of the data (or slicing it in a way that would cause most tiles
to be loaded) you may obtain better performance by using
:attr:`CompImageHDU.data`.
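For example, given a `CompImageHDU` instance ``hdu`` (the slice is
illustrative)::

    # Load and decompress only the tiles overlapping this slice
    cutout = hdu.section[100:200, 250:300]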
"""
return CompImageSection(self)
def _verify(self, *args, **kwargs):
# The following is the default _verify for ImageHDU
errs = super()._verify(*args, **kwargs)
# However in some cases the decompressed header is actually like a
# PrimaryHDU header rather than an ImageHDU header, in which case
# there are certain errors we can ignore
if "SIMPLE" in self.header:
errs_filtered = []
for err in errs:
if len(err) >= 2 and err[1] in (
"'XTENSION' card does not exist.",
"'PCOUNT' card does not exist.",
"'GCOUNT' card does not exist.",
):
continue
errs_filtered.append(err)
return _ErrList(errs_filtered)
else:
return errs
@property
def _data_offset(self):
if self._bintable is not None:
return self._bintable._data_offset
@_data_offset.setter
def _data_offset(self, value):
# We should never set _data_offset to a non-None value. We need to
# implement this setter as one of the parent classes sets _data_offset
# to None in __init__.
if value is not None:
raise RuntimeError("Cannot set CompImageHDU._data_offset")
@property
def _header_offset(self):
if self._bintable is not None:
return self._bintable._header_offset
@_header_offset.setter
def _header_offset(self, value):
# We should never set _header_offset to a non-None value. We need to
# implement this setter as one of the parent classes sets _header_offset
# to None in __init__.
if value is not None:
raise RuntimeError("Cannot set CompImageHDU._header_offset")
@property
def _data_size(self):
if self._bintable is not None:
return self._bintable._data_size
@_data_size.setter
def _data_size(self, value):
# We should never set _data_size to a non-None value. We need to
# implement this setter as one of the parent classes sets _data_size
# to None in __init__.
if value is not None:
raise RuntimeError("Cannot set CompImageHDU._data_size")