Changeset - 9e33b2795ca5
[Not reviewed]
0 2 0
Brett Smith - 3 years ago 2021-03-10 15:37:21
brettcsmith@brettcsmith.org
config: Add RTCredentials.idstr() method.

Want to reuse this code for a query-report cache key.
2 files changed with 25 insertions and 5 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/config.py
Show inline comments
...
 
@@ -44,48 +44,60 @@ class RTCredentials(NamedTuple):
 
                values[key] = os.environ[env_key]
 
            except KeyError:
 
                pass
 
        return cls(**values)
 

	
 
    @classmethod
 
    def from_rtrc(cls) -> 'RTCredentials':
 
        values = dict(cls._field_defaults)
 
        rtrc_path = Path.home() / '.rtrc'
 
        try:
 
            with rtrc_path.open() as rtrc_file:
 
                for line in rtrc_file:
 
                    try:
 
                        key, value = line.split(None, 1)
 
                    except ValueError:
 
                        pass
 
                    else:
 
                        if key in values:
 
                            values[key] = value.rstrip('\n')
 
        except OSError:
 
            return cls()
 
        else:
 
            return cls(**values)
 

	
 
    def idstr(self) -> str:
 
        """Return a string unique to these credentials
 

	
 
        This returns a string that incorporates the server URL and user.
 
        The string will be unique across different credentials.
 
        It's suitable for use as a cache key or filename.
 
        """
 
        return '{}@{}'.format(
 
            self.user or '',
 
            urlparse.quote(self.server or '', ''),
 
        )
 

	
 

	
 
class Config:
 
    _ENVIRON_DEFAULT_PATHS = {
 
        'XDG_CACHE_HOME': Path('.cache'),
 
        'XDG_CONFIG_HOME': Path('.config'),
 
    }
 

	
 
    def __init__(self) -> None:
 
        self.file_config = configparser.ConfigParser()
 
        self.file_config.read_string("[Beancount]\n")
 

	
 
    def load_file(self, config_path: Optional[Path]=None) -> None:
 
        if config_path is None:
 
            config_path = self.config_file_path()
 
        with config_path.open() as config_file:
 
            self.file_config.read_file(config_file)
 

	
 
    def load_string(self, config_str: str) -> None:
 
        self.file_config.read_string(config_str)
 

	
 
    def _abspath(self, source: Mapping[str, str], key: str) -> Optional[Path]:
 
        try:
 
            retval = Path(source[key])
 
        except (KeyError, ValueError):
...
 
@@ -188,45 +200,41 @@ class Config:
 
        if credentials.server is None:
 
            return None
 
        urlparts = urlparse.urlparse(credentials.server)
 
        rest_path = urlparts.path.rstrip('/') + '/REST/1.0/'
 
        url = urlparse.urlunparse(urlparts._replace(path=rest_path))
 
        if credentials.auth == 'basic':
 
            auth = requests.auth.HTTPBasicAuth(credentials.user, credentials.passwd)
 
            retval = client(url, http_auth=auth)
 
        else:
 
            retval = client(url, credentials.user, credentials.passwd)
 
        if retval.login():
 
            return retval
 
        else:
 
            return None
 

	
 
    @functools.lru_cache(4)
 
    def _rt_wrapper(self, credentials: RTCredentials, client: Type[rt.Rt]) -> Optional[rtutil.RT]:
 
        wrapper_client = self.rt_client(credentials, client)
 
        if wrapper_client is None:
 
            return None
 
        cache_dir_path = self.cache_dir_path()
 
        if cache_dir_path is None:
 
            cache_db = None
 
        else:
 
            cache_name = '{}@{}.sqlite3'.format(
 
                credentials.user,
 
                urlparse.quote(str(credentials.server), ''),
 
            )
 
            cache_path = cache_dir_path / cache_name
 
            cache_path = cache_dir_path / f'{credentials.idstr()}.sqlite3'
 
            try:
 
                cache_path.touch(0o600)
 
            except OSError:
 
                # RTLinkCache.setup() will handle the problem.
 
                pass
 
            cache_db = rtutil.RTLinkCache.setup(cache_path)
 
        return rtutil.RT(wrapper_client, cache_db)
 

	
 
    def rt_wrapper(self,
 
                  credentials: RTCredentials=None,
 
                  client: Type[rt.Rt]=rt.Rt,
 
    ) -> Optional[rtutil.RT]:
 
        if credentials is None:
 
            credentials = self.rt_credentials()
 
        # type ignore for <https://github.com/python/typeshed/issues/4638>
 
        return self._rt_wrapper(credentials, client)  # type:ignore[arg-type]
tests/test_config.py
Show inline comments
 
"""Test Config class"""
 
# Copyright © 2020  Brett Smith
 
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
 
#
 
# Full copyright and licensing details can be found at toplevel file
 
# LICENSE.txt in the repository.
 

	
 
import contextlib
 
import decimal
 
import itertools
 
import operator
 
import os
 
import re
 

	
 
import git
 

	
 
from pathlib import Path
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount import config as config_mod
 

	
 
RT_AUTH_METHODS = frozenset(['basic', 'gssapi', 'rt'])
 

	
 
RT_ENV_KEYS = (
 
    'RTSERVER',
 
    'RTUSER',
 
    'RTPASSWD',
 
    'RTAUTH',
 
)
 

	
 
RT_ENV_CREDS = (
...
 
@@ -119,48 +120,59 @@ def test_rt_credentials_from_environment(rt_environ):
 
    with update_environ(**rt_environ):
 
        config = config_mod.Config()
 
        rt_credentials = config.rt_credentials()
 
    assert rt_credentials == RT_ENV_CREDS
 

	
 
@pytest.mark.parametrize('index,drop_key', enumerate(RT_ENV_KEYS))
 
def test_rt_credentials_from_file_and_environment_mixed(rt_environ, index, drop_key):
 
    del rt_environ[drop_key]
 
    with update_environ(**rt_environ):
 
        config = config_mod.Config()
 
        rt_credentials = config.rt_credentials()
 
    expected = list(RT_ENV_CREDS)
 
    expected[index] = RT_FILE_CREDS[index]
 
    assert rt_credentials == tuple(expected)
 

	
 
def test_rt_credentials_from_all_sources_mixed(tmp_path):
 
    server = 'https://example.org/mixedrt'
 
    with (tmp_path / '.rtrc').open('w') as rtrc_file:
 
        print('user basemix', 'passwd mixed up', file=rtrc_file, sep='\n')
 
    with update_environ(HOME=tmp_path, RTSERVER=server, RTUSER='mixedup'):
 
        config = config_mod.Config()
 
        rt_credentials = config.rt_credentials()
 
    assert rt_credentials == (server, 'mixedup', 'mixed up', 'rt')
 

	
 
def test_rt_credentials_idstr():
 
    actual = {
 
        config_mod.RTCredentials(server, user).idstr()
 
        for server, user in itertools.product(
 
                [None, 'https://example.org/rt'],
 
                [None, 'example'],
 
        )}
 
    assert len(actual) == 4
 
    for idstr in actual:
 
        assert '/' not in idstr
 

	
 
def check_rt_client_url(credentials, client):
 
    pattern = '^{}/?$'.format(re.escape(credentials[0].rstrip('/') + '/REST/1.0'))
 
    assert re.match(pattern, client.url)
 

	
 
@pytest.mark.parametrize('authmethod', RT_AUTH_METHODS)
 
def test_rt_client(authmethod):
 
    rt_credentials = RT_GENERIC_CREDS._replace(auth=authmethod)
 
    config = config_mod.Config()
 
    rt_client = config.rt_client(rt_credentials, testutil.RTClient)
 
    check_rt_client_url(RT_GENERIC_CREDS, rt_client)
 
    assert rt_client.auth_method == ('HTTPBasicAuth' if authmethod == 'basic' else 'login')
 
    assert rt_client.last_login == (
 
        RT_GENERIC_CREDS.user,
 
        RT_GENERIC_CREDS.passwd,
 
        True,
 
    )
 

	
 
def test_default_rt_client(rt_environ):
 
    with update_environ(**rt_environ):
 
        config = config_mod.Config()
 
        rt_client = config.rt_client(client=testutil.RTClient)
 
    check_rt_client_url(RT_ENV_CREDS, rt_client)
 
    assert rt_client.last_login[:-1] == RT_ENV_CREDS[1:3]
 
    assert rt_client.last_login[-1]
0 comments (0 inline, 0 general)