# Source code for the astropy ECSV (Enhanced Character-Separated-Values) reader/writer module

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Define the Enhanced Character-Separated-Values (ECSV) which allows for reading and
writing all the meta data associated with an astropy Table object.
"""

import json
import re
import warnings
from collections import OrderedDict

import numpy as np

from astropy.io.ascii.core import convert_numpy
from astropy.table import meta, serialize
from astropy.utils.data_info import serialize_context_as
from astropy.utils.exceptions import AstropyUserWarning

from . import basic, core

# Version of the ECSV standard written into the "%ECSV <version>" header line.
ECSV_VERSION = "1.0"

# The only column delimiters accepted when reading or writing ECSV.
DELIMITERS = (" ", ",")

# Datatype names defined by the ECSV standard.
ECSV_DATATYPES = (
    "bool",
    "int8",
    "int16",
    "int32",
    "int64",
    "uint8",
    "uint16",
    "uint32",
    "uint64",
    "float16",
    "float32",
    "float64",
    "float128",
    "string",
)  # Raise warning if not one of these standard dtypes

class InvalidEcsvDatatypeWarning(AstropyUserWarning):
    """
    ECSV specific Astropy warning class.

    Emitted while reading when a column declares a datatype or subtype that
    is not expected for ECSV but the file is read anyway.
    """

class EcsvHeader(basic.BasicHeader):
    """Header class for which the column definition line starts with the
    comment character.  See the :class:`CommentedHeader` class  for an example.
    """

    def process_lines(self, lines):
        """Return only non-blank lines that start with the comment regexp.  For these
        lines strip out the matching characters and leading/trailing whitespace.
        """
        re_comment = re.compile(self.comment)
        for line in lines:
            line = line.strip()
            if not line:
                continue
            match = re_comment.match(line)
            if match:
                out = line[match.end() :]
                if out:
                    yield out
            else:
                # Stop iterating on first failed match for a non-blank line
                return

    def write(self, lines):
        """
        Write header information in the ECSV ASCII format.

        This function is called at the point when preprocessing has been done to
        convert the input table columns to `self.cols` which is a list of
        `astropy.io.ascii.core.Column` objects. In particular `col.str_vals`
        is available for each column with the string representation of each
        column item for output.

        This format starts with a delimiter separated list of the column names
        in order to make this format readable by humans and simple csv-type
        readers. It then encodes the full table meta and column attributes and
        meta as YAML and pretty-prints this in the header.  Finally the
        delimited column names are repeated again, for humans and readers that
        look for the *last* comment line as defining the column names.

        Parameters
        ----------
        lines : list
            Output lines; the header lines are appended to this list in place.

        Raises
        ------
        ValueError
            If the configured delimiter is not one of ``DELIMITERS``.
        """
        if self.splitter.delimiter not in DELIMITERS:
            raise ValueError(
                "only space and comma are allowed for delimiter in ECSV format"
            )

        # Now assemble the header dict that will be serialized by the YAML dumper
        header = {"cols": self.cols, "schema": "astropy-2.0"}

        if self.table_meta:
            header["meta"] = OrderedDict(self.table_meta)

        # Set the delimiter only for the non-default option(s)
        if self.splitter.delimiter != " ":
            header["delimiter"] = self.splitter.delimiter

        header_yaml_lines = [
            f"%ECSV {ECSV_VERSION}",
            "---",
        ] + meta.get_yaml_from_header(header)

        lines.extend([self.write_comment + line for line in header_yaml_lines])
        lines.append(self.splitter.join([x.info.name for x in self.cols]))

    def write_comments(self, lines, meta):
        """
        WRITE: Override the default write_comments to do nothing since this is handled
        in the custom write method.
        """

    def update_meta(self, lines, meta):
        """
        READ: Override the default update_meta to do nothing.  This process is done
        in get_cols() for this reader.
        """

    def get_cols(self, lines):
        """
        READ: Initialize the header Column objects from the table ``lines``.

        Parameters
        ----------
        lines : list
            List of table lines

        Raises
        ------
        core.InconsistentTableError
            If the ``%ECSV <version>`` line is missing, the YAML header cannot
            be parsed, or the YAML column names disagree with the CSV header
            line.
        ValueError
            If the header declares a delimiter that is not in ``DELIMITERS``.
        """
        # Cache a copy of the original input lines before processing below
        raw_lines = lines

        # Extract non-blank comment (header) lines with comment character stripped
        lines = list(self.process_lines(lines))

        # Validate that this is a ECSV file
        ecsv_header_re = r"""%ECSV [ ]
                             (?P<major> \d+)
                             \. (?P<minor> \d+)
                             \.? (?P<bugfix> \d+)? $"""

        no_header_msg = (
            'ECSV header line like "# %ECSV <version>" not found as first line.'
            "  This is required for a ECSV file."
        )

        if not lines:
            raise core.InconsistentTableError(no_header_msg)

        match = re.match(ecsv_header_re, lines[0].strip(), re.VERBOSE)
        if not match:
            raise core.InconsistentTableError(no_header_msg)

        try:
            header = meta.get_header_from_yaml(lines)
        except meta.YamlParseError:
            raise core.InconsistentTableError("unable to parse yaml in meta header")

        if "meta" in header:
            self.table_meta = header["meta"]

        if "delimiter" in header:
            delimiter = header["delimiter"]
            if delimiter not in DELIMITERS:
                raise ValueError(
                    "only space and comma are allowed for delimiter in ECSV format"
                )
            # Keep the header and data splitters in agreement
            self.splitter.delimiter = delimiter
            self.data.splitter.delimiter = delimiter

        # Create the list of io.ascii column objects from `header`
        header_cols = {x["name"]: x for x in header["datatype"]}

        self.names = [x["name"] for x in header["datatype"]]

        # Read the first non-commented line of table and split to get the CSV
        # header column names.  This is essentially what the Basic reader does.
        try:
            header_line = next(super().process_lines(raw_lines))
            header_names = next(self.splitter([header_line]))
        except StopIteration:
            # there are no non-commented lines
            header_line = ""
            header_names = []

        # Check for consistency of the ECSV vs. CSV header column names
        if header_names != self.names:
            raise core.InconsistentTableError(
                f"column names from ECSV header {self.names} do not "
                f"match names from header line of CSV data {header_names}"
            )

        # BaseHeader method to create self.cols, which is a list of
        # io.ascii.core.Column objects (*not* Table Column objects).
        self._set_cols_from_names()

        # Transfer attributes from the column descriptor stored in the input
        # header YAML metadata to the new columns to create this table.
        for col in self.cols:
            for attr in ("description", "format", "unit", "meta", "subtype"):
                if attr in header_cols[col.name]:
                    setattr(col, attr, header_cols[col.name][attr])

            col.dtype = header_cols[col.name]["datatype"]
            # Warn if col dtype is not a valid ECSV datatype, but allow reading for
            # back-compatibility with existing older files that have numpy datatypes
            # like datetime64 or object or python str, which are not in the ECSV standard.
            if col.dtype not in ECSV_DATATYPES:
                msg = (
                    f"unexpected datatype {col.dtype!r} of column {col.name!r} "
                    f"is not in allowed ECSV datatypes {ECSV_DATATYPES}. "
                    "Using anyway as a numpy dtype but beware since unexpected "
                    "results are possible."
                )
                warnings.warn(msg, category=InvalidEcsvDatatypeWarning)

            # Subtype is written like "int64[2,null]" and we want to split this
            # out to "int64" and [2, None].
            subtype = col.subtype
            if subtype and "[" in subtype:
                idx = subtype.index("[")
                col.subtype = subtype[:idx]
                col.shape = json.loads(subtype[idx:])

            # Convert ECSV "string" to numpy "str"
            for attr in ("dtype", "subtype"):
                if getattr(col, attr) == "string":
                    setattr(col, attr, "str")

            # ECSV subtype of 'json' maps to numpy 'object' dtype
            if col.subtype == "json":
                col.subtype = "object"

def _check_dtype_is_str(col):
    if col.dtype != "str":
        raise ValueError(f'datatype of column {!r} must be "string"')

class EcsvOutputter(core.TableOutputter):
    """
    After reading the input lines and processing, convert the Reader columns
    and metadata to an astropy.table.Table object.  This overrides the default
    converters to be an empty list because there is no "guessing" of the
    conversion function.
    """

    default_converters = []

    def __call__(self, cols, meta):
        """Build the output table from ``cols`` and ``meta``, then rebuild mixins."""
        # Convert to a Table with all plain Column subclass columns
        out = super().__call__(cols, meta)

        # If mixin columns exist (based on the special '__mixin_columns__'
        # key in the table ``meta``), then use that information to construct
        # appropriate mixin columns and remove the original data columns.
        # If no __mixin_columns__ exists then this function just passes back
        # the input table.
        out = serialize._construct_mixins_from_columns(out)

        return out

    def _convert_vals(self, cols):
        """READ: Convert str_vals in `cols` to final arrays with correct dtypes.

        This is adapted from ``BaseOutputter._convert_vals``. In the case of ECSV
        there is no guessing and all types are known in advance. A big change
        is handling the possibility of JSON-encoded values, both unstructured
        object data and structured values that may contain masked data.

        Raises
        ------
        ValueError
            If any column fails to convert; the message names the column.
        """
        for col in cols:
            try:
                # 1-d or N-d object columns are serialized as JSON.
                if col.subtype == "object":
                    _check_dtype_is_str(col)
                    col_vals = [json.loads(val) for val in col.str_vals]
                    col.data = np.empty([len(col_vals)] + col.shape, dtype=object)
                    col.data[...] = col_vals

                # Variable length arrays with shape (n, m, ..., *) for fixed
                # n, m, .. and variable in last axis. Masked values here are
                # not currently supported.
                elif col.shape and col.shape[-1] is None:
                    _check_dtype_is_str(col)

                    # Empty (blank) values in original ECSV are changed to "0"
                    # in str_vals with corresponding col.mask being created and
                    # set accordingly. Instead use an empty list here.
                    if hasattr(col, "mask"):
                        for idx in np.nonzero(col.mask)[0]:
                            col.str_vals[idx] = "[]"

                    # Remake as a 1-d object column of numpy ndarrays or
                    # MaskedArray using the datatype specified in the ECSV file.
                    col_vals = []
                    for str_val in col.str_vals:
                        obj_val = json.loads(str_val)  # list or nested lists
                        try:
                            arr_val = np.array(obj_val, dtype=col.subtype)
                        except TypeError:
                            # obj_val has entries that are inconsistent with
                            # dtype. For a valid ECSV file the only possibility
                            # is None values (indicating missing values).
                            data = np.array(obj_val, dtype=object)
                            # Replace all the None with an appropriate fill value.
                            # ``== None`` is deliberate: it compares elementwise.
                            mask = data == None
                            kind = np.dtype(col.subtype).kind
                            data[mask] = {"U": "", "S": b""}.get(kind, 0)
                            arr_val = np.ma.array(data.astype(col.subtype), mask=mask)

                        col_vals.append(arr_val)

                    col.shape = ()
                    col.dtype = np.dtype(object)
                    # np.array(col_vals_arr, dtype=object) fails ?? so this workaround:
                    col.data = np.empty(len(col_vals), dtype=object)
                    col.data[:] = col_vals

                # Multidim columns with consistent shape (n, m, ...). These
                # might be masked.
                elif col.shape:
                    _check_dtype_is_str(col)

                    # Change empty (blank) values in original ECSV to something
                    # like "[[null, null],[null,null]]" so subsequent JSON
                    # decoding works. Delete `col.mask` so that later code in
                    # core TableOutputter.__call__() that deals with col.mask
                    # does not run (since handling is done here already).
                    if hasattr(col, "mask"):
                        all_none_arr = np.full(
                            shape=col.shape, fill_value=None, dtype=object
                        )
                        all_none_json = json.dumps(all_none_arr.tolist())
                        for idx in np.nonzero(col.mask)[0]:
                            col.str_vals[idx] = all_none_json
                        del col.mask

                    col_vals = [json.loads(val) for val in col.str_vals]
                    # Make a numpy object array of col_vals to look for None
                    # (masked values)
                    data = np.array(col_vals, dtype=object)
                    # ``== None`` is deliberate: it compares elementwise.
                    mask = data == None
                    if not np.any(mask):
                        # No None's, just convert to required dtype
                        col.data = data.astype(col.subtype)
                    else:
                        # Replace all the None with an appropriate fill value
                        kind = np.dtype(col.subtype).kind
                        data[mask] = {"U": "", "S": b""}.get(kind, 0)
                        # Finally make a MaskedArray with the filled data + mask
                        col.data = np.ma.array(data.astype(col.subtype), mask=mask)

                # Regular scalar value column
                else:
                    if col.subtype:
                        warnings.warn(
                            f"unexpected subtype {col.subtype!r} set for column "
                            f"{col.name!r}, using dtype={col.dtype!r} instead.",
                            category=InvalidEcsvDatatypeWarning,
                        )
                    converter_func, _ = convert_numpy(col.dtype)
                    col.data = converter_func(col.str_vals)

                # Per-row shape of the converted data must match the declared shape
                if col.data.shape[1:] != tuple(col.shape):
                    raise ValueError(
                        "shape mismatch between value and column specifier"
                    )

            except json.JSONDecodeError:
                raise ValueError(
                    f"column {col.name!r} failed to convert: "
                    "column value is not valid JSON"
                )
            except Exception as exc:
                raise ValueError(f"column {col.name!r} failed to convert: {exc}")

class EcsvData(basic.BasicData):
    """Data handler for ECSV: fill-value setup on read, string conversion on write."""

    def _set_fill_values(self, cols):
        """READ: Set the fill values of the individual cols based on fill_values of BaseData.

        For ECSV handle the corner case of data that has been serialized using
        the serialize_method='data_mask' option, which writes the full data and
        mask directly, AND where that table includes a string column with zero-length
        string entries ("") which are valid data.

        Normally the super() method will set col.fill_value=('', '0') to replace
        blanks with a '0'.  But for that corner case subset, instead do not do
        any filling.
        """
        super()._set_fill_values(cols)

        # Get the serialized columns spec.  It might not exist and there might
        # not even be any table meta, so punt in those cases.
        try:
            scs = self.header.table_meta["__serialized_columns__"]
        except (AttributeError, KeyError):
            return

        # Got some serialized columns, so check for string type and serialized
        # as a MaskedColumn.  Without 'data_mask', MaskedColumn objects are
        # stored to ECSV as normal columns.
        for col in cols:
            if (
                col.dtype == "str"
                and col.name in scs
                and scs[col.name]["__class__"] == "astropy.table.column.MaskedColumn"
            ):
                col.fill_values = {}  # No data value replacement

    def str_vals(self):
        """WRITE: convert all values in table to a list of lists of strings.

        This version considerably simplifies the base method:
        - No need to set fill values and column formats
        - No per-item formatting, just use repr()
        - Use JSON for object-type or multidim values
        - Only Column or MaskedColumn can end up as cols here.
        - Only replace masked values with "", not the generalized filling

        Returns
        -------
        list of list of str
            One list of string values per column in ``self.cols``.

        Raises
        ------
        TypeError
            If a column item cannot be converted to a string.
        """
        for col in self.cols:
            if len(col.shape) > 1 or col.info.dtype.kind == "O":

                def format_col_item(idx):
                    obj = col[idx]
                    try:
                        obj = obj.tolist()
                    except AttributeError:
                        # No tolist(); dump the object as it is
                        pass
                    return json.dumps(obj, separators=(",", ":"))

            else:

                def format_col_item(idx):
                    return str(col[idx])

            try:
                col.str_vals = [format_col_item(idx) for idx in range(len(col))]
            except TypeError as exc:
                raise TypeError(
                    f"could not convert column {col.info.name!r} to string: {exc}"
                ) from exc

            # Replace every masked value in a 1-d column with an empty string.
            # For multi-dim columns this gets done by JSON via "null".
            if hasattr(col, "mask") and col.ndim == 1:
                for idx in col.mask.nonzero()[0]:
                    col.str_vals[idx] = ""

        out = [col.str_vals for col in self.cols]
        return out

class Ecsv(basic.Basic):
    """ECSV (Enhanced Character Separated Values) format table.

    The ECSV format allows for specification of key table and column meta-data, in
    particular the data type and unit.

    See: https://github.com/astropy/astropy-APEs/blob/main/APE6.rst

    Examples
    --------
    >>> from astropy.table import Table
    >>> ecsv_content = '''# %ECSV 0.9
    ... # ---
    ... # datatype:
    ... # - {name: a, unit: m / s, datatype: int64, format: '%03d'}
    ... # - {name: b, unit: km, datatype: int64, description: This is column b}
    ... a b
    ... 001 2
    ... 004 3
    ... '''

    >>> Table.read(ecsv_content, format='ascii.ecsv')
    <Table length=2>
      a     b
    m / s   km
    int64 int64
    ----- -----
      001     2
      004     3

    """

    _format_name = "ecsv"
    _description = "Enhanced CSV"
    _io_registry_suffix = ".ecsv"

    header_class = EcsvHeader
    data_class = EcsvData
    outputter_class = EcsvOutputter

    max_ndim = None  # No limit on column dimensionality

    def update_table_data(self, table):
        """
        Update table columns in place if mixin columns are present.

        This is a hook to allow updating the table columns after name
        filtering but before setting up to write the data.  This is currently
        only used by ECSV and is otherwise just a pass-through.

        Parameters
        ----------
        table : `astropy.table.Table`
            Input table for writing

        Returns
        -------
        table : `astropy.table.Table`
            Output table for writing
        """
        with serialize_context_as("ecsv"):
            out = serialize.represent_mixins_as_columns(table)
        return out