import cgi
import email.message
import functools
import io
import urllib.error
import urllib.parse
import urllib.request

from . import errors
class FileCache:
    """Loader that serves previously saved responses from a directory.

    File names are produced by formatting ``filename_pattern`` with the
    keyword fields ``date`` (ISO-formatted) and ``base``.
    """

    def __init__(self, dir_path, filename_pattern):
        """Remember where cached files live and how they are named.

        ``dir_path`` must support the ``/`` operator and yield an object with
        an ``open()`` method (presumably a ``pathlib.Path`` -- confirm against
        callers).  ``filename_pattern`` is a ``str.format`` template.
        """
        self.dir_path = dir_path
        self.pattern = filename_pattern

    def historical(self, date, base):
        """Return an open file for the cached rates of ``date`` and ``base``.

        The caller is responsible for closing the returned file.

        Raises:
            errors.LoaderNoDataError: if the cache file does not exist; the
                missing path is passed as the exception argument and the
                original ``FileNotFoundError`` is chained as its cause.
        """
        filename = self.pattern.format(base=base, date=date.isoformat())
        cache_file = self.dir_path / filename
        try:
            handle = cache_file.open()
        except FileNotFoundError as not_found:
            raise errors.LoaderNoDataError(cache_file) from not_found
        else:
            return handle
class OXRAPIRequest:
DEFAULT_API_ROOT = 'https://openexchangerates.org/api/'
DEFAULT_RESPONSE_ENCODING = 'utf-8'
def __init__(self, app_id, api_root=None, *, open_func=urllib.request.urlopen):
self.api_root = self.DEFAULT_API_ROOT if api_root is None else api_root
self.app_id = app_id
self.open_url = open_func
def _get_response_encoding(self, response, default=None):
try:
content_type = response.getheader('Content-Type', 'application/json')
_, ct_options = cgi.parse_header(content_type)
encoding = ct_options['charset']
except (KeyError, ValueError):
encoding = self.DEFAULT_RESPONSE_ENCODING if default is None else default
return encoding
def _raw_query(self, url_tail, params):
url = '{}?{}'.format(
urllib.parse.urljoin(self.api_root, url_tail),
urllib.parse.urlencode(params),
)
response = self.open_url(url)
status_code = response.status
encoding = self._get_response_encoding(response)
response_body = io.TextIOWrapper(response, encoding=encoding)
if 200 <= status_code < 203:
return response_body
elif status_code == 404 or status_code == 410:
exc_class = errors.LoaderNoDataError
elif status_code >= 500:
exc_class = errors.LoaderSourceError
else:
exc_class = errors.LoaderBadRequestError
with response_body:
raise exc_class(url, response_body.read(64 * 1024))
def historical(self, date, base):
return self._raw_query(
'historical/{}.json'.format(date.isoformat()),
{'app_id': self.app_id, 'base': base},
)
class LoaderChain:
    """Ask a list of loaders in order and return the first successful result."""

    def __init__(self):
        """Start with an empty chain."""
        self.loaders = []

    def add_loader(self, loader):
        """Append ``loader`` to the end of the chain."""
        self.loaders.append(loader)

    def _wrap_load_method(orig_func):
        """Build a chain method that delegates to each loader in turn.

        Used as a decorator inside the class body.  The decorated function
        contributes only its name and metadata: the wrapper calls the
        same-named method on each loader and never calls ``orig_func``.
        """
        method_name = orig_func.__name__

        @functools.wraps(orig_func)
        def load_wrapper(self, *args, **kwargs):
            # Reset first so a failed call never reports a stale loader.
            self.used_loader = None
            last_error = None
            for candidate in self.loaders:
                try:
                    result = getattr(candidate, method_name)(*args, **kwargs)
                except errors.LoaderError as failure:
                    # Remember the failure and move on to the next loader.
                    last_error = failure
                    continue
                self.used_loader = candidate
                return result
            # Every loader failed, or there were none to try.
            if last_error is None:
                raise errors.NoLoadersError()
            raise last_error
        return load_wrapper

    @_wrap_load_method
    def historical(self, date, base):
        """Return the first loader's successful ``historical(date, base)``.

        Sets ``used_loader`` to the loader that answered.  Raises
        ``errors.NoLoadersError`` when the chain is empty, otherwise the
        ``errors.LoaderError`` raised by the last loader tried.
        """
        pass