Files @ ae3e4617d31e
Branch filter:

Location: NPO-Accounting/oxrlib/oxrlib/loaders.py

Brett Smith
historical: Always format rates with the same precision.

When we format a rate as a price, we don't know how much precision
is "enough" to do the conversion, because we don't know what's
being converted to. As a result, we may (=will almost certainly)
end up formatting the rate with different precision on the cost
date vs. the price date, and that causes Beancount/Ledger to fail
to make the connection between them.

Using a constant of 6 is enough to make the current test for
"enough" precision pass, so just do that for now. This might need
further refinement in the future.
import cgi
import functools
import io
import urllib.request
import urllib.parse

from . import cache, errors

class ReadCacheFile(cache.CacheFileBase):
    ERRORS_MAP = [
        (FileNotFoundError, errors.LoaderNoDataError),
        (OSError, errors.LoaderSourceError),
    ]


class FileCache(cache.CacheBase):
    CacheFile = ReadCacheFile
    ConfigurationError = errors.CacheLoaderConfigurationError

    def open(self, path):
        return self.CacheFile(path)

    def is_cache(self):
        return True


class OXRAPIRequest:
    DEFAULT_API_ROOT = 'https://openexchangerates.org/api/'
    DEFAULT_RESPONSE_ENCODING = 'utf-8'

    def __init__(self, app_id, api_root=None, *, open_func=urllib.request.urlopen):
        self.api_root = self.DEFAULT_API_ROOT if api_root is None else api_root
        self.app_id = app_id
        self.open_url = open_func

    def is_cache(self):
        return False

    def _get_response_encoding(self, response, default=None):
        try:
            content_type = response.getheader('Content-Type', 'application/json')
            _, ct_options = cgi.parse_header(content_type)
            encoding = ct_options['charset']
        except (KeyError, ValueError):
            encoding = self.DEFAULT_RESPONSE_ENCODING if default is None else default
        return encoding

    def _raw_query(self, url_tail, params):
        url = '{}?{}'.format(
            urllib.parse.urljoin(self.api_root, url_tail),
            urllib.parse.urlencode(params),
        )
        response = self.open_url(url)
        status_code = response.status
        encoding = self._get_response_encoding(response)
        response_body = io.TextIOWrapper(response, encoding=encoding)
        if 200 <= status_code < 203:
            return response_body
        elif status_code == 404 or status_code == 410:
            exc_class = errors.LoaderNoDataError
        elif status_code >= 500:
            exc_class = errors.LoaderSourceError
        else:
            exc_class = errors.LoaderBadRequestError
        with response_body:
            raise exc_class(url, response_body.read(64 * 1024))

    def historical(self, date, base):
        return self._raw_query(
            'historical/{}.json'.format(date.isoformat()),
            {'app_id': self.app_id, 'base': base},
        )


class LoaderChain:
    def __init__(self):
        self.loaders = []
        self.can_cache = False

    def add_loader(self, loader):
        self.loaders.append(loader)
        self.can_cache = self.can_cache or loader.is_cache()

    def _wrap_load_method(orig_func):
        @functools.wraps(orig_func)
        def load_wrapper(self, *args, **kwargs):
            self.used_loader = None
            error = None
            for loader in self.loaders:
                try:
                    response = getattr(loader, orig_func.__name__)(*args, **kwargs)
                except errors.LoaderError as this_error:
                    error = this_error
                else:
                    self.used_loader = loader
                    return response
            else:
                raise errors.NoLoadersError() if error is None else error
        return load_wrapper

    @_wrap_load_method
    def historical(self, date, base):
        pass

    def should_cache(self):
        return self.can_cache and self.used_loader and not self.used_loader.is_cache()