diff --git a/oxrlib/loaders.py b/oxrlib/loaders.py index 3e4d22b10d7f3b34c24399c30ba86356632d3c23..e839dddcbb9c18748fc020fdae6f2cd04a373b06 100644 --- a/oxrlib/loaders.py +++ b/oxrlib/loaders.py @@ -1,3 +1,8 @@ +import cgi +import io +import urllib.request +import urllib.parse + class LoaderError(Exception): pass @@ -25,3 +30,48 @@ class FileCache: return path.open() except FileNotFoundError as error: raise LoaderNoDataError(path) from error + + +class OXRAPIRequest: + DEFAULT_API_ROOT = 'https://openexchangerates.org/api/' + DEFAULT_RESPONSE_ENCODING = 'utf-8' + + def __init__(self, app_id, api_root=None, *, open_func=urllib.request.urlopen): + self.api_root = self.DEFAULT_API_ROOT if api_root is None else api_root + self.app_id = app_id + self.open_url = open_func + + def _get_response_encoding(self, response, default=None): + try: + content_type = response.getheader('Content-Type', 'application/json') + _, ct_options = cgi.parse_header(content_type) + encoding = ct_options['charset'] + except (KeyError, ValueError): + encoding = self.DEFAULT_RESPONSE_ENCODING if default is None else default + return encoding + + def _raw_query(self, url_tail, params): + url = '{}?{}'.format( + urllib.parse.urljoin(self.api_root, url_tail), + urllib.parse.urlencode(params), + ) + response = self.open_url(url) + status_code = response.status + encoding = self._get_response_encoding(response) + response_body = io.TextIOWrapper(response, encoding=encoding) + if 200 <= status_code < 203: + return response_body + elif status_code == 404 or status_code == 410: + exc_class = LoaderNoDataError + elif status_code >= 500: + exc_class = LoaderSourceError + else: + exc_class = LoaderBadRequestError + with response_body: + raise exc_class(url, response_body.read(64 * 1024)) + + def historical(self, date, base): + return self._raw_query( + 'historical/{}.json'.format(date.isoformat()), + {'app_id': self.app_id, 'base': base}, + ) diff --git a/tests/test_OXRAPIRequest.py b/tests/test_OXRAPIRequest.py new file mode 100644 index 0000000000000000000000000000000000000000..147af8a11f327b3e9e038117397ee30feb0bea48 --- /dev/null +++ b/tests/test_OXRAPIRequest.py @@ -0,0 +1,102 @@ +import datetime +import http.client +import io +import json +import os +import random +import string +import urllib.parse + +import pytest +import oxrlib.loaders + +APPID_CHARS = string.ascii_letters + string.digits +RANDOM_APPID = ''.join(random.choice(APPID_CHARS) for _ in range(32)) +API_ROOT = 'http://[100::]/oxrlibtest/' +API_ROOT_PATH = urllib.parse.urlsplit(API_ROOT).path + +class FakeResponse: + debuglevel = 0 + version = 11 + + def __init__(self, status_code, reason=None, body=None, headers=None, encoding='utf-16'): + if reason is None: + reason = http.client.responses[status_code] + if body is None: + body = json.dumps(reason) + if headers is None: + headers = { + 'Content-Type': 'application/json; charset={}'.format(encoding), + 'Content-Length': str(len(body)), + } + self.status = status_code + self.reason = reason + read_fd, write_fd = os.pipe() + with open(write_fd, 'w', encoding=encoding) as body_file: + print('\ufeff', body, sep='', file=body_file) + self.fp = open(read_fd, 'rb') + self.headers = headers + + def __getattr__(self, name): + return getattr(self.fp, name) + + def getheader(self, name, default=None): + return self.headers.get(name, default) + + def getheaders(self): + return list(self.headers.itervalues()) + + +class FakeOpener: + def __init__(self, response): + self.response = response + self.call_list = [] + + def __call__(self, url, **kwargs): + self.call_list.append((url, kwargs)) + return self.response + + def call_count(self): + return len(self.call_list) + + def last_called_url(self): + return self.call_list[-1][0] + + +@pytest.fixture +def api_client(): + return oxrlib.loaders.OXRAPIRequest(RANDOM_APPID, API_ROOT) + +@pytest.fixture +def any_date(): + return datetime.date.today() - datetime.timedelta(days=730 - random.randint(0, 365)) + +@pytest.mark.parametrize('base', ['USD', 'JPY']) +def test_success(api_client, any_date, base): + body = "Good Test" + opener = FakeOpener(FakeResponse(200, body)) + api_client.open_url = opener + response = api_client.historical(any_date, base) + assert opener.call_count() == 1 + urlparts = urllib.parse.urlsplit(opener.last_called_url()) + assert urlparts.path == '{}historical/{}.json'.format(API_ROOT_PATH, any_date.isoformat()) + params = urllib.parse.parse_qs(urlparts.query) + assert params['base'] == [base] + assert response.read() == (json.dumps(body) + "\n") + +@pytest.mark.parametrize('status_code,expect_exctype', [ + (400, oxrlib.loaders.LoaderBadRequestError), + (403, oxrlib.loaders.LoaderBadRequestError), + (404, oxrlib.loaders.LoaderNoDataError), + (410, oxrlib.loaders.LoaderNoDataError), + (500, oxrlib.loaders.LoaderSourceError), +]) +def test_failure(api_client, any_date, status_code, expect_exctype): + opener = FakeOpener(FakeResponse(status_code)) + api_client.open_url = opener + try: + response = api_client.historical(any_date, 'USD') + except expect_exctype: + pass + else: + assert False, "got response: " + response.read()