import cgi import functools import io import urllib.request import urllib.parse from . import errors class FileCache: def __init__(self, dir_path, filename_pattern): self.dir_path = dir_path self.pattern = filename_pattern def historical(self, date, base): path = self.dir_path / self.pattern.format(date=date.isoformat(), base=base) try: return path.open() except FileNotFoundError as error: raise errors.LoaderNoDataError(path) from error class OXRAPIRequest: DEFAULT_API_ROOT = 'https://openexchangerates.org/api/' DEFAULT_RESPONSE_ENCODING = 'utf-8' def __init__(self, app_id, api_root=None, *, open_func=urllib.request.urlopen): self.api_root = self.DEFAULT_API_ROOT if api_root is None else api_root self.app_id = app_id self.open_url = open_func def _get_response_encoding(self, response, default=None): try: content_type = response.getheader('Content-Type', 'application/json') _, ct_options = cgi.parse_header(content_type) encoding = ct_options['charset'] except (KeyError, ValueError): encoding = self.DEFAULT_RESPONSE_ENCODING if default is None else default return encoding def _raw_query(self, url_tail, params): url = '{}?{}'.format( urllib.parse.urljoin(self.api_root, url_tail), urllib.parse.urlencode(params), ) response = self.open_url(url) status_code = response.status encoding = self._get_response_encoding(response) response_body = io.TextIOWrapper(response, encoding=encoding) if 200 <= status_code < 203: return response_body elif status_code == 404 or status_code == 410: exc_class = errors.LoaderNoDataError elif status_code >= 500: exc_class = errors.LoaderSourceError else: exc_class = errors.LoaderBadRequestError with response_body: raise exc_class(url, response_body.read(64 * 1024)) def historical(self, date, base): return self._raw_query( 'historical/{}.json'.format(date.isoformat()), {'app_id': self.app_id, 'base': base}, ) class LoaderChain: def __init__(self): self.loaders = [] def add_loader(self, loader): self.loaders.append(loader) def _wrap_load_method(orig_func): @functools.wraps(orig_func) def load_wrapper(self, *args, **kwargs): self.used_loader = None error = None for loader in self.loaders: try: response = getattr(loader, orig_func.__name__)(*args, **kwargs) except errors.LoaderError as this_error: error = this_error else: self.used_loader = loader return response else: raise errors.NoLoadersError() if error is None else error return load_wrapper @_wrap_load_method def historical(self, date, base): pass