Changeset - f09f029fc4cb
[Not reviewed]
0 4 0
Brett Smith - 4 years ago 2020-03-24 13:08:08
brettcsmith@brettcsmith.org
config: Add Config.rt_client method.
4 files changed with 109 insertions and 0 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/config.py
Show inline comments
...
 
@@ -6,29 +6,34 @@
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import os
 
import urllib.parse as urlparse
 

	
 
import requests.auth
 
import rt
 

	
 
from pathlib import Path
 
from typing import (
 
    NamedTuple,
 
    Optional,
 
    Type,
 
)
 

	
 
class RTCredentials(NamedTuple):
 
    server: Optional[str] = None
 
    user: Optional[str] = None
 
    passwd: Optional[str] = None
 
    auth: Optional[str] = None
 

	
 
    @classmethod
 
    def from_env(cls) -> 'RTCredentials':
 
        values = dict(cls._field_defaults)
 
        for key in values:
...
 
@@ -64,12 +69,33 @@ class Config:
 
        try:
 
            return Path(os.environ['CONSERVANCY_REPOSITORY'])
 
        except (KeyError, ValueError):
 
            return None
 

	
 
    def rt_credentials(self) -> RTCredentials:
 
        all_creds = zip(
 
            RTCredentials.from_env(),
 
            RTCredentials.from_rtrc(),
 
            RTCredentials(auth='rt'),
 
        )
 
        return RTCredentials._make(v0 or v1 or v2 for v0, v1, v2 in all_creds)
 

	
 
    def rt_client(self,
 
                  credentials: RTCredentials=None,
 
                  client: Type[rt.Rt]=rt.Rt,
 
    ) -> Optional[rt.Rt]:
 
        if credentials is None:
 
            credentials = self.rt_credentials()
 
        if credentials.server is None:
 
            return None
 
        urlparts = urlparse.urlparse(credentials.server)
 
        rest_path = urlparts.path.rstrip('/') + '/REST/1.0/'
 
        url = urlparse.urlunparse(urlparts._replace(path=rest_path))
 
        if credentials.auth == 'basic':
 
            auth = requests.auth.HTTPBasicAuth(credentials.user, credentials.passwd)
 
            retval = client(url, http_auth=auth)
 
        else:
 
            retval = client(url, credentials.user, credentials.passwd)
 
        if retval.login():
 
            return retval
 
        else:
 
            return None
setup.py
Show inline comments
...
 
@@ -3,24 +3,25 @@
 
from setuptools import setup
 

	
 
setup(
 
    name='conservancy_beancount',
 
    description="Plugin, library, and reports for reading Conservancy's books",
 
    version='0.1',
 
    author='Software Freedom Conservancy',
 
    author_email='info@sfconservancy.org',
 
    license='GNU AGPLv3+',
 

	
 
    install_requires=[
 
        'beancount>=2.2',
 
        'rt>=2.0',
 
    ],
 
    setup_requires=[
 
        'pytest-mypy',
 
        'pytest-runner',
 
    ],
 
    tests_require=[
 
        'mypy>=0.770',
 
        'pytest',
 
    ],
 

	
 
    packages=['conservancy_beancount'],
 
    entry_points={},
tests/test_config.py
Show inline comments
...
 
@@ -7,52 +7,62 @@
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import contextlib
 
import os
 
import re
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount import config as config_mod
 

	
 
RT_AUTH_METHODS = frozenset(['basic', 'gssapi', 'rt'])
 

	
 
RT_ENV_KEYS = (
 
    'RTSERVER',
 
    'RTUSER',
 
    'RTPASSWD',
 
    'RTAUTH',
 
)
 

	
 
RT_ENV_CREDS = (
 
    'https://example.org/envrt',
 
    'envuser',
 
    'env  password',
 
    'gssapi',
 
)
 

	
 
RT_FILE_CREDS = (
 
    'https://example.org/filert',
 
    'fileuser',
 
    'file  password',
 
    'basic',
 
)
 

	
 
RT_GENERIC_CREDS = config_mod.RTCredentials(
 
    'https://example.org/genericrt',
 
    'genericuser',
 
    'generic password',
 
    None,
 
)
 

	
 
@pytest.fixture
 
def rt_environ():
 
    return dict(zip(RT_ENV_KEYS, RT_ENV_CREDS))
 

	
 
def _update_environ(updates):
 
    for key, value in updates.items():
 
        if value is None:
 
            os.environ.pop(key, None)
 
        else:
 
            os.environ[key] = str(value)
 

	
 
@contextlib.contextmanager
...
 
@@ -100,12 +110,51 @@ def test_rt_credentials_from_file_and_environment_mixed(rt_environ, index, drop_
 
    expected = list(RT_ENV_CREDS)
 
    expected[index] = RT_FILE_CREDS[index]
 
    assert rt_credentials == tuple(expected)
 

	
 
def test_rt_credentials_from_all_sources_mixed(tmp_path):
 
    server = 'https://example.org/mixedrt'
 
    with (tmp_path / '.rtrc').open('w') as rtrc_file:
 
        print('user basemix', 'passwd mixed up', file=rtrc_file, sep='\n')
 
    with update_environ(HOME=tmp_path, RTSERVER=server, RTUSER='mixedup'):
 
        config = config_mod.Config()
 
        rt_credentials = config.rt_credentials()
 
    assert rt_credentials == (server, 'mixedup', 'mixed up', 'rt')
 

	
 
def check_rt_client_url(credentials, client):
 
    pattern = '^{}/?$'.format(re.escape(credentials[0].rstrip('/') + '/REST/1.0'))
 
    assert re.match(pattern, client.url)
 

	
 
@pytest.mark.parametrize('authmethod', RT_AUTH_METHODS)
 
def test_rt_client(authmethod):
 
    rt_credentials = RT_GENERIC_CREDS._replace(auth=authmethod)
 
    config = config_mod.Config()
 
    rt_client = config.rt_client(rt_credentials, testutil.RTClient)
 
    check_rt_client_url(RT_GENERIC_CREDS, rt_client)
 
    assert rt_client.auth_method == ('HTTPBasicAuth' if authmethod == 'basic' else 'login')
 
    assert rt_client.last_login == (
 
        RT_GENERIC_CREDS.user,
 
        RT_GENERIC_CREDS.passwd,
 
        True,
 
    )
 

	
 
def test_default_rt_client(rt_environ):
 
    with update_environ(**rt_environ):
 
        config = config_mod.Config()
 
        rt_client = config.rt_client(client=testutil.RTClient)
 
    check_rt_client_url(RT_ENV_CREDS, rt_client)
 
    assert rt_client.last_login[:-1] == RT_ENV_CREDS[1:3]
 
    assert rt_client.last_login[-1]
 

	
 
@pytest.mark.parametrize('authmethod', RT_AUTH_METHODS)
 
def test_rt_client_login_failure(authmethod):
 
    rt_credentials = RT_GENERIC_CREDS._replace(
 
        auth=authmethod,
 
        passwd='bad{}'.format(authmethod),
 
    )
 
    config = config_mod.Config()
 
    assert config.rt_client(rt_credentials, testutil.RTClient) is None
 

	
 
def test_no_rt_client_without_server():
 
    rt_credentials = RT_GENERIC_CREDS._replace(server=None, auth='rt')
 
    config = config_mod.Config()
 
    assert config.rt_client(rt_credentials, testutil.RTClient) is None
tests/testutil.py
Show inline comments
...
 
@@ -103,12 +103,45 @@ class Transaction:
 
            posting = Posting(arg, *args, **kwargs)
 
        else:
 
            posting = arg
 
        self.postings.append(posting)
 

	
 

	
 
class TestConfig:
 
    def __init__(self, repo_path=None):
 
        self.repo_path = test_path(repo_path)
 

	
 
    def repository_path(self):
 
        return self.repo_path
 

	
 

	
 
class RTClient:
 
    def __init__(self,
 
                 url,
 
                 default_login=None,
 
                 default_password=None,
 
                 proxy=None,
 
                 default_queue='General',
 
                 skip_login=False,
 
                 verify_cert=True,
 
                 http_auth=None,
 
    ):
 
        self.url = url
 
        if http_auth is None:
 
            self.user = default_login
 
            self.password = default_password
 
            self.auth_method = 'login'
 
            self.login_result = skip_login or None
 
        else:
 
            self.user = http_auth.username
 
            self.password = http_auth.password
 
            self.auth_method = type(http_auth).__name__
 
            self.login_result = True
 
        self.last_login = None
 

	
 
    def login(self, login=None, password=None):
 
        if login is None and password is None:
 
            login = self.user
 
            password = self.password
 
        self.login_result = bool(login and password and not password.startswith('bad'))
 
        self.last_login = (login, password, self.login_result)
 
        return self.login_result
0 comments (0 inline, 0 general)