Source code for astropy.io.fits.hdu.compressed.section
# Licensed under a 3-clause BSD style license - see LICENSE.rst
import numpy as np
from astropy.io.fits.hdu.base import BITPIX2DTYPE
from astropy.io.fits.hdu.compressed._tiled_compression import (
decompress_image_data_section,
)
from astropy.io.fits.hdu.compressed.utils import _n_tiles
from astropy.utils.shapes import simplify_basic_index
__all__ = ["CompImageSection"]
[docs]
class CompImageSection:
"""
Class enabling subsets of CompImageHDU data to be loaded lazily via slicing.
Slices of this object load the corresponding section of an image array from
the underlying FITS file, and applies any BSCALE/BZERO factors.
Section slices cannot be assigned to, and modifications to a section are
not saved back to the underlying file.
See the :ref:`astropy:data-sections` section of the Astropy documentation
for more details.
"""
def __init__(self, hdu):
self.hdu = hdu
self._data_shape = self.hdu.shape
self._tile_shape = self.hdu.tile_shape
self._n_dim = len(self._data_shape)
self._n_tiles = np.array(
_n_tiles(self._data_shape, self._tile_shape), dtype=int
)
@property
def shape(self):
return tuple(self._data_shape)
@property
def ndim(self):
return len(self.hdu.shape)
@property
def dtype(self):
return np.dtype(BITPIX2DTYPE[self.hdu._bitpix])
def __getitem__(self, index):
if self.hdu._bintable is None:
return self.hdu.data[index]
# Shortcut if the whole data is requested (this is used by the
# data property, so we optimize it as it is frequently used)
if index is Ellipsis:
first_tile_index = np.zeros(self._n_dim, dtype=int)
last_tile_index = self._n_tiles - 1
data = decompress_image_data_section(
self.hdu._bintable.data,
self.hdu.compression_type,
self.hdu._bintable.header,
self.hdu._bintable,
self.hdu.header,
first_tile_index,
last_tile_index,
)
if self.hdu._do_not_scale_image_data:
return data
scaled_data = self.hdu._scale_data(data)
self.hdu._update_header_scale_info(scaled_data.dtype)
return scaled_data
index = simplify_basic_index(index, shape=self._data_shape)
# Determine for each dimension the first and last tile to extract
first_tile_index = np.zeros(self._n_dim, dtype=int)
last_tile_index = np.zeros(self._n_dim, dtype=int)
final_array_index = []
for dim, idx in enumerate(index):
if isinstance(idx, slice):
if idx.step > 0:
first_tile_index[dim] = idx.start // self._tile_shape[dim]
last_tile_index[dim] = (idx.stop - 1) // self._tile_shape[dim]
else:
stop = 0 if idx.stop is None else max(idx.stop - 1, 0)
first_tile_index[dim] = stop // self._tile_shape[dim]
last_tile_index[dim] = idx.start // self._tile_shape[dim]
# Because slices such as slice(5, 0, 1) can exist (which
# would be empty) we need to make sure last_tile_index is
# always larger than first_tile_index
last_tile_index = np.maximum(last_tile_index, first_tile_index)
if idx.step < 0 and idx.stop is None:
final_array_index.append(idx)
else:
final_array_index.append(
slice(
idx.start - self._tile_shape[dim] * first_tile_index[dim],
idx.stop - self._tile_shape[dim] * first_tile_index[dim],
idx.step,
)
)
else:
first_tile_index[dim] = idx // self._tile_shape[dim]
last_tile_index[dim] = first_tile_index[dim]
final_array_index.append(
idx - self._tile_shape[dim] * first_tile_index[dim]
)
data = decompress_image_data_section(
self.hdu._bintable.data,
self.hdu.compression_type,
self.hdu._bintable.header,
self.hdu._bintable,
self.hdu.header,
first_tile_index,
last_tile_index,
)
data = data[tuple(final_array_index)]
if self.hdu._do_not_scale_image_data:
return data
scaled_data = self.hdu._scale_data(data)
self.hdu._update_header_scale_info(scaled_data.dtype)
return scaled_data