"""Loaders that return historical exchange-rate data as open text streams.

Two sources are provided: ``FileCache`` reads files from a directory, and
``OXRAPIRequest`` queries an HTTP API (openexchangerates.org by default).
Failures are reported through the ``LoaderError`` hierarchy.
"""

import email.message
import email.utils
import io
import urllib.error
import urllib.parse
import urllib.request


class LoaderError(Exception):
    """Base class for the errors raised by the loaders in this module."""


class LoaderNoDataError(LoaderError):
    """The source has no data for the request (missing file, HTTP 404/410)."""


class LoaderBadRequestError(LoaderError):
    """The HTTP source answered with a status not classified otherwise."""


class LoaderSourceError(LoaderError):
    """The HTTP source answered with a status of 500 or above."""


class FileCache:
    """Load historical data from files stored under one directory."""

    def __init__(self, dir_path, filename_pattern):
        """Remember where the files live and how they are named.

        Args:
            dir_path: Directory object; it must support ``/`` joining and the
                joined result must have ``open()`` (e.g. ``pathlib.Path``).
            filename_pattern: ``str.format`` template that receives the
                keyword fields ``date`` (ISO string) and ``base``.
        """
        self.dir_path = dir_path
        self.pattern = filename_pattern

    def historical(self, date, base):
        """Open the cached file for ``date`` and ``base``.

        Args:
            date: Object with an ``isoformat()`` method.
            base: Value substituted for ``{base}`` in the filename pattern.

        Returns:
            The file object returned by ``path.open()``; the caller closes it.

        Raises:
            LoaderNoDataError: The file does not exist. The path is the
                exception's only argument.
        """
        filename = self.pattern.format(date=date.isoformat(), base=base)
        path = self.dir_path / filename
        try:
            return path.open()
        except FileNotFoundError as error:
            raise LoaderNoDataError(path) from error


class OXRAPIRequest:
    """Query an Open Exchange Rates style HTTP API."""

    DEFAULT_API_ROOT = 'https://openexchangerates.org/api/'
    DEFAULT_RESPONSE_ENCODING = 'utf-8'
    # Upper bound on how much of an error response is copied into the
    # exception, so a huge error page cannot end up in memory or logs.
    ERROR_BODY_LIMIT = 64 * 1024

    def __init__(self, app_id, api_root=None, *, open_func=urllib.request.urlopen):
        """Configure the API client.

        Args:
            app_id: Sent as the ``app_id`` query parameter of every request.
            api_root: Base URL that request paths are joined onto; defaults
                to ``DEFAULT_API_ROOT``.
            open_func: Callable taking a URL and returning a response object.
                The response must expose ``status``, ``getheader()`` and the
                binary stream interface ``io.TextIOWrapper`` requires.
        """
        self.api_root = self.DEFAULT_API_ROOT if api_root is None else api_root
        self.app_id = app_id
        self.open_url = open_func

    @staticmethod
    def _charset_from_content_type(content_type):
        """Return the ``charset`` parameter of a Content-Type value, or None.

        ``email.message.Message`` is the standard-library replacement for
        ``cgi.parse_header``, which was removed in Python 3.13.
        """
        if not content_type:
            return None
        message = email.message.Message()
        message['Content-Type'] = content_type
        charset = message.get_param('charset')
        if isinstance(charset, tuple):
            # RFC 2231 encoded parameters come back as a 3-tuple.
            charset = email.utils.collapse_rfc2231_value(charset)
        return charset or None

    @staticmethod
    def _error_class(status_code):
        """Map a non-success HTTP status code to a LoaderError subclass."""
        if status_code in (404, 410):
            return LoaderNoDataError
        if status_code >= 500:
            return LoaderSourceError
        return LoaderBadRequestError

    def _get_response_encoding(self, response, default=None):
        """Return the text encoding declared by ``response``.

        Args:
            response: Object with ``getheader(name, default)``.
            default: Encoding to use when the header declares no charset;
                ``DEFAULT_RESPONSE_ENCODING`` when None.
        """
        fallback = self.DEFAULT_RESPONSE_ENCODING if default is None else default
        try:
            content_type = response.getheader('Content-Type', 'application/json')
            charset = self._charset_from_content_type(content_type)
        except (KeyError, ValueError):
            return fallback
        return charset or fallback

    def _error_from_http_error(self, url, error):
        """Build the LoaderError equivalent of a ``urllib.error.HTTPError``."""
        headers = error.headers
        content_type = None if headers is None else headers.get('Content-Type')
        encoding = (
            self._charset_from_content_type(content_type)
            or self.DEFAULT_RESPONSE_ENCODING
        )
        try:
            with error:
                detail = error.read(self.ERROR_BODY_LIMIT).decode(encoding, 'replace')
        except (AttributeError, LookupError, OSError, ValueError):
            # The body is only diagnostic detail; an unreadable body or an
            # unknown charset must not hide the status-based error itself.
            detail = ''
        return self._error_class(error.code)(url, detail)

    def _raw_query(self, url_tail, params):
        """Request ``url_tail`` under the API root and return the body stream.

        Args:
            url_tail: Path joined onto ``api_root``.
            params: Query parameters, encoded with ``urllib.parse.urlencode``.

        Returns:
            An ``io.TextIOWrapper`` over the response when the status is
            200, 201 or 202; the caller closes it.

        Raises:
            LoaderNoDataError: Status 404 or 410.
            LoaderSourceError: Status 500 or above.
            LoaderBadRequestError: Any other status.

        Each exception carries ``(url, body_excerpt)`` as its arguments.
        NOTE(review): the URL includes ``app_id``; take care when logging it.
        """
        url = '{}?{}'.format(
            urllib.parse.urljoin(self.api_root, url_tail),
            urllib.parse.urlencode(params),
        )
        try:
            response = self.open_url(url)
        except urllib.error.HTTPError as error:
            # urllib.request.urlopen raises for non-2xx statuses instead of
            # returning the response, so translate it to the same errors.
            raise self._error_from_http_error(url, error) from error
        status_code = response.status
        encoding = self._get_response_encoding(response)
        response_body = io.TextIOWrapper(response, encoding=encoding)
        if 200 <= status_code < 203:
            return response_body
        with response_body:
            raise self._error_class(status_code)(
                url, response_body.read(self.ERROR_BODY_LIMIT),
            )

    def historical(self, date, base):
        """Fetch ``historical/<date>.json`` for the given base.

        Args:
            date: Object with an ``isoformat()`` method.
            base: Sent as the ``base`` query parameter.

        Returns:
            The open text stream produced by ``_raw_query``.

        Raises:
            LoaderError: A subclass chosen from the HTTP status, as described
                on ``_raw_query``.
        """
        return self._raw_query(
            'historical/{}.json'.format(date.isoformat()),
            {'app_id': self.app_id, 'base': base},
        )