Files @ 8dede9d1398c
Branch filter:

Location: NPO-Accounting/oxrlib/oxrlib/loaders.py

Brett Smith
historical: Swap Ledger and Beancount formatters in the class hierarchy.

This makes sense for a couple of reasons:

* The Beancount formatter has "less features" than the Ledger formatter, so
this is a more "logical" organization of the hierarchy anyway. Note how
this eliminates the need for the BeancountFormatter.__init__ override to
turn off Ledger features.

* Any future work will probably be focused on the Beancount formatter, so
this reduces the amount of code you have to understand and hold in your
head to do that.
import cgi
import codecs
import email.message
import functools
import io
import urllib.parse
import urllib.request

from . import cache, errors

class ReadCacheFile(cache.CacheFileBase):
    """Cache file type that FileCache instantiates from its open() method.

    Only ERRORS_MAP is defined here; all behavior is inherited from
    cache.CacheFileBase.
    """

    # Pairs of (exception type to catch, loader error type to use instead).
    # FileNotFoundError is a subclass of OSError, so the more specific pair
    # is listed first.
    # NOTE(review): presumably cache.CacheFileBase scans this list in order to
    # translate I/O failures into loader errors -- confirm against that class.
    ERRORS_MAP = [
        # The requested file does not exist.
        (FileNotFoundError, errors.LoaderNoDataError),
        # Any other operating system level failure.
        (OSError, errors.LoaderSourceError),
    ]


class FileCache(cache.CacheBase):
    """Loader backed by cache files; reports itself as a cache.

    Class attributes:
      ConfigurationError -- errors.CacheLoaderConfigurationError.
      CacheFile -- the type open() instantiates (ReadCacheFile).

    NOTE(review): how cache.CacheBase uses these attributes is not visible
    in this module -- confirm against that class.
    """

    ConfigurationError = errors.CacheLoaderConfigurationError
    CacheFile = ReadCacheFile

    def is_cache(self):
        """Return True: this loader reads from the cache."""
        return True

    def open(self, path):
        """Return a new CacheFile instance constructed from path."""
        cache_file = self.CacheFile(path)
        return cache_file


class OXRAPIRequest:
    """Loader that requests exchange rate data from the Open Exchange Rates API.

    Instance attributes, all set by the constructor:
      api_root -- base URL that endpoint paths are joined onto.
      app_id -- sent as the ``app_id`` query parameter by historical().
      open_url -- callable taking a URL string and returning a response
        object.  The response must provide a ``status`` attribute, a
        ``getheader(name, default)`` method, and a binary stream interface
        that io.TextIOWrapper can wrap.
    """

    DEFAULT_API_ROOT = 'https://openexchangerates.org/api/'
    DEFAULT_RESPONSE_ENCODING = 'utf-8'

    def __init__(self, app_id, api_root=None, *, open_func=urllib.request.urlopen):
        """Store the request settings.

        app_id -- API credential included in each historical() query.
        api_root -- base URL of the API; DEFAULT_API_ROOT when None.
        open_func -- callable used to open URLs (keyword-only), so callers
          can substitute their own transport.
        """
        self.api_root = self.DEFAULT_API_ROOT if api_root is None else api_root
        self.app_id = app_id
        self.open_url = open_func

    def is_cache(self):
        """Return False: this loader fetches from the network, not a cache."""
        return False

    def _get_response_encoding(self, response, default=None):
        """Return the name of the text encoding to decode response with.

        The name is the ``charset`` parameter of the response's Content-Type
        header.  When the header carries no usable charset, or names a codec
        this Python installation does not know, return ``default`` instead,
        or DEFAULT_RESPONSE_ENCODING when ``default`` is None.
        """
        fallback = self.DEFAULT_RESPONSE_ENCODING if default is None else default
        try:
            content_type = response.getheader('Content-Type', 'application/json')
        except (KeyError, ValueError):
            return fallback
        if not isinstance(content_type, str):
            return fallback
        # email.message is the documented replacement for cgi.parse_header();
        # the cgi module is deprecated and removed in Python 3.13 (PEP 594).
        header = email.message.Message()
        header['Content-Type'] = content_type
        charset = header.get_param('charset')
        # get_param() returns None for a missing parameter and a tuple for an
        # RFC 2231 encoded one; neither is usable as a codec name.
        if not isinstance(charset, str) or not charset:
            return fallback
        try:
            # An unknown label would otherwise make io.TextIOWrapper raise
            # LookupError, which is not a loader error.
            codecs.lookup(charset)
        except (LookupError, ValueError):
            return fallback
        return charset

    def _raw_query(self, url_tail, params):
        """Request url_tail under the API root and return the body as text.

        url_tail -- endpoint path, joined onto api_root with urljoin().
        params -- query parameters, URL-encoded into the query string.

        Returns an io.TextIOWrapper over the response when the status is
        200, 201 or 202.  The wrapper is returned open; closing it is left
        to the caller.

        Raises:
          errors.LoaderNoDataError -- status 404 or 410.
          errors.LoaderSourceError -- status 500 or higher.
          errors.LoaderBadRequestError -- any other status.
        Each exception is constructed with the URL and up to 64 * 1024
        characters of the response body.
        """
        url = '{}?{}'.format(
            urllib.parse.urljoin(self.api_root, url_tail),
            urllib.parse.urlencode(params),
        )
        # NOTE(review): urllib.request.urlopen() raises urllib.error.HTTPError
        # for error statuses instead of returning a response, so with the
        # default open_func the error branches below look reachable only
        # through a custom open_func -- confirm the intended behavior.
        response = self.open_url(url)
        status_code = response.status
        encoding = self._get_response_encoding(response)
        response_body = io.TextIOWrapper(response, encoding=encoding)
        if 200 <= status_code < 203:
            return response_body
        elif status_code == 404 or status_code == 410:
            exc_class = errors.LoaderNoDataError
        elif status_code >= 500:
            exc_class = errors.LoaderSourceError
        else:
            exc_class = errors.LoaderBadRequestError
        # Close the response once the start of the body has been captured.
        # NOTE(review): url includes the query string, so the app_id passed by
        # historical() ends up in the exception arguments.
        with response_body:
            raise exc_class(url, response_body.read(64 * 1024))

    def historical(self, date, base):
        """Return the text response for the historical rates on date.

        date -- object whose isoformat() result names the JSON endpoint.
        base -- value sent as the ``base`` query parameter.

        Returns and raises as _raw_query() does.
        """
        return self._raw_query(
            'historical/{}.json'.format(date.isoformat()),
            {'app_id': self.app_id, 'base': base},
        )


class LoaderChain:
    """Try a list of loaders in order and return the first successful result.

    Instance attributes:
      loaders -- loaders in the order they were added with add_loader().
      can_cache -- true once any added loader reports is_cache() as true.
      used_loader -- the loader that produced the most recent successful
        result, or None if nothing has been loaded yet or the last load
        failed.
    """

    def __init__(self):
        """Create an empty chain."""
        self.loaders = []
        self.can_cache = False
        # Initialized here so should_cache() is safe to call before any load
        # method has run.
        self.used_loader = None

    def add_loader(self, loader):
        """Append loader to the chain and record whether it is a cache."""
        self.loaders.append(loader)
        self.can_cache = self.can_cache or loader.is_cache()

    def _wrap_load_method(orig_func):
        """Build a chain method that dispatches to each loader in turn.

        Used as a decorator inside this class body.  The body of orig_func is
        never called; only its name and metadata are used.  The generated
        method calls the same-named method on each loader, in order, with
        the arguments it received, and returns the first result obtained
        without an errors.LoaderError.

        If every loader raises errors.LoaderError, the last such error is
        raised.  If the chain has no loaders, errors.NoLoadersError is raised.
        """
        @functools.wraps(orig_func)
        def load_wrapper(self, *args, **kwargs):
            # Reset first so a failed load does not leave a stale loader set.
            self.used_loader = None
            error = None
            for loader in self.loaders:
                try:
                    response = getattr(loader, orig_func.__name__)(*args, **kwargs)
                except errors.LoaderError as this_error:
                    # Remember the failure and move on to the next loader.
                    error = this_error
                else:
                    self.used_loader = loader
                    return response
            # Every loader failed, or there were none to try.
            raise errors.NoLoadersError() if error is None else error
        return load_wrapper

    @_wrap_load_method
    def historical(self, date, base):
        """Return the first successful loader.historical(date, base) result."""
        pass

    def should_cache(self):
        """Return True if the last result should be written to a cache.

        That is the case when the chain contains a cache loader and the most
        recent successful load came from a loader that is not a cache.
        Returns False when no load has succeeded yet.
        """
        if not self.can_cache or self.used_loader is None:
            return False
        return not self.used_loader.is_cache()