Changeset - 9e33b2795ca5
[Not reviewed]
0 2 0
Brett Smith - 3 years ago 2021-03-10 15:37:21
brettcsmith@brettcsmith.org
config: Add RTCredentials.idstr() method.

Want to reuse this code for a query-report cache key.
2 files changed with 25 insertions and 5 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/config.py
Show inline comments
...
 
@@ -56,24 +56,36 @@ class RTCredentials(NamedTuple):
 
                    try:
 
                        key, value = line.split(None, 1)
 
                    except ValueError:
 
                        pass
 
                    else:
 
                        if key in values:
 
                            values[key] = value.rstrip('\n')
 
        except OSError:
 
            return cls()
 
        else:
 
            return cls(**values)
 

	
 
    def idstr(self) -> str:
 
        """Return a string unique to these credentials
 

	
 
        This returns a string that incorporates the server URL and user.
 
        The string will be unique across different credentials.
 
        It's suitable for use as a cache key or filename.
 
        """
 
        return '{}@{}'.format(
 
            self.user or '',
 
            urlparse.quote(self.server or '', ''),
 
        )
 

	
 

	
 
class Config:
 
    _ENVIRON_DEFAULT_PATHS = {
 
        'XDG_CACHE_HOME': Path('.cache'),
 
        'XDG_CONFIG_HOME': Path('.config'),
 
    }
 

	
 
    def __init__(self) -> None:
 
        self.file_config = configparser.ConfigParser()
 
        self.file_config.read_string("[Beancount]\n")
 

	
 
    def load_file(self, config_path: Optional[Path]=None) -> None:
...
 
@@ -200,29 +212,25 @@ class Config:
 
        else:
 
            return None
 

	
 
    @functools.lru_cache(4)
 
    def _rt_wrapper(self, credentials: RTCredentials, client: Type[rt.Rt]) -> Optional[rtutil.RT]:
 
        wrapper_client = self.rt_client(credentials, client)
 
        if wrapper_client is None:
 
            return None
 
        cache_dir_path = self.cache_dir_path()
 
        if cache_dir_path is None:
 
            cache_db = None
 
        else:
 
            cache_name = '{}@{}.sqlite3'.format(
 
                credentials.user,
 
                urlparse.quote(str(credentials.server), ''),
 
            )
 
            cache_path = cache_dir_path / cache_name
 
            cache_path = cache_dir_path / f'{credentials.idstr()}.sqlite3'
 
            try:
 
                cache_path.touch(0o600)
 
            except OSError:
 
                # RTLinkCache.setup() will handle the problem.
 
                pass
 
            cache_db = rtutil.RTLinkCache.setup(cache_path)
 
        return rtutil.RT(wrapper_client, cache_db)
 

	
 
    def rt_wrapper(self,
 
                  credentials: RTCredentials=None,
 
                  client: Type[rt.Rt]=rt.Rt,
 
    ) -> Optional[rtutil.RT]:
tests/test_config.py
Show inline comments
 
"""Test Config class"""
 
# Copyright © 2020  Brett Smith
 
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
 
#
 
# Full copyright and licensing details can be found at toplevel file
 
# LICENSE.txt in the repository.
 

	
 
import contextlib
 
import decimal
 
import itertools
 
import operator
 
import os
 
import re
 

	
 
import git
 

	
 
from pathlib import Path
 

	
 
import pytest
 

	
 
from . import testutil
 

	
...
 
@@ -131,24 +132,35 @@ def test_rt_credentials_from_file_and_environment_mixed(rt_environ, index, drop_
 
    expected[index] = RT_FILE_CREDS[index]
 
    assert rt_credentials == tuple(expected)
 

	
 
def test_rt_credentials_from_all_sources_mixed(tmp_path):
 
    server = 'https://example.org/mixedrt'
 
    with (tmp_path / '.rtrc').open('w') as rtrc_file:
 
        print('user basemix', 'passwd mixed up', file=rtrc_file, sep='\n')
 
    with update_environ(HOME=tmp_path, RTSERVER=server, RTUSER='mixedup'):
 
        config = config_mod.Config()
 
        rt_credentials = config.rt_credentials()
 
    assert rt_credentials == (server, 'mixedup', 'mixed up', 'rt')
 

	
 
def test_rt_credentials_idstr():
 
    actual = {
 
        config_mod.RTCredentials(server, user).idstr()
 
        for server, user in itertools.product(
 
                [None, 'https://example.org/rt'],
 
                [None, 'example'],
 
        )}
 
    assert len(actual) == 4
 
    for idstr in actual:
 
        assert '/' not in idstr
 

	
 
def check_rt_client_url(credentials, client):
 
    pattern = '^{}/?$'.format(re.escape(credentials[0].rstrip('/') + '/REST/1.0'))
 
    assert re.match(pattern, client.url)
 

	
 
@pytest.mark.parametrize('authmethod', RT_AUTH_METHODS)
 
def test_rt_client(authmethod):
 
    rt_credentials = RT_GENERIC_CREDS._replace(auth=authmethod)
 
    config = config_mod.Config()
 
    rt_client = config.rt_client(rt_credentials, testutil.RTClient)
 
    check_rt_client_url(RT_GENERIC_CREDS, rt_client)
 
    assert rt_client.auth_method == ('HTTPBasicAuth' if authmethod == 'basic' else 'login')
 
    assert rt_client.last_login == (
0 comments (0 inline, 0 general)