"""Main Lomb-Scargle Multiband Implementation"""
import numpy as np
from astropy import units as u
from astropy.time import Time, TimeDelta
from astropy.timeseries.binned import BinnedTimeSeries
from astropy.timeseries.periodograms.lombscargle import LombScargle
from astropy.timeseries.sampled import TimeSeries
from .implementations import available_methods, lombscargle_multiband
from .implementations.mle import construct_regularization, design_matrix, periodic_fit
__all__ = ["LombScargleMultiband"]
def has_units(obj):
return hasattr(obj, "unit")
def get_unit(obj):
return getattr(obj, "unit", 1)
def strip_units(*arrs):
strip = lambda a: None if a is None else np.asarray(a)
if len(arrs) == 1:
return strip(arrs[0])
else:
return map(strip, arrs)
[docs]
class LombScargleMultiband(LombScargle):
"""Compute the Lomb-Scargle Periodogram.
This implementation is based on code presented in [1]_ and [2]_;
if you use this functionality in an academic application, citation of
those works would be appreciated.
Parameters
----------
t : array-like or `~astropy.units.Quantity` ['time']
sequence of observation times
y : array-like or `~astropy.units.Quantity`
sequence of observations associated with times t
bands : array-like
sequence of passband labels associated with times t, each unique label
defines a single band of data.
dy : float, array-like, or `~astropy.units.Quantity`, optional
error or sequence of observational errors associated with times t
normalization : {'standard', 'model', 'log', 'psd'}, optional
Normalization to use for the periodogram.
nterms_base : int, optional
number of frequency terms to use for the base model common to all bands.
In the case of the fast algorithm, this parameter is passed along to
the single band LombScargle method as the ``nterms`` parameter.
nterms_band : int, optional
number of frequency terms to use for the residuals between the base
model and each individual band
reg_base : float or None (default = None)
amount of regularization to use on the base model parameters
reg_band : float or None (default = 1E-6)
amount of regularization to use on the band model parameters
regularize_by_trace : bool (default = True)
if True, then regularization is expressed in units of the trace of
the normal matrix
center_data : bool, optional
if True, pre-center the data by subtracting the weighted mean
of the input data.
fit_mean : bool, optional
if True, include a constant offset as part of the model at each
frequency. This can lead to more accurate results, especially in the
case of incomplete phase coverage. Only applicable to the "fast" method
References
----------
.. [1] Vanderplas, J., Connolly, A. Ivezic, Z. & Gray, A. *Introduction to
astroML: Machine learning for astrophysics*. Proceedings of the
Conference on Intelligent Data Understanding (2012)
.. [2] VanderPlas, J. & Ivezic, Z. *Periodograms for Multiband Astronomical
Time Series*. ApJ 812.1:18 (2015)
"""
available_methods = available_methods()
def __init__(
self,
t,
y,
bands,
dy=None,
normalization="standard",
nterms_base=1,
nterms_band=1,
reg_base=None,
reg_band=1e-6,
regularize_by_trace=True,
center_data=True,
fit_mean=True,
):
# If t is a TimeDelta, convert it to a quantity. The units we convert
# to don't really matter since the user gets a Quantity back at the end
# so can convert to any units they like.
if isinstance(t, TimeDelta):
t = t.to("day")
# We want to expose self.t as being the times the user passed in, but
# if the times are absolute, we need to convert them to relative times
# internally, so we use self._trel and self._tstart for this.
self.t = t
if isinstance(self.t, Time):
self._tstart = self.t[0]
trel = (self.t - self._tstart).to(u.day)
else:
self._tstart = None
trel = self.t
self._trel, self.y, self.bands, self.dy = self._validate_inputs(
trel, y, bands, dy
)
self.normalization = normalization
self.nterms_base = nterms_base
self.nterms_band = nterms_band
self.reg_base = reg_base
self.reg_band = reg_band
self.regularize_by_trace = regularize_by_trace
self.center_data = center_data
self.fit_mean = fit_mean
self.nterms = self.nterms_base # determined by the base model params
[docs]
@classmethod
def from_timeseries(
cls,
timeseries,
signal_column=None,
uncertainty_column=None,
band_labels=None,
**kwargs,
):
"""
Initialize a multiband periodogram from a time series object.
If a binned time series is passed, the time at the center of the bins is
used. Also note that this method automatically gets rid of NaN/undefined
values when initializing the periodogram.
Parameters
----------
signal_column : list
The names of columns containing the signal values to use.
uncertainty_column : list, optional
The names of columns containing the errors on the signal.
band_labels : list, optional
The labels for each band, matched by index. If none, uses the
labels of ``signal_column`` as band names.
**kwargs
Additional keyword arguments are passed to the initializer for this
periodogram class.
"""
if signal_column is None:
raise ValueError(
"signal_column_name should be set to a list of valid column names"
)
if band_labels is not None:
if len(band_labels) != len(signal_column):
raise ValueError(
"band_labels have an equal number of elements to signal_column"
)
else:
band_labels = signal_column # use the flux labels as band labels
if isinstance(timeseries, TimeSeries):
time = timeseries.time
elif isinstance(timeseries, BinnedTimeSeries):
time = timeseries.time_bin_center
else:
raise TypeError(
"Input time series should be an instance of "
"TimeSeries or BinnedTimeSeries"
)
# Build lombscargle_multiband inputs
t = []
y = []
dy = []
band = []
for i, col in enumerate(signal_column):
if np.ma.is_masked(timeseries[col]):
signal_mask = ~timeseries[col].mask
else:
signal_mask = ~np.isnan(timeseries[col])
if uncertainty_column is not None:
dy_col = timeseries[uncertainty_column[i]]
if np.ma.is_masked(dy_col):
signal_mask &= ~dy_col.mask
else:
signal_mask &= ~np.isnan(dy_col)
t.append(time[signal_mask].mjd * u.day)
y.append(timeseries[col][signal_mask])
band.append([band_labels[i]] * sum(signal_mask))
dy.append(timeseries[uncertainty_column[i]][signal_mask])
t = np.hstack(t)
y = np.hstack(y)
band = np.hstack(band)
if uncertainty_column is not None:
dy = np.hstack(dy)
if len(dy) == 0:
dy = None
return cls(t, y, band, dy=dy, **kwargs)
def _validate_inputs(self, t, y, bands, dy):
# Validate shapes of inputs
if dy is None:
t, y, bands = np.broadcast_arrays(t, y, bands, subok=True)
else:
t, y, bands, dy = np.broadcast_arrays(t, y, bands, dy, subok=True)
if t.ndim != 1:
raise ValueError("Inputs (t, y, bands, dy) must be 1-dimensional")
# validate units of inputs if any is a Quantity
if any(has_units(arr) for arr in (t, y, bands, dy)):
t, y = map(u.Quantity, (t, y))
if dy is not None:
dy = u.Quantity(dy)
try:
dy = u.Quantity(dy, unit=y.unit)
except u.UnitConversionError:
raise ValueError("Units of dy not equivalent to units of y")
return t, y, bands, dy
[docs]
def autofrequency(
self,
samples_per_peak=5,
nyquist_factor=5,
minimum_frequency=None,
maximum_frequency=None,
return_freq_limits=False,
):
"""Determine a suitable frequency grid for data.
Note that this assumes the peak width is driven by the observational
baseline, which is generally a good assumption when the baseline is
much larger than the oscillation period.
If you are searching for periods longer than the baseline of your
observations, this may not perform well.
Even with a large baseline, be aware that the maximum frequency
returned is based on the concept of "average Nyquist frequency", which
may not be useful for irregularly-sampled data. The maximum frequency
can be adjusted via the nyquist_factor argument, or through the
maximum_frequency argument.
Parameters
----------
samples_per_peak : float, optional
The approximate number of desired samples across the typical peak
nyquist_factor : float, optional
The multiple of the average nyquist frequency used to choose the
maximum frequency if maximum_frequency is not provided.
minimum_frequency : float, optional
If specified, then use this minimum frequency rather than one
chosen based on the size of the baseline.
maximum_frequency : float, optional
If specified, then use this maximum frequency rather than one
chosen based on the average nyquist frequency.
return_freq_limits : bool, optional
if True, return only the frequency limits rather than the full
frequency grid.
Returns
-------
frequency : ndarray or `~astropy.units.Quantity` ['frequency']
The heuristically-determined optimal frequency bin
"""
if hasattr(self._trel, "unit"):
unit = self._trel.unit
trel = self._trel.to(u.day) # do frequency calculations in days
else:
trel = self._trel
baseline = trel.max() - trel.min()
n_samples = trel.size
df = 1.0 / baseline / samples_per_peak
if minimum_frequency is None:
minimum_frequency = 0.5 * df
if maximum_frequency is None:
avg_nyquist = 0.5 * n_samples / baseline
maximum_frequency = nyquist_factor * avg_nyquist
# Convert back to the input units
if hasattr(self._trel, "unit"):
df = df.to(1 / unit)
minimum_frequency = minimum_frequency.to(1 / unit)
maximum_frequency = maximum_frequency.to(1 / unit)
n_freq = 1 + int(np.round((maximum_frequency - minimum_frequency) / df))
if return_freq_limits:
return minimum_frequency, minimum_frequency + df * (n_freq - 1)
else:
return minimum_frequency + df * np.arange(n_freq)
[docs]
def autopower(
self,
method="flexible",
sb_method="auto",
normalization="standard",
samples_per_peak=5,
nyquist_factor=5,
minimum_frequency=None,
maximum_frequency=None,
):
"""Compute Lomb-Scargle power at automatically-determined frequencies.
Parameters
----------
method : str, optional
specify the multi-band lomb scargle implementation to use. Options are:
- 'flexible': Constructs a common model, and an offset model per individual
band. Applies regularization to the resulting model to constrain
complexity.
- 'fast': Passes each band individually through LombScargle (single-band),
combines periodograms at the end by weight. Speed depends on single-band
method chosen in 'sb_method'.
sb_method : str, optional
specify the single-band lomb scargle implementation to use, only in
the case of using the 'fast' multiband method. Options are:
- 'auto': choose the best method based on the input
- 'fast': use the O[N log N] fast method. Note that this requires
evenly-spaced frequencies: by default this will be checked unless
``assume_regular_frequency`` is set to True.
- 'slow': use the O[N^2] pure-python implementation
- 'cython': use the O[N^2] cython implementation. This is slightly
faster than method='slow', but much more memory efficient.
- 'chi2': use the O[N^2] chi2/linear-fitting implementation
- 'fastchi2': use the O[N log N] chi2 implementation. Note that this
requires evenly-spaced frequencies: by default this will be checked
unless ``assume_regular_frequency`` is set to True.
- 'scipy': use ``scipy.signal.lombscargle``, which is an O[N^2]
implementation written in C. Note that this does not support
heteroskedastic errors.
normalization : {'standard', 'model', 'log', 'psd'}, optional
If specified, override the normalization specified at instantiation.
samples_per_peak : float, optional
The approximate number of desired samples across the typical peak
nyquist_factor : float, optional
The multiple of the average nyquist frequency used to choose the
maximum frequency if maximum_frequency is not provided.
minimum_frequency : float or `~astropy.units.Quantity` ['frequency'], optional
If specified, then use this minimum frequency rather than one
chosen based on the size of the baseline. Should be `~astropy.units.Quantity`
if inputs to LombScargle are `~astropy.units.Quantity`.
maximum_frequency : float or `~astropy.units.Quantity` ['frequency'], optional
If specified, then use this maximum frequency rather than one
chosen based on the average nyquist frequency. Should be `~astropy.units.Quantity`
if inputs to LombScargle are `~astropy.units.Quantity`.
Returns
-------
frequency, power : ndarray
The frequency and Lomb-Scargle power
"""
frequency = self.autofrequency(
samples_per_peak=samples_per_peak,
nyquist_factor=nyquist_factor,
minimum_frequency=minimum_frequency,
maximum_frequency=maximum_frequency,
)
power = self.power(
frequency, method=method, sb_method=sb_method, normalization=normalization
)
return frequency, power
[docs]
def power(
self, frequency, method="flexible", sb_method="auto", normalization="standard"
):
"""Compute the Lomb-Scargle power at the given frequencies.
Parameters
----------
frequency : array-like or `~astropy.units.Quantity` ['frequency']
frequencies (not angular frequencies) at which to evaluate the
periodogram. Note that in order to use method='fast', frequencies
must be regularly-spaced.
method : str, optional
specify the multi-band lomb scargle implementation to use. Options are:
- 'flexible': Constructs a common model, and an offset model per individual
band. Applies regularization to the resulting model to constrain
complexity.
- 'fast': Passes each band individually through LombScargle (single-band),
combines periodograms at the end by weight. Speed depends on single-band
method chosen in 'sb_method'.
sb_method : str, optional
specify the single-band lomb scargle implementation to use, only in
the case of using the 'fast' multiband method. Options can be found
in `~astropy.timeseries.LombScargle`.
normalization : {'standard', 'model', 'log', 'psd'}, optional
If specified, override the normalization specified at instantiation.
Returns
-------
power : ndarray
The Lomb-Scargle power at the specified frequency
"""
if normalization is None:
normalization = self.normalization
frequency = self._validate_frequency(frequency)
f_shape = np.shape(frequency)
power = lombscargle_multiband(
strip_units(self._trel),
strip_units(self.y),
strip_units(self.bands),
dy=strip_units(self.dy),
frequency=strip_units(np.ravel(frequency)),
method=method,
sb_method=sb_method,
normalization=normalization,
nterms_base=self.nterms_base,
nterms_band=self.nterms_band,
reg_base=self.reg_base,
reg_band=self.reg_band,
regularize_by_trace=self.regularize_by_trace,
center_data=self.center_data,
fit_mean=self.fit_mean,
)
return np.reshape(power * self._power_unit(normalization), f_shape)
[docs]
def design_matrix(self, frequency, t_fit=None, bands_fit=None):
"""Compute the design matrix for a given frequency
Parameters
----------
frequency : float
the frequency for the model
t_fit : array-like, `~astropy.units.Quantity`, or `~astropy.time.Time` (optional)
Times (length ``n_samples``) at which to compute the model.
If not specified, then the times and uncertainties of the input
data are used.
bands_fit : array-like, or str
Bands to use in fitting, must be subset of bands in input data.
Returns
-------
ndarray
The design matrix for the model at the given frequency.
This should have a shape of (``len(t)``, ``n_parameters``).
See Also
--------
model
model_parameters
offset
"""
if t_fit is None:
t_fit, dy = strip_units(self._trel, self.dy)
else:
t_fit, dy = strip_units(
self._validate_t(self._as_relative_time("t", t_fit)), None
)
if bands_fit is None:
bands_fit = np.unique(self.bands)
elif type(bands_fit) == str:
bands_fit = [bands_fit]
unique_bands = np.unique(bands_fit)
unique_bands_fit = np.unique(bands_fit)
bands_fit = bands_fit[:, np.newaxis]
if not set(unique_bands_fit).issubset(set(unique_bands)):
raise ValueError(
"bands_fit does not match training data: "
f"input: {set(unique_bands_fit)} output: {set(unique_bands)}"
)
t_fit, bands_fit = np.broadcast_arrays(t_fit, bands_fit)
return design_matrix(
t_fit.ravel(),
bands_fit.ravel(),
frequency,
dy,
nterms_base=self.nterms_base,
nterms_band=self.nterms_band,
)
[docs]
def model(self, t, frequency, bands_fit=None):
"""Compute the Lomb-Scargle model at the given frequency.
The model at a particular frequency is a linear model:
model = offset + dot(design_matrix, model_parameters)
Parameters
----------
t : array-like or `~astropy.units.Quantity` ['time']
Times (length ``n_samples``) at which to compute the model.
frequency : float
the frequency for the model
bands_fit : list or array-like
the unique bands to fit in the model
Returns
-------
y : np.ndarray
The model fit corresponding to the input times.
Will have shape (``n_bands``,``n_samples``).
See Also
--------
design_matrix
offset
model_parameters
"""
if bands_fit is None:
bands_fit = np.unique(self.bands)
frequency = self._validate_frequency(frequency)
t = self._validate_t(self._as_relative_time("t", t))
y_fit = periodic_fit(
*strip_units(self._trel, self.y, self.dy),
bands=self.bands,
frequency=strip_units(frequency),
t_fit=strip_units(t),
bands_fit=bands_fit,
center_data=self.center_data,
nterms_base=self.nterms_base,
nterms_band=self.nterms_band,
reg_base=self.reg_base,
reg_band=self.reg_band,
regularize_by_trace=self.regularize_by_trace,
)
return y_fit * get_unit(self.y)
[docs]
def offset(self, t_fit=None, bands_fit=None):
"""Return the offset array of the model
The offset array of the model is the (weighted) mean of the y values in each band.
Note that if self.center_data is False, the offset is 0 by definition.
Parameters
----------
t_fit : array-like, `~astropy.units.Quantity`, or `~astropy.time.Time` (optional)
Times (length ``n_samples``) at which to compute the model.
If not specified, then the times and uncertainties of the input
data are used.
bands_fit : array-like, or str
Bands to use in fitting, must be subset of bands in input data.
Returns
-------
offset : array
See Also
--------
design_matrix
model
model_parameters
"""
if bands_fit is None:
bands_fit = np.unique(self.bands)
if t_fit is None:
on_fit = False
t_fit = self.t
else:
on_fit = True
bands_fit = bands_fit[:, np.newaxis]
unique_bands = np.unique(self.bands)
unique_bands_fit = np.unique(bands_fit)
if not set(unique_bands_fit).issubset(set(unique_bands)):
raise ValueError(
"bands_fit does not match training data: "
f"input: {set(unique_bands_fit)} output: {set(unique_bands)}"
)
y, dy = strip_units(self.y, self.dy)
if np.shape(t_fit) != np.shape(
bands_fit
): # No need to broadcast if bands map to timestamps
t_fit, bands_fit = np.broadcast_arrays(t_fit, bands_fit)
# need to make sure all unique filters are represented
u, i = np.unique(
np.concatenate([bands_fit.ravel(), unique_bands]), return_inverse=True
)
if not self.center_data:
return 0
if dy is None:
dy = 1
dy = np.broadcast_to(dy, y.shape)
# Calculate ymeans -- per band
ymeans = np.zeros(y.shape) # filter specific means
for band in unique_bands:
mask = self.bands == band
ymeans[mask] = np.average(y[mask], weights=1 / dy[mask] ** 2)
ymeans_fit = ymeans[i[: -len(unique_bands)]]
if on_fit:
return ymeans_fit * get_unit(self.y)
else:
return ymeans * get_unit(self.y)
[docs]
def model_parameters(self, frequency, units=True):
r"""Compute the best-fit model parameters at the given frequency.
The model described by these parameters is:
.. math::
y(t; f, \vec{\theta}) = \theta_0 + \sum_{n=1}^{\tt nterms_base} [\theta_{2n-1}\sin(2\pi n f t) + \theta_{2n}\cos(2\pi n f t)]
+ \theta_0^{(k)} + \sum_{n=1}^{\tt nterms_band} [\theta_{2n-1}^{(k)}\sin(2\pi n f t) +
where :math:`\vec{\theta}` is the array of parameters returned by this function.
Parameters
----------
frequency : float
the frequency for the model
units : bool
If True (default), return design matrix with data units.
Returns
-------
theta : np.ndarray (n_parameters,)
The best-fit model parameters at the given frequency.
See Also
--------
design_matrix
model
offset
"""
frequency = self._validate_frequency(frequency)
t, y, dy = strip_units(self._trel, self.y, self.dy)
if self.center_data:
y = y - strip_units(self.offset())
dy = np.ones_like(y) if dy is None else np.asarray(dy)
X = design_matrix(
t,
self.bands,
frequency,
dy=dy,
nterms_base=self.nterms_base,
nterms_band=self.nterms_band,
)
regularization = construct_regularization(
self.bands,
nterms_base=self.nterms_base,
nterms_band=self.nterms_band,
reg_base=self.reg_base,
reg_band=self.reg_band,
)
M = np.dot(X.T, X)
if regularization is not None:
# M is being affected by operations on diag
diag = M.ravel(order="K")[:: M.shape[0] + 1]
if self.regularize_by_trace:
diag += diag.sum() * np.asarray(regularization)
else:
diag += np.asarray(regularization)
try:
parameters = np.linalg.solve(M, np.dot(X.T, y / dy))
except np.linalg.LinAlgError:
parameters = np.dot(M, np.linalg.lstsq(X.T, y / dy)[0])
if units:
parameters = get_unit(self.y) * parameters
return parameters
[docs]
def false_alarm_probability(self):
"""Not Implemented"""
raise NotImplementedError
[docs]
def false_alarm_level(self):
"""Not Implemented"""
raise NotImplementedError
[docs]
def distribution(self):
"""Not Implemented"""
raise NotImplementedError