Files @ 9e33b2795ca5
Branch filter:

Location: NPO-Accounting/conservancy_beancount/conservancy_beancount/config.py

Brett Smith
config: Add RTCredentials.idstr() method.

Want to reuse this code for a query-report cache key.
"""User configuration for Conservancy bookkeeping tools"""
# Copyright © 2020  Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.

import configparser
import datetime
import decimal
import functools
import os
import re
import urllib.parse as urlparse

import git  # type:ignore[import]
import requests.auth
import rt

from pathlib import Path
from typing import (
    Mapping,
    NamedTuple,
    Optional,
    Tuple,
    Type,
)

from . import books
from . import rtutil

class RTCredentials(NamedTuple):
    server: Optional[str] = None
    user: Optional[str] = None
    passwd: Optional[str] = None
    auth: Optional[str] = None

    @classmethod
    def from_env(cls) -> 'RTCredentials':
        values = dict(cls._field_defaults)
        for key in values:
            env_key = 'RT{}'.format(key.upper())
            try:
                values[key] = os.environ[env_key]
            except KeyError:
                pass
        return cls(**values)

    @classmethod
    def from_rtrc(cls) -> 'RTCredentials':
        values = dict(cls._field_defaults)
        rtrc_path = Path.home() / '.rtrc'
        try:
            with rtrc_path.open() as rtrc_file:
                for line in rtrc_file:
                    try:
                        key, value = line.split(None, 1)
                    except ValueError:
                        pass
                    else:
                        if key in values:
                            values[key] = value.rstrip('\n')
        except OSError:
            return cls()
        else:
            return cls(**values)

    def idstr(self) -> str:
        """Return a string unique to these credentials

        This returns a string that incorporates the server URL and user.
        The string will be unique across different credentials.
        It's suitable for use as a cache key or filename.
        """
        return '{}@{}'.format(
            self.user or '',
            urlparse.quote(self.server or '', ''),
        )


class Config:
    _ENVIRON_DEFAULT_PATHS = {
        'XDG_CACHE_HOME': Path('.cache'),
        'XDG_CONFIG_HOME': Path('.config'),
    }

    def __init__(self) -> None:
        self.file_config = configparser.ConfigParser()
        self.file_config.read_string("[Beancount]\n")

    def load_file(self, config_path: Optional[Path]=None) -> None:
        if config_path is None:
            config_path = self.config_file_path()
        with config_path.open() as config_file:
            self.file_config.read_file(config_file)

    def load_string(self, config_str: str) -> None:
        self.file_config.read_string(config_str)

    def _abspath(self, source: Mapping[str, str], key: str) -> Optional[Path]:
        try:
            retval = Path(source[key])
        except (KeyError, ValueError):
            ok = False
        else:
            if source is not os.environ:
                retval = retval.expanduser()
            ok = retval.is_absolute()
        return retval if ok else None

    def _dir_or_none(self, path: Path) -> Optional[Path]:
        try:
            path.mkdir(exist_ok=True)
        except OSError:
            return None
        else:
            return path

    def _path_from_environ(self, key: str, default: Optional[Path]=None) -> Path:
        retval = self._abspath(os.environ, key)
        if retval is None:
            retval = default or (Path.home() / self._ENVIRON_DEFAULT_PATHS[key])
        return retval

    def books_loader(self) -> Optional[books.Loader]:
        books_path = self.books_path()
        if books_path is None:
            return None
        else:
            return books.Loader(books_path, self.fiscal_year_begin())

    def books_path(self) -> Optional[Path]:
        return self._abspath(self.file_config['Beancount'], 'books dir')

    def books_repo(self) -> Optional[git.Repo]:
        """Return a git.Repo object for the books directory

        Returns None if the books directory is not a valid Git repository.
        """
        try:
            return git.Repo(self.file_config['Beancount']['books dir'])
        except (KeyError, git.exc.GitError):
            return None

    def cache_dir_path(self, name: str='conservancy_beancount') -> Optional[Path]:
        cache_root = self._path_from_environ('XDG_CACHE_HOME')
        return (
            self._dir_or_none(cache_root)
            and self._dir_or_none(cache_root / name)
        )

    def config_file_path(self, name: str='conservancy_beancount') -> Path:
        config_root = self._path_from_environ('XDG_CONFIG_HOME')
        return Path(config_root, name, 'config.ini')

    def fiscal_year_begin(self) -> books.FiscalYear:
        s = self.file_config.get('Beancount', 'fiscal year begin', fallback='3 1')
        match = re.match(r'([01]?[0-9])(?:\s*[-./ ]\s*([0-3]?[0-9]))?$', s.strip())
        if match is None:
            raise ValueError(f"fiscal year begin {s!r} has unknown format")
        try:
            month = int(match.group(1))
            day = int(match.group(2) or 1)
            # To check date validity we use an arbitrary year that's
            # 1. definitely using the modern calendar
            # 2. far enough in the past to not have books (pre-Unix epoch)
            # 3. not a leap year
            datetime.date(1959, month, day)
        except ValueError as e:
            raise ValueError(f"fiscal year begin {s!r} is invalid date: {e.args[0]}")
        else:
            return books.FiscalYear(month, day)

    def payment_threshold(self) -> decimal.Decimal:
        try:
            return decimal.Decimal(self.file_config['Beancount']['payment threshold'])
        except (KeyError, ValueError):
            return decimal.Decimal(10)

    def repository_path(self) -> Optional[Path]:
        retval = self._abspath(self.file_config['Beancount'], 'repository dir')
        if retval is None:
            retval = self._abspath(os.environ, 'CONSERVANCY_REPOSITORY')
        return retval

    def rt_credentials(self) -> RTCredentials:
        all_creds = zip(
            RTCredentials.from_env(),
            RTCredentials.from_rtrc(),
            RTCredentials(auth='rt'),
        )
        return RTCredentials._make(v0 or v1 or v2 for v0, v1, v2 in all_creds)

    def rt_client(self,
                  credentials: RTCredentials=None,
                  client: Type[rt.Rt]=rt.Rt,
    ) -> Optional[rt.Rt]:
        if credentials is None:
            credentials = self.rt_credentials()
        if credentials.server is None:
            return None
        urlparts = urlparse.urlparse(credentials.server)
        rest_path = urlparts.path.rstrip('/') + '/REST/1.0/'
        url = urlparse.urlunparse(urlparts._replace(path=rest_path))
        if credentials.auth == 'basic':
            auth = requests.auth.HTTPBasicAuth(credentials.user, credentials.passwd)
            retval = client(url, http_auth=auth)
        else:
            retval = client(url, credentials.user, credentials.passwd)
        if retval.login():
            return retval
        else:
            return None

    @functools.lru_cache(4)
    def _rt_wrapper(self, credentials: RTCredentials, client: Type[rt.Rt]) -> Optional[rtutil.RT]:
        wrapper_client = self.rt_client(credentials, client)
        if wrapper_client is None:
            return None
        cache_dir_path = self.cache_dir_path()
        if cache_dir_path is None:
            cache_db = None
        else:
            cache_path = cache_dir_path / f'{credentials.idstr()}.sqlite3'
            try:
                cache_path.touch(0o600)
            except OSError:
                # RTLinkCache.setup() will handle the problem.
                pass
            cache_db = rtutil.RTLinkCache.setup(cache_path)
        return rtutil.RT(wrapper_client, cache_db)

    def rt_wrapper(self,
                  credentials: RTCredentials=None,
                  client: Type[rt.Rt]=rt.Rt,
    ) -> Optional[rtutil.RT]:
        if credentials is None:
            credentials = self.rt_credentials()
        # type ignore for <https://github.com/python/typeshed/issues/4638>
        return self._rt_wrapper(credentials, client)  # type:ignore[arg-type]