Source code for astropy.io.registry

# Licensed under a 3-clause BSD style license - see LICENSE.rst


import contextlib
import pathlib
import re
import sys
import inspect
import os

from collections import OrderedDict
from operator import itemgetter

import numpy as np


__all__ = ['register_reader', 'register_writer', 'register_identifier',
           'identify_format', 'get_reader', 'get_writer', 'read', 'write',
           'get_formats', 'IORegistryError', 'delay_doc_updates',
           'UnifiedReadWriteMethod', 'UnifiedReadWrite']


__doctest_skip__ = ['register_identifier']


_readers = OrderedDict()
_writers = OrderedDict()
_identifiers = OrderedDict()

PATH_TYPES = (str, pathlib.Path)


[docs]class IORegistryError(Exception): """Custom error for registry clashes. """ pass
# If multiple formats are added to one class the update of the docs is quite # expensive. Classes for which the doc update is temporarly delayed are added # to this set. _delayed_docs_classes = set()
[docs]@contextlib.contextmanager def delay_doc_updates(cls): """Contextmanager to disable documentation updates when registering reader and writer. The documentation is only built once when the contextmanager exits. .. versionadded:: 1.3 Parameters ---------- cls : class Class for which the documentation updates should be delayed. Notes ----- Registering multiple readers and writers can cause significant overhead because the documentation of the corresponding ``read`` and ``write`` methods are build every time. .. warning:: This contextmanager is experimental and may be replaced by a more general approach. Examples -------- see for example the source code of ``astropy.table.__init__``. """ _delayed_docs_classes.add(cls) yield _delayed_docs_classes.discard(cls) _update__doc__(cls, 'read') _update__doc__(cls, 'write')
[docs]def get_formats(data_class=None, readwrite=None): """ Get the list of registered I/O formats as a Table. Parameters ---------- data_class : classobj, optional Filter readers/writer to match data class (default = all classes). readwrite : str or None, optional Search only for readers (``"Read"``) or writers (``"Write"``). If None search for both. Default is None. .. versionadded:: 1.3 Returns ------- format_table : Table Table of available I/O formats. """ from astropy.table import Table format_classes = sorted(set(_readers) | set(_writers), key=itemgetter(0)) rows = [] for format_class in format_classes: if (data_class is not None and not _is_best_match( data_class, format_class[1], format_classes)): continue has_read = 'Yes' if format_class in _readers else 'No' has_write = 'Yes' if format_class in _writers else 'No' has_identify = 'Yes' if format_class in _identifiers else 'No' # Check if this is a short name (e.g. 'rdb') which is deprecated in # favor of the full 'ascii.rdb'. ascii_format_class = ('ascii.' + format_class[0], format_class[1]) deprecated = 'Yes' if ascii_format_class in format_classes else '' rows.append((format_class[1].__name__, format_class[0], has_read, has_write, has_identify, deprecated)) if readwrite is not None: if readwrite == 'Read': rows = [row for row in rows if row[2] == 'Yes'] elif readwrite == 'Write': rows = [row for row in rows if row[3] == 'Yes'] else: raise ValueError('unrecognized value for "readwrite": {0}.\n' 'Allowed are "Read" and "Write" and None.') # Sorting the list of tuples is much faster than sorting it after the table # is created. (#5262) if rows: # Indices represent "Data Class", "Deprecated" and "Format". data = list(zip(*sorted(rows, key=itemgetter(0, 5, 1)))) else: data = None format_table = Table(data, names=('Data class', 'Format', 'Read', 'Write', 'Auto-identify', 'Deprecated')) if not np.any(format_table['Deprecated'] == 'Yes'): format_table.remove_column('Deprecated') return format_table
def _update__doc__(data_class, readwrite): """ Update the docstring to include all the available readers / writers for the ``data_class.read`` or ``data_class.write`` functions (respectively). """ FORMATS_TEXT = 'The available built-in formats are:' # Get the existing read or write method and its docstring class_readwrite_func = getattr(data_class, readwrite) if not isinstance(class_readwrite_func.__doc__, str): # No docstring--could just be test code, or possibly code compiled # without docstrings return lines = class_readwrite_func.__doc__.splitlines() # Find the location of the existing formats table if it exists sep_indices = [ii for ii, line in enumerate(lines) if FORMATS_TEXT in line] if sep_indices: # Chop off the existing formats table, including the initial blank line chop_index = sep_indices[0] lines = lines[:chop_index] # Find the minimum indent, skipping the first line because it might be odd matches = [re.search(r'(\S)', line) for line in lines[1:]] left_indent = ' ' * min(match.start() for match in matches if match) # Get the available unified I/O formats for this class # Include only formats that have a reader, and drop the 'Data class' column format_table = get_formats(data_class, readwrite.capitalize()) format_table.remove_column('Data class') # Get the available formats as a table, then munge the output of pformat() # a bit and put it into the docstring. new_lines = format_table.pformat(max_lines=-1, max_width=80) table_rst_sep = re.sub('-', '=', new_lines[1]) new_lines[1] = table_rst_sep new_lines.insert(0, table_rst_sep) new_lines.append(table_rst_sep) # Check for deprecated names and include a warning at the end. if 'Deprecated' in format_table.colnames: new_lines.extend(['', 'Deprecated format names like ``aastex`` will be ' 'removed in a future version. Use the full ', 'name (e.g. ``ascii.aastex``) instead.']) new_lines = [FORMATS_TEXT, ''] + new_lines lines.extend([left_indent + line for line in new_lines]) # Depending on Python version and whether class_readwrite_func is # an instancemethod or classmethod, one of the following will work. if isinstance(class_readwrite_func, UnifiedReadWrite): class_readwrite_func.__class__.__doc__ = '\n'.join(lines) else: try: class_readwrite_func.__doc__ = '\n'.join(lines) except AttributeError: class_readwrite_func.__func__.__doc__ = '\n'.join(lines)
[docs]def register_reader(data_format, data_class, function, force=False): """ Register a reader function. Parameters ---------- data_format : str The data format identifier. This is the string that will be used to specify the data type when reading. data_class : classobj The class of the object that the reader produces. function : function The function to read in a data object. force : bool, optional Whether to override any existing function if already present. Default is ``False``. """ if not (data_format, data_class) in _readers or force: _readers[(data_format, data_class)] = function else: raise IORegistryError("Reader for format '{0}' and class '{1}' is " 'already defined' ''.format(data_format, data_class.__name__)) if data_class not in _delayed_docs_classes: _update__doc__(data_class, 'read')
def unregister_reader(data_format, data_class): """ Unregister a reader function Parameters ---------- data_format : str The data format identifier. data_class : classobj The class of the object that the reader produces. """ if (data_format, data_class) in _readers: _readers.pop((data_format, data_class)) else: raise IORegistryError("No reader defined for format '{0}' and class '{1}'" ''.format(data_format, data_class.__name__)) if data_class not in _delayed_docs_classes: _update__doc__(data_class, 'read')
[docs]def register_writer(data_format, data_class, function, force=False): """ Register a table writer function. Parameters ---------- data_format : str The data format identifier. This is the string that will be used to specify the data type when writing. data_class : classobj The class of the object that can be written. function : function The function to write out a data object. force : bool, optional Whether to override any existing function if already present. Default is ``False``. """ if not (data_format, data_class) in _writers or force: _writers[(data_format, data_class)] = function else: raise IORegistryError("Writer for format '{0}' and class '{1}' is " 'already defined' ''.format(data_format, data_class.__name__)) if data_class not in _delayed_docs_classes: _update__doc__(data_class, 'write')
def unregister_writer(data_format, data_class): """ Unregister a writer function Parameters ---------- data_format : str The data format identifier. data_class : classobj The class of the object that can be written. """ if (data_format, data_class) in _writers: _writers.pop((data_format, data_class)) else: raise IORegistryError("No writer defined for format '{0}' and class '{1}'" ''.format(data_format, data_class.__name__)) if data_class not in _delayed_docs_classes: _update__doc__(data_class, 'write')
[docs]def register_identifier(data_format, data_class, identifier, force=False): """ Associate an identifier function with a specific data type. Parameters ---------- data_format : str The data format identifier. This is the string that is used to specify the data type when reading/writing. data_class : classobj The class of the object that can be written. identifier : function A function that checks the argument specified to `read` or `write` to determine whether the input can be interpreted as a table of type ``data_format``. This function should take the following arguments: - ``origin``: A string ``"read"`` or ``"write"`` identifying whether the file is to be opened for reading or writing. - ``path``: The path to the file. - ``fileobj``: An open file object to read the file's contents, or `None` if the file could not be opened. - ``*args``: Positional arguments for the `read` or `write` function. - ``**kwargs``: Keyword arguments for the `read` or `write` function. One or both of ``path`` or ``fileobj`` may be `None`. If they are both `None`, the identifier will need to work from ``args[0]``. The function should return True if the input can be identified as being of format ``data_format``, and False otherwise. force : bool, optional Whether to override any existing function if already present. Default is ``False``. Examples -------- To set the identifier based on extensions, for formats that take a filename as a first argument, you can do for example:: >>> def my_identifier(*args, **kwargs): ... return isinstance(args[0], str) and args[0].endswith('.tbl') >>> register_identifier('ipac', Table, my_identifier) """ if not (data_format, data_class) in _identifiers or force: _identifiers[(data_format, data_class)] = identifier else: raise IORegistryError("Identifier for format '{0}' and class '{1}' is " 'already defined'.format(data_format, data_class.__name__))
def unregister_identifier(data_format, data_class): """ Unregister an identifier function Parameters ---------- data_format : str The data format identifier. data_class : classobj The class of the object that can be read/written. """ if (data_format, data_class) in _identifiers: _identifiers.pop((data_format, data_class)) else: raise IORegistryError("No identifier defined for format '{0}' and class" " '{1}'".format(data_format, data_class.__name__))
[docs]def identify_format(origin, data_class_required, path, fileobj, args, kwargs): """Loop through identifiers to see which formats match. Parameters ---------- origin : str A string ``"read`` or ``"write"`` identifying whether the file is to be opened for reading or writing. data_class_required : object The specified class for the result of `read` or the class that is to be written. path : str, other path object or None The path to the file or None. fileobj : File object or None. An open file object to read the file's contents, or ``None`` if the file could not be opened. args : sequence Positional arguments for the `read` or `write` function. Note that these must be provided as sequence. kwargs : dict-like Keyword arguments for the `read` or `write` function. Note that this parameter must be `dict`-like. Returns ------- valid_formats : list List of matching formats. """ valid_formats = [] for data_format, data_class in _identifiers: if _is_best_match(data_class_required, data_class, _identifiers): if _identifiers[(data_format, data_class)]( origin, path, fileobj, *args, **kwargs): valid_formats.append(data_format) return valid_formats
def _get_format_table_str(data_class, readwrite): format_table = get_formats(data_class, readwrite=readwrite) format_table.remove_column('Data class') format_table_str = '\n'.join(format_table.pformat(max_lines=-1)) return format_table_str
[docs]def get_reader(data_format, data_class): """Get reader for ``data_format``. Parameters ---------- data_format : str The data format identifier. This is the string that is used to specify the data type when reading/writing. data_class : classobj The class of the object that can be written. Returns ------- reader : callable The registered reader function for this format and class. """ readers = [(fmt, cls) for fmt, cls in _readers if fmt == data_format] for reader_format, reader_class in readers: if _is_best_match(data_class, reader_class, readers): return _readers[(reader_format, reader_class)] else: format_table_str = _get_format_table_str(data_class, 'Read') raise IORegistryError( "No reader defined for format '{0}' and class '{1}'.\n\nThe " "available formats are:\n\n{2}".format( data_format, data_class.__name__, format_table_str))
[docs]def get_writer(data_format, data_class): """Get writer for ``data_format``. Parameters ---------- data_format : str The data format identifier. This is the string that is used to specify the data type when reading/writing. data_class : classobj The class of the object that can be written. Returns ------- writer : callable The registered writer function for this format and class. """ writers = [(fmt, cls) for fmt, cls in _writers if fmt == data_format] for writer_format, writer_class in writers: if _is_best_match(data_class, writer_class, writers): return _writers[(writer_format, writer_class)] else: format_table_str = _get_format_table_str(data_class, 'Write') raise IORegistryError( "No writer defined for format '{0}' and class '{1}'.\n\nThe " "available formats are:\n\n{2}".format( data_format, data_class.__name__, format_table_str))
[docs]def read(cls, *args, format=None, **kwargs): """ Read in data. The arguments passed to this method depend on the format. """ ctx = None try: if format is None: path = None fileobj = None if len(args): if isinstance(args[0], PATH_TYPES): from astropy.utils.data import get_readable_fileobj # path might be a pathlib.Path object if isinstance(args[0], pathlib.Path): args = (str(args[0]),) + args[1:] path = args[0] try: ctx = get_readable_fileobj(args[0], encoding='binary') fileobj = ctx.__enter__() except OSError: raise except Exception: fileobj = None else: args = [fileobj] + list(args[1:]) elif hasattr(args[0], 'read'): path = None fileobj = args[0] format = _get_valid_format( 'read', cls, path, fileobj, args, kwargs) reader = get_reader(format, cls) data = reader(*args, **kwargs) if not isinstance(data, cls): # User has read with a subclass where only the parent class is # registered. This returns the parent class, so try coercing # to desired subclass. try: data = cls(data) except Exception: raise TypeError('could not convert reader output to {0} ' 'class.'.format(cls.__name__)) finally: if ctx is not None: ctx.__exit__(*sys.exc_info()) return data
[docs]def write(data, *args, format=None, **kwargs): """ Write out data. The arguments passed to this method depend on the format. """ if format is None: path = None fileobj = None if len(args): if isinstance(args[0], PATH_TYPES): # path might be a pathlib.Path object if isinstance(args[0], pathlib.Path): args = (str(args[0]),) + args[1:] path = args[0] fileobj = None elif hasattr(args[0], 'read'): path = None fileobj = args[0] format = _get_valid_format( 'write', data.__class__, path, fileobj, args, kwargs) writer = get_writer(format, data.__class__) writer(data, *args, **kwargs)
def _is_best_match(class1, class2, format_classes): """ Determine if class2 is the "best" match for class1 in the list of classes. It is assumed that (class2 in classes) is True. class2 is the the best match if: - ``class1`` is a subclass of ``class2`` AND - ``class2`` is the nearest ancestor of ``class1`` that is in classes (which includes the case that ``class1 is class2``) """ if issubclass(class1, class2): classes = {cls for fmt, cls in format_classes} for parent in class1.__mro__: if parent is class2: # class2 is closest registered ancestor return True if parent in classes: # class2 was superceded return False return False def _get_valid_format(mode, cls, path, fileobj, args, kwargs): """ Returns the first valid format that can be used to read/write the data in question. Mode can be either 'read' or 'write'. """ valid_formats = identify_format(mode, cls, path, fileobj, args, kwargs) if len(valid_formats) == 0: format_table_str = _get_format_table_str(cls, mode.capitalize()) raise IORegistryError("Format could not be identified based on the" " file name or contents, please provide a" " 'format' argument.\n" "The available formats are:\n" "{0}".format(format_table_str)) elif len(valid_formats) > 1: raise IORegistryError( "Format is ambiguous - options are: {0}".format( ', '.join(sorted(valid_formats, key=itemgetter(0))))) return valid_formats[0]
[docs]class UnifiedReadWrite: """Base class for the worker object used in unified read() or write() methods. This lightweight object is created for each `read()` or `write()` call via ``read`` / ``write`` descriptors on the data object class. The key driver is to allow complete format-specific documentation of available method options via a ``help()`` method, e.g. ``Table.read.help('fits')``. Subclasses must define a ``__call__`` method which is what actually gets called when the data object ``read()`` or ``write()`` method is called. For the canonical example see the `~astropy.table.Table` class implementation (in particular the ``connect.py`` module there). Parameters ---------- instance : object Descriptor calling instance or None if no instance cls : type Descriptor calling class (either owner class or instance class) method_name : str Method name, either 'read' or 'write' """ def __init__(self, instance, cls, method_name): self._instance = instance self._cls = cls self._method_name = method_name # 'read' or 'write'
[docs] def help(self, format=None, out=None): """Output help documentation for the specified unified I/O ``format``. By default the help output is printed to the console via ``pydoc.pager``. Instead one can supplied a file handle object as ``out`` and the output will be written to that handle. Parameters ---------- format : str Unified I/O format name, e.g. 'fits' or 'ascii.ecsv' out : None or file handle object Output destination (default is stdout via a pager) """ cls = self._cls method_name = self._method_name # Get reader or writer function get_func = get_reader if method_name == 'read' else get_writer try: if format: read_write_func = get_func(format, cls) except IORegistryError as err: reader_doc = 'ERROR: ' + str(err) else: if format: # Format-specific header = ("{}.{}(format='{}') documentation\n" .format(cls.__name__, method_name, format)) doc = read_write_func.__doc__ else: # General docs header = ('{}.{} general documentation\n' .format(cls.__name__, method_name)) doc = getattr(cls, method_name).__doc__ reader_doc = re.sub('.', '=', header) reader_doc += header reader_doc += re.sub('.', '=', header) reader_doc += os.linesep reader_doc += inspect.cleandoc(doc) if out is None: import pydoc pydoc.pager(reader_doc) else: out.write(reader_doc)
[docs] def list_formats(self, out=None): """Print a list of available formats to console (or ``out`` filehandle) out : None or file handle object Output destination (default is stdout via a pager) """ tbl = get_formats(self._cls, self._method_name.capitalize()) del tbl['Data class'] if out is None: tbl.pprint(max_lines=-1, max_width=-1) else: out.write('\n'.join(tbl.pformat(max_lines=-1, max_width=-1))) return out
[docs]class UnifiedReadWriteMethod: """Descriptor class for creating read() and write() methods in unified I/O. The canonical example is in the ``Table`` class, where the ``connect.py`` module creates subclasses of the ``UnifiedReadWrite`` class. These have custom ``__call__`` methods that do the setup work related to calling the registry read() or write() functions. With this, the ``Table`` class defines read and write methods as follows:: read = UnifiedReadWriteMethod(TableRead) write = UnifiedReadWriteMethod(TableWrite) Parameters ---------- func : `~astropy.io.registry.UnifiedReadWrite` subclass Class that defines read or write functionality """ def __init__(self, func): self.func = func def __get__(self, instance, owner_cls): return self.func(instance, owner_cls)