Source code for astropy.io.fits.hdu.compressed.section
# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
from astropy.io.fits.hdu.base import BITPIX2DTYPE
from astropy.io.fits.hdu.compressed._tiled_compression import (
decompress_image_data_section,
)
from astropy.io.fits.hdu.compressed.utils import _data_shape, _n_tiles, _tile_shape
from astropy.utils.shapes import simplify_basic_index
[docs]
class CompImageSection:
"""
Class enabling subsets of CompImageHDU data to be loaded lazily via slicing.
Slices of this object load the corresponding section of an image array from
the underlying FITS file, and applies any BSCALE/BZERO factors.
Section slices cannot be assigned to, and modifications to a section are
not saved back to the underlying file.
See the :ref:`astropy:data-sections` section of the Astropy documentation
for more details.
"""
def __init__(self, hdu):
self.hdu = hdu
self._data_shape = _data_shape(self.hdu._header)
self._tile_shape = _tile_shape(self.hdu._header)
self._n_dim = len(self._data_shape)
self._n_tiles = np.array(
_n_tiles(self._data_shape, self._tile_shape), dtype=int
)
@property
def shape(self):
return tuple(self._data_shape)
@property
def ndim(self):
return self.hdu._header["ZNAXIS"]
@property
def dtype(self):
return BITPIX2DTYPE[self.hdu._header["ZBITPIX"]]
def __getitem__(self, index):
# Shortcut if the whole data is requested (this is used by the
# data property, so we optimize it as it is frequently used)
if index is Ellipsis:
first_tile_index = np.zeros(self._n_dim, dtype=int)
last_tile_index = self._n_tiles - 1
data = decompress_image_data_section(
self.hdu.compressed_data,
self.hdu.compression_type,
self.hdu._header,
self.hdu,
first_tile_index,
last_tile_index,
)
return self.hdu._scale_data(data)
index = simplify_basic_index(index, shape=self._data_shape)
# Determine for each dimension the first and last tile to extract
first_tile_index = np.zeros(self._n_dim, dtype=int)
last_tile_index = np.zeros(self._n_dim, dtype=int)
final_array_index = []
for dim, idx in enumerate(index):
if isinstance(idx, slice):
if idx.step > 0:
first_tile_index[dim] = idx.start // self._tile_shape[dim]
last_tile_index[dim] = (idx.stop - 1) // self._tile_shape[dim]
else:
stop = 0 if idx.stop is None else max(idx.stop - 1, 0)
first_tile_index[dim] = stop // self._tile_shape[dim]
last_tile_index[dim] = idx.start // self._tile_shape[dim]
# Because slices such as slice(5, 0, 1) can exist (which
# would be empty) we need to make sure last_tile_index is
# always larger than first_tile_index
last_tile_index = np.maximum(last_tile_index, first_tile_index)
if idx.step < 0 and idx.stop is None:
final_array_index.append(idx)
else:
final_array_index.append(
slice(
idx.start - self._tile_shape[dim] * first_tile_index[dim],
idx.stop - self._tile_shape[dim] * first_tile_index[dim],
idx.step,
)
)
else:
first_tile_index[dim] = idx // self._tile_shape[dim]
last_tile_index[dim] = first_tile_index[dim]
final_array_index.append(
idx - self._tile_shape[dim] * first_tile_index[dim]
)
data = decompress_image_data_section(
self.hdu.compressed_data,
self.hdu.compression_type,
self.hdu._header,
self.hdu,
first_tile_index,
last_tile_index,
)
return self.hdu._scale_data(data[tuple(final_array_index)])