Source code for astropy.timeseries.sampled

# Licensed under a 3-clause BSD style license - see LICENSE.rst

from copy import deepcopy

import numpy as np

from astropy import units as u
from astropy.table import QTable, Table, groups
from astropy.time import Time, TimeDelta
from astropy.timeseries.core import BaseTimeSeries, autocheck_required_columns
from astropy.units import Quantity, UnitsError

__all__ = ["TimeSeries"]


[docs] @autocheck_required_columns class TimeSeries(BaseTimeSeries): """ A class to represent time series data in tabular form. `~astropy.timeseries.TimeSeries` provides a class for representing time series as a collection of values of different quantities measured at specific points in time (for time series with finite time bins, see the `~astropy.timeseries.BinnedTimeSeries` class). `~astropy.timeseries.TimeSeries` is a sub-class of `~astropy.table.QTable` and thus provides all the standard table maniplation methods available to tables, but it also provides additional conveniences for dealing with time series, such as a flexible initializer for setting up the times, a method for folding time series, and a ``time`` attribute for easy access to the time values. See also: https://docs.astropy.org/en/stable/timeseries/ Parameters ---------- data : numpy ndarray, dict, list, `~astropy.table.Table`, or table-like object, optional Data to initialize time series. This does not need to contain the times, which can be provided separately, but if it does contain the times they should be in a column called ``'time'`` to be automatically recognized. time : `~astropy.time.Time`, `~astropy.time.TimeDelta` or iterable The times at which the values are sampled - this can be either given directly as a `~astropy.time.Time` or `~astropy.time.TimeDelta` array or as any iterable that initializes the `~astropy.time.Time` class. If this is given, then the remaining time-related arguments should not be used. time_start : `~astropy.time.Time` or str The time of the first sample in the time series. This is an alternative to providing ``time`` and requires that ``time_delta`` is also provided. time_delta : `~astropy.time.TimeDelta` or `~astropy.units.Quantity` ['time'] The step size in time for the series. This can either be a scalar if the time series is evenly sampled, or an array of values if it is not. n_samples : int The number of time samples for the series. This is only used if both ``time_start`` and ``time_delta`` are provided and are scalar values. **kwargs : dict, optional Additional keyword arguments are passed to `~astropy.table.QTable`. """ _required_columns = ["time"] def __init__( self, data=None, *, time=None, time_start=None, time_delta=None, n_samples=None, **kwargs, ): super().__init__(data=data, **kwargs) # For some operations, an empty time series needs to be created, then # columns added one by one. We should check that when columns are added # manually, time is added first and is of the right type. if data is None and time is None and time_start is None and time_delta is None: self._required_columns_relax = True return # First if time has been given in the table data, we should extract it # and treat it as if it had been passed as a keyword argument. if data is not None: if n_samples is not None: if n_samples != len(self): raise TypeError( "'n_samples' has been given both and it is not the " "same length as the input data." ) else: n_samples = len(self) if "time" in self.colnames: if time is None: time = self.columns["time"] else: raise TypeError( "'time' has been given both in the table and as a keyword argument" ) if time is None and time_start is None: raise TypeError("Either 'time' or 'time_start' should be specified") elif time is not None and time_start is not None: raise TypeError("Cannot specify both 'time' and 'time_start'") if time is not None and not isinstance(time, (Time, TimeDelta)): time = Time(time) if time_start is not None and not isinstance(time_start, (Time, TimeDelta)): time_start = Time(time_start) if time_delta is not None and not isinstance(time_delta, (Quantity, TimeDelta)): raise TypeError("'time_delta' should be a Quantity or a TimeDelta") if isinstance(time_delta, TimeDelta): time_delta = time_delta.sec * u.s if time_start is not None: # We interpret this as meaning that time is that of the first # sample and that the interval is given by time_delta. if time_delta is None: raise TypeError("'time' is scalar, so 'time_delta' is required") if time_delta.isscalar: time_delta = np.repeat(time_delta, n_samples) time_delta = np.cumsum(time_delta) time_delta = np.roll(time_delta, 1) time_delta[0] = 0.0 * u.s time = time_start + time_delta elif len(self.colnames) > 0 and len(time) != len(self): raise ValueError( f"Length of 'time' ({len(time)}) should match data length ({n_samples})" ) elif time_delta is not None: raise TypeError( "'time_delta' should not be specified since 'time' is an array" ) with self._delay_required_column_checks(): if "time" in self.colnames: self.remove_column("time") self.add_column(time, index=0, name="time") @property def time(self): """ The time values. """ return self["time"]
[docs] def fold( self, period=None, epoch_time=None, epoch_phase=0, wrap_phase=None, normalize_phase=False, ): """ Return a new `~astropy.timeseries.TimeSeries` folded with a period and epoch. Parameters ---------- period : `~astropy.units.Quantity` ['time'] The period to use for folding epoch_time : `~astropy.time.Time` The time to use as the reference epoch, at which the relative time offset / phase will be ``epoch_phase``. Defaults to the first time in the time series. epoch_phase : float or `~astropy.units.Quantity` ['dimensionless', 'time'] Phase of ``epoch_time``. If ``normalize_phase`` is `True`, this should be a dimensionless value, while if ``normalize_phase`` is ``False``, this should be a `~astropy.units.Quantity` with time units. Defaults to 0. wrap_phase : float or `~astropy.units.Quantity` ['dimensionless', 'time'] The value of the phase above which values are wrapped back by one period. If ``normalize_phase`` is `True`, this should be a dimensionless value, while if ``normalize_phase`` is ``False``, this should be a `~astropy.units.Quantity` with time units. Defaults to half the period, so that the resulting time series goes from ``-period / 2`` to ``period / 2`` (if ``normalize_phase`` is `False`) or -0.5 to 0.5 (if ``normalize_phase`` is `True`). normalize_phase : bool If `False` phase is returned as `~astropy.time.TimeDelta`, otherwise as a dimensionless `~astropy.units.Quantity`. Returns ------- folded_timeseries : `~astropy.timeseries.TimeSeries` The folded time series object with phase as the ``time`` column. """ if not isinstance(period, Quantity) or period.unit.physical_type != "time": raise UnitsError("period should be a Quantity in units of time") folded = self.copy() if epoch_time is None: epoch_time = self.time[0] else: epoch_time = Time(epoch_time) period_sec = period.to_value(u.s) if normalize_phase: if ( isinstance(epoch_phase, Quantity) and epoch_phase.unit.physical_type != "dimensionless" ): raise UnitsError( "epoch_phase should be a dimensionless Quantity " "or a float when normalize_phase=True" ) epoch_phase_sec = epoch_phase * period_sec else: if epoch_phase == 0: epoch_phase_sec = 0.0 else: if ( not isinstance(epoch_phase, Quantity) or epoch_phase.unit.physical_type != "time" ): raise UnitsError( "epoch_phase should be a Quantity in units " "of time when normalize_phase=False" ) epoch_phase_sec = epoch_phase.to_value(u.s) if wrap_phase is None: wrap_phase = period_sec / 2 else: if normalize_phase: if isinstance( wrap_phase, Quantity ) and not wrap_phase.unit.is_equivalent(u.one): raise UnitsError( "wrap_phase should be dimensionless when normalize_phase=True" ) else: if wrap_phase < 0 or wrap_phase > 1: raise ValueError("wrap_phase should be between 0 and 1") else: wrap_phase = wrap_phase * period_sec else: if ( isinstance(wrap_phase, Quantity) and wrap_phase.unit.physical_type == "time" ): if wrap_phase < 0 or wrap_phase > period: raise ValueError( "wrap_phase should be between 0 and the period" ) else: wrap_phase = wrap_phase.to_value(u.s) else: raise UnitsError( "wrap_phase should be a Quantity in units " "of time when normalize_phase=False" ) relative_time_sec = ( (self.time - epoch_time).sec + epoch_phase_sec + (period_sec - wrap_phase) ) % period_sec - (period_sec - wrap_phase) folded_time = TimeDelta(relative_time_sec * u.s) if normalize_phase: folded_time = (folded_time / period).decompose() period = period_sec = 1 with folded._delay_required_column_checks(): folded.remove_column("time") folded.add_column(folded_time, name="time", index=0) return folded
def __getitem__(self, item): if self._is_list_or_tuple_of_str(item): if "time" not in item: out = QTable( [self[x] for x in item], meta=deepcopy(self.meta), copy_indices=self._copy_indices, ) out._groups = groups.TableGroups( out, indices=self.groups._indices, keys=self.groups._keys ) return out return super().__getitem__(item)
[docs] def add_column(self, *args, **kwargs): """ See :meth:`~astropy.table.Table.add_column`. """ # Note that the docstring is inherited from QTable result = super().add_column(*args, **kwargs) if len(self.indices) == 0 and "time" in self.colnames: self.add_index("time") return result
[docs] def add_columns(self, *args, **kwargs): """ See :meth:`~astropy.table.Table.add_columns`. """ # Note that the docstring is inherited from QTable result = super().add_columns(*args, **kwargs) if len(self.indices) == 0 and "time" in self.colnames: self.add_index("time") return result
[docs] @classmethod def from_pandas(self, df, time_scale="utc"): """ Convert a :class:`~pandas.DataFrame` to a :class:`astropy.timeseries.TimeSeries`. Parameters ---------- df : :class:`pandas.DataFrame` A pandas :class:`pandas.DataFrame` instance. time_scale : str The time scale to pass into `astropy.time.Time`. Defaults to ``UTC``. """ from pandas import DataFrame, DatetimeIndex if not isinstance(df, DataFrame): raise TypeError("Input should be a pandas DataFrame") if not isinstance(df.index, DatetimeIndex): raise TypeError("DataFrame does not have a DatetimeIndex") time = Time(df.index, scale=time_scale) table = Table.from_pandas(df) return TimeSeries(time=time, data=table)
[docs] def to_pandas(self): """ Convert this :class:`~astropy.timeseries.TimeSeries` to a :class:`~pandas.DataFrame` with a :class:`~pandas.DatetimeIndex` index. Returns ------- dataframe : :class:`pandas.DataFrame` A pandas :class:`pandas.DataFrame` instance """ return Table(self).to_pandas(index="time")
[docs] @classmethod def read( self, filename, time_column=None, time_format=None, time_scale=None, format=None, *args, **kwargs, ): """ Read and parse a file and returns a `astropy.timeseries.TimeSeries`. This method uses the unified I/O infrastructure in Astropy which makes it easy to define readers/writers for various classes (https://docs.astropy.org/en/stable/io/unified.html). By default, this method will try and use readers defined specifically for the `astropy.timeseries.TimeSeries` class - however, it is also possible to use the ``format`` keyword to specify formats defined for the `astropy.table.Table` class - in this case, you will need to also provide the column names for column containing the start times for the bins, as well as other column names (see the Parameters section below for details):: >>> from astropy.timeseries import TimeSeries >>> ts = TimeSeries.read('sampled.dat', format='ascii.ecsv', ... time_column='date') # doctest: +SKIP Parameters ---------- filename : str File to parse. format : str File format specifier. time_column : str, optional The name of the time column. time_format : str, optional The time format for the time column. time_scale : str, optional The time scale for the time column. *args : tuple, optional Positional arguments passed through to the data reader. **kwargs : dict, optional Keyword arguments passed through to the data reader. Returns ------- out : `astropy.timeseries.sampled.TimeSeries` TimeSeries corresponding to file contents. Notes ----- """ try: # First we try the readers defined for the BinnedTimeSeries class return super().read(filename, *args, format=format, **kwargs) except TypeError: # Otherwise we fall back to the default Table readers if time_column is None: raise ValueError( "``time_column`` should be provided since the default Table readers" " are being used." ) table = Table.read(filename, *args, format=format, **kwargs) if time_column in table.colnames: time = Time( table.columns[time_column], scale=time_scale, format=time_format ) table.remove_column(time_column) else: raise ValueError( f"Time column '{time_column}' not found in the input data." ) return TimeSeries(time=time, data=table)