# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
An extensible ASCII table reader and writer.
Classes to read DAOphot table format
:Copyright: Smithsonian Astrophysical Observatory (2011)
:Author: Tom Aldcroft (aldcroft@head.cfa.harvard.edu)
"""
import itertools as itt
import re
from collections import defaultdict
import numpy as np
from . import core, fixedwidth
from .misc import first_false_index, first_true_index, groupmore
class DaophotHeader(core.BaseHeader):
    """
    Read the header from a file produced by the IRAF DAOphot routine.

    The header consists of ``#K`` keyword lines and one or more blocks of
    ``#N`` / ``#U`` / ``#F`` lines giving the column names, units and
    printf-style formats.
    """

    # Regex matching the header keyword lines
    comment = r"\s*#K"

    # Regex for extracting the field width from a printf-style format string.
    # The precision may have any number of digits (e.g. ``%-15.10g``).
    re_format = re.compile(r"%-?(\d+)\.?\d*[sdfg]")
    # Regex splitting a ``#K NAME = stuff`` line into its name and remainder
    re_header_keyword = re.compile(
        r"[#]K\s+ (?P<name> \w+)\s* = (?P<stuff> .+) $", re.VERBOSE
    )
    # Aperture values of a multi-aperture file; filled in by
    # ``DaophotData.get_data_lines``
    aperture_values = ()

    def __init__(self):
        core.BaseHeader.__init__(self)

    def parse_col_defs(self, grouped_lines_dict):
        """Parse a series of column definition lines.

        Parameters
        ----------
        grouped_lines_dict : dict
            Header lines keyed by their line identifier (``#N``, ``#U``,
            ``#F``, ...). Missing identifiers are treated as having no lines.

        Returns
        -------
        coldefs : dict
            Flat lists of column names, units and formats keyed by ``#N``,
            ``#U`` and ``#F`` respectively. As a side effect the per-block
            column widths are stored in ``self.col_widths``.

        Raises
        ------
        core.InconsistentTableError
            If the field width cannot be extracted from a format specifier.

        Examples
        --------
        When parsing, there may be several such blocks in a single file
        (where continuation characters have already been stripped).
        #N ID    XCENTER   YCENTER   MAG         MERR          MSKY           NITER
        #U ##    pixels    pixels    magnitudes  magnitudes    counts         ##
        #F %-9d  %-10.3f   %-10.3f   %-12.3f     %-14.3f       %-15.7g        %-6d
        """
        line_ids = ("#N", "#U", "#F")
        coldef_dict = defaultdict(list)

        def strip_identifier(line):
            # Drop the two-character identifier and any trailing continuation
            return line[2:].strip(" \\")

        # A missing identifier yields no blocks at all instead of a TypeError,
        # so that ``get_cols`` can report the missing column names.
        blocks = [grouped_lines_dict.get(line_id, ()) for line_id in line_ids]
        for defblock in zip(*blocks):
            for key, line in zip(line_ids, map(strip_identifier, defblock)):
                coldef_dict[key].append(line.split())

        if self.data.is_multiline and coldef_dict["#N"]:
            # Database contains multi-aperture data.
            # Autogen column names, units, formats from last row of column headers
            last_names = coldef_dict["#N"][-1]
            last_units = coldef_dict["#U"][-1]
            last_formats = coldef_dict["#F"][-1]
            n_apertures = len(self.data.first_block)
            for aperture in range(1, n_apertures + 1):
                # extra column names eg. RAPERT2, SUM2 etc...
                suffix = str(aperture)
                extended_names = [name + suffix for name in last_names]
                if aperture == 1:
                    # Enumerate the names starting at 1: the existing last
                    # row is renamed rather than duplicated.
                    coldef_dict["#N"][-1] = extended_names
                else:
                    coldef_dict["#N"].append(extended_names)
                    coldef_dict["#U"].append(last_units)
                    coldef_dict["#F"].append(last_formats)

        def get_col_width(fmt):
            # Field width is the first number of the format specifier
            match = self.re_format.search(fmt)
            if match is None:
                raise core.InconsistentTableError(
                    f"Cannot determine column width from format {fmt!r}"
                )
            return int(match.group(1))

        # Get column widths from column format specifiers
        col_widths = [
            [get_col_width(fmt) for fmt in formats] for formats in coldef_dict["#F"]
        ]
        # The declared widths of a row may add up to less than the table
        # width; the remainder is assigned to the last column of that row.
        for widths in col_widths:
            if widths:
                widths[-1] += Daophot.table_width - sum(widths)

        self.col_widths = col_widths

        # merge the multi-line header data into single line data
        return {
            line_id: list(itt.chain.from_iterable(coldef_dict[line_id]))
            for line_id in line_ids
        }

    def update_meta(self, lines, meta):
        """
        Extract table-level keywords for DAOphot table.  These are indicated by
        a leading '#K ' prefix.

        The header lines are taken from ``self.lines`` (the ``lines`` argument
        is not used). ``meta`` is updated in place with the keywords and the
        per-column unit/format, and ``self.meta`` / ``self.names`` are set.
        Nothing is done when there are no header lines.
        """
        table_meta = meta["table"]

        n_lines = len(self.lines)
        if n_lines == 0:
            return

        def get_line_id(line):
            # The identifier is the first whitespace-delimited token
            return line.split(None, 1)[0]

        # Group lines by the line identifier ('#N', '#U', '#F', '#K', or a
        # bare '#' spacer) and capture the line indices alongside
        gid, groups = zip(*groupmore(get_line_id, self.lines, range(n_lines)))

        # Groups of lines; the matching indices are not needed here
        grouped_lines, _ = zip(*groups)

        # Dict of line groups keyed by line identifiers
        grouped_lines_dict = dict(zip(gid, grouped_lines))

        # Update the table_meta keywords if necessary
        if "#K" in grouped_lines_dict:
            parsed = map(self.extract_keyword_line, grouped_lines_dict["#K"])
            # Lines that do not look like ``#K NAME = ...`` yield None; skip
            # them instead of failing when building the dict
            table_meta["keywords"] = dict(
                item for item in parsed if item is not None
            )

        coldef_dict = self.parse_col_defs(grouped_lines_dict)

        names, units, formats = (coldef_dict[key] for key in ("#N", "#U", "#F"))
        for name, unit, fmt in zip(names, units, formats):
            meta["cols"][name] = {"unit": unit, "format": fmt}

        self.meta = meta
        self.names = names

    def extract_keyword_line(self, line):
        """
        Extract info from a header keyword line (#K).

        Returns
        -------
        item : tuple or None
            ``(name, {"units": ..., "format": ..., "value": ...})``, or None
            when ``line`` does not match the keyword pattern. The value is an
            empty string when only units and format are present.
        """
        m = self.re_header_keyword.match(line)
        if m is None:
            return None

        # Units and format are the last two tokens; everything before them
        # (which may contain spaces) is the value
        vals = m.group("stuff").strip().rsplit(None, 2)
        keyword_dict = {
            "units": vals[-2],
            "format": vals[-1],
            "value": (vals[0] if len(vals) > 2 else ""),
        }
        return m.group("name"), keyword_dict

    def get_cols(self, lines):
        """
        Initialize the header Column objects from the table ``lines`` for a DAOphot
        header.  The DAOphot header is specialized so that we just copy the entire BaseHeader
        get_cols routine and modify as needed.

        Parameters
        ----------
        lines : list
            List of table lines

        Raises
        ------
        core.InconsistentTableError
            If no column names were found in the header.
        """
        if not self.names:
            raise core.InconsistentTableError("No column names found in DAOphot header")

        # Create the list of io.ascii column objects
        self._set_cols_from_names()

        # Set unit and format as needed; "##" marks an absent value.
        coldefs = self.meta["cols"]
        for col in self.cols:
            coldef = coldefs[col.name]
            unit = coldef.get("unit")
            fmt = coldef.get("format")
            if unit != "##":
                col.unit = unit
            if fmt != "##":
                col.format = fmt

        # Set column start and end positions.
        col_width = list(itt.chain.from_iterable(self.col_widths))
        ends = np.cumsum(col_width)
        starts = ends - col_width
        for i, col in enumerate(self.cols):
            col.start, col.end = starts[i], ends[i]
            col.span = col.end - col.start
            # The format attribute only exists when it was set above
            if hasattr(col, "format"):
                if any(x in col.format for x in "fg"):
                    col.type = core.FloatType
                elif "d" in col.format:
                    col.type = core.IntType
                elif "s" in col.format:
                    col.type = core.StrType

        # INDEF is the missing value marker
        self.data.fill_values.append(("INDEF", "0"))
class DaophotData(core.BaseData):
    """Data section of a DAOphot table, split into fixed-width fields."""

    splitter_class = fixedwidth.FixedWidthSplitter
    start_line = 0
    comment = r"\s*#"

    def __init__(self):
        core.BaseData.__init__(self)
        # Set to True by the inputter when a multi-aperture block is found
        self.is_multiline = False

    def get_data_lines(self, lines):
        """
        Select the data lines from ``lines`` via the base class, after
        recording the aperture values of a multi-aperture file on the header.
        """
        if self.is_multiline:
            # Transpose the whitespace-split rows of the first multiline
            # block; the first resulting column holds the aperture values.
            split_rows = (row.split() for row in self.first_block)
            first_column = next(zip(*split_rows))
            self.header.aperture_values = tuple(float(val) for val in first_column)

        # Set self.data.data_lines to a slice of lines contain the data rows
        core.BaseData.get_data_lines(self, lines)
class DaophotInputter(core.ContinuationLinesInputter):
    """
    Inputter that joins continued DAOphot lines and detects the
    multi-aperture ("multiline") part of the first data block.
    """

    continuation_char = "\\"
    multiline_char = "*"
    replace_char = " "
    # Groups: leading '#' (comment), trailing '*' (special row continuation)
    # and trailing backslashes (line continuation)
    re_multiline = re.compile(r"(#?)[^\\*#]*(\*?)(\\*) ?$")

    def search_multiline(self, lines, depth=150):
        """
        Search lines for special continuation character to determine number of
        continued rows in a datablock.  For efficiency, depth gives the upper
        limit of lines to search.

        Parameters
        ----------
        lines : list
            All lines of the table.
        depth : int, optional
            Maximum number of leading lines to inspect. It is doubled
            automatically when the multiline block is not terminated within
            the searched lines and more lines are available.

        Returns
        -------
        markers : ndarray or None
            Indices (relative to ``lines[0]``) of the first data line, the
            first multiline row and the end of the multiline block; None when
            no multiline block was found.
        multiline_block : list or None
            The multiline rows of the first data block, or None.
        header_lines : list
            The lines preceding the first data line (or all searched lines
            when no data line was found).
        """
        # The list of apertures given in the #K APERTURES keyword may not be
        # complete!!  This happens if the string description of the aperture
        # list is longer than the field width of the #K APERTURES field.  In
        # this case we have to figure out how many apertures there are based on
        # the file structure.
        searched = lines[:depth]
        if not searched:
            # Nothing to inspect (empty input): no data and no header
            return None, None, searched

        comment, special, _ = zip(
            *(self.re_multiline.search(line).groups() for line in searched)
        )

        # Find first non-comment line
        data_start = first_false_index(comment)

        # No data in lines[:depth].  This may be because there is no data in
        # the file, or because the header is really huge.  If the latter,
        # increasing the search depth should help
        if data_start is None:
            return None, None, searched

        header_lines = lines[:data_start]

        # Find first line ending on special row continuation character '*'
        # indexed relative to data_start
        first_special = first_true_index(special[data_start:depth])
        if first_special is None:  # no special lines
            return None, None, header_lines

        # First line after the special block that does not end on '*',
        # indexed relative to first_special
        last_special = first_false_index(special[data_start + first_special : depth])
        if last_special is None:
            # No end of the special lines within the searched lines
            if len(lines) > depth:
                # The block may continue beyond the search depth: look further
                return self.search_multiline(lines, depth=2 * depth)
            # The special block runs to the end of the input
            last_special = len(special) - (data_start + first_special)

        # indexing now relative to line[0]
        markers = np.cumsum([data_start, first_special, last_special])
        # multiline portion of first data block
        multiline_block = lines[markers[1] : markers[-1]]

        return markers, multiline_block, header_lines

    def process_lines(self, lines):
        """
        Join continued lines into single rows and record the multiline
        information and header lines on the data/header objects.

        Returns the list of joined lines; when a multiline block was found
        the header lines are dropped from the output.
        """
        markers, block, header = self.search_multiline(lines)
        self.data.is_multiline = markers is not None
        self.data.markers = markers
        self.data.first_block = block
        # set the header lines returned by the search as a attribute of the header
        self.data.header.lines = header

        if markers is not None:
            lines = lines[markers[0] :]

        continuation_char = self.continuation_char
        multiline_char = self.multiline_char
        replace_char = self.replace_char

        parts = []
        outlines = []
        for i, line in enumerate(lines):
            mo = self.re_multiline.search(line)
            if not mo:
                raise core.InconsistentTableError(
                    f"multiline re could not match line {i}: {line}"
                )

            comment, special, cont = mo.groups()
            if comment or cont:
                line = line.replace(continuation_char, replace_char)
            if special:
                line = line.replace(multiline_char, replace_char)
            # Continued non-comment lines are accumulated until a line
            # without a continuation closes the row
            if cont and not comment:
                parts.append(line)
            if not cont:
                parts.append(line)
                outlines.append("".join(parts))
                parts = []

        return outlines
class Daophot(core.BaseReader):
    """
    DAOphot format table.

    Example::

      #K MERGERAD = INDEF scaleunit %-23.7g
      #K IRAF = NOAO/IRAFV2.10EXPORT version %-23s
      #K USER = davis name %-23s
      #K HOST = tucana computer %-23s
      #
      #N ID XCENTER YCENTER MAG MERR MSKY NITER \\
      #U ## pixels pixels magnitudes magnitudes counts ## \\
      #F %-9d %-10.3f %-10.3f %-12.3f %-14.3f %-15.7g %-6d
      #
      #N SHARPNESS CHI PIER PERROR \\
      #U ## ## ## perrors \\
      #F %-23.3f %-12.3f %-6d %-13s
      #
      14 138.538 INDEF 15.461 0.003 34.85955 4 \\
      -0.032 0.802 0 No_error

    The keywords defined in the #K records are available via the output table
    ``meta`` attribute::

      >>> import os
      >>> from astropy.io import ascii
      >>> filename = os.path.join(ascii.__path__[0], 'tests/data/daophot.dat')
      >>> data = ascii.read(filename)
      >>> for name, keyword in data.meta['keywords'].items():
      ...     print(name, keyword['value'], keyword['units'], keyword['format'])
      ...
      MERGERAD INDEF scaleunit %-23.7g
      IRAF NOAO/IRAFV2.10EXPORT version %-23s
      USER  name %-23s
      ...

    The unit and formats are available in the output table columns::

      >>> for colname in data.colnames:
      ...     col = data[colname]
      ...     print(colname, col.unit, col.format)
      ...
      ID None %-9d
      XCENTER pixels %-10.3f
      YCENTER pixels %-10.3f
      ...

    Any column values of INDEF are interpreted as a missing value and will be
    masked out in the resultant table.

    In case of multi-aperture daophot files containing repeated entries for the last
    row of fields, unique column names will be created by suffixing the
    corresponding field names with numbers from 1 to N (where N is the
    total number of apertures).
    For example,
    first aperture radius will be RAPERT1 and corresponding magnitude will be MAG1,
    second aperture radius will be RAPERT2 and corresponding magnitude will be MAG2,
    third aperture radius will be RAPERT3 and corresponding magnitude will be MAG3,
    and so on.
    """

    _format_name = "daophot"
    _io_registry_format_aliases = ["daophot"]
    _io_registry_can_write = False
    _description = "IRAF DAOphot format table"

    header_class = DaophotHeader
    data_class = DaophotData
    inputter_class = DaophotInputter

    # Total character width of a table row; the header uses it to pad the
    # last column of each header row
    table_width = 80

    def __init__(self):
        core.BaseReader.__init__(self)
        # The inputter needs to know about the data (see DaophotInputter.process_lines)
        self.inputter.data = self.data

    def write(self, table=None):
        """Writing DAOphot tables is not supported; always raises NotImplementedError."""
        raise NotImplementedError