Changeset - 747ef25da618
[Not reviewed]
0 5 0
Brett Smith - 4 years ago 2020-03-30 03:18:40
brettcsmith@brettcsmith.org
setup: Disallow untyped defs.

Mostly this meant giving annotations to low-value functions like
the error classes and __init_subclass__, but it's worth it for
the future strictness+documentation value.
5 files changed with 71 insertions and 14 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/errors.py
Show inline comments
 
"""Error classes for plugins to report problems in the books"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import beancount.core.data as bc_data
 

	
 
from typing import (
 
    Any,
 
    Iterable,
 
    Optional,
 
)
 

	
 
from .beancount_types import (
 
    Directive,
 
    MetaKey,
 
    MetaValue,
 
    Posting,
 
    Transaction,
 
    Type,
 
)
 

	
 
Meta = Optional[bc_data.Meta]
 

	
 
class Error(Exception):
 
    def __init__(self, message, entry, source=None):
 
    def __init__(self,
 
                 message: str,
 
                 entry: Optional[Directive],
 
                 source: Meta=None,
 
    ) -> None:
 
        self.message = message
 
        self.entry = entry
 
        self.source = entry.meta if source is None else source
 
        if source:
 
            self.source = source
 
        elif entry is not None:
 
            self.source = entry.meta
 
        else:
 
            self.source = {}
 
            self._fill_source(self.source, '<unknown>')
 

	
 
    def __repr__(self):
 
    def __repr__(self) -> str:
 
        return "{clsname}<{source[filename]}:{source[lineno]}: {message}>".format(
 
            clsname=type(self).__name__,
 
            message=self.message,
 
            source=self.source,
 
        )
 

	
 
    def _fill_source(self, source, filename='conservancy_beancount', lineno=0):
 
    def _fill_source(self,
 
                     source: bc_data.Meta,
 
                     filename: str='conservancy_beancount',
 
                     lineno: int=0,
 
    ) -> None:
 
        source.setdefault('filename', filename)
 
        source.setdefault('lineno', lineno)
 

	
 

	
 
Iter = Iterable[Error]
 

	
 
class BrokenLinkError(Error):
 
    def __init__(self, txn, key, link, source=None):
 
    def __init__(self,
 
                 txn: Transaction,
 
                 key: MetaKey,
 
                 link: str,
 
                 source: Meta=None,
 
    ) -> None:
 
        super().__init__(
 
            "{} not found in repository: {}".format(key, link),
 
            txn,
 
            source,
 
        )
 

	
 

	
 
class BrokenRTLinkError(Error):
 
    def __init__(self, txn, key, link, parsed=True, source=None):
 
    def __init__(self,
 
                 txn: Transaction,
 
                 key: MetaKey,
 
                 link: str,
 
                 parsed: Any=True,
 
                 source: Meta=None,
 
    ) -> None:
 
        if parsed:
 
            msg_fmt = "{} not found in RT: {}"
 
        else:
 
            msg_fmt = "{} link is malformed: {}"
 
        super().__init__(
 
            msg_fmt.format(key, link),
 
            txn,
 
            source,
 
        )
 

	
 

	
 
class ConfigurationError(Error):
 
    def __init__(self, message, entry=None, source=None):
 
    def __init__(self,
 
                 message: str,
 
                 entry: Optional[Directive]=None,
 
                 source: Meta=None,
 
    ) -> None:
 
        if source is None:
 
            source = {}
 
        self._fill_source(source)
 
        super().__init__(message, entry, source)
 

	
 

	
 
class InvalidMetadataError(Error):
 
    def __init__(self, txn, key, value=None, post=None, need_type=str, source=None):
 
    def __init__(self,
 
                 txn: Transaction,
 
                 key: MetaKey,
 
                 value: Optional[MetaValue]=None,
 
                 post: Optional[bc_data.Posting]=None,
 
                 need_type: Type=str,
 
                 source: Meta=None,
 
    ) -> None:
 
        if post is None:
 
            srcname = 'transaction'
 
        else:
 
            srcname = post.account
 
        if value is None:
 
            msg = "{} missing {}".format(srcname, key)
 
        elif isinstance(value, need_type):
 
            msg = "{} has invalid {}: {}".format(srcname, key, value)
 
        else:
 
            msg = "{} has wrong type of {}: expected {} but is a {}".format(
 
                srcname, key, need_type.__name__, type(value).__name__,
 
            )
 
        super().__init__(msg, txn, source)
conservancy_beancount/plugin/core.py
Show inline comments
 
"""Base classes for plugin checks"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import abc
 
import datetime
 
import re
 

	
 
from .. import config as configmod
 
from .. import data
 
from .. import errors as errormod
 

	
 
from typing import (
 
    Any,
 
    Dict,
 
    FrozenSet,
 
    Generic,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    Optional,
 
    TypeVar,
 
)
 
from ..beancount_types import (
 
    Account,
 
    Directive,
 
    MetaKey,
 
    MetaValue,
 
    MetaValueEnum,
 
    Transaction,
 
    Type,
 
)
 

	
 
### CONSTANTS
 

	
 
# I expect these will become configurable in the future, which is why I'm
 
# keeping them outside of a class, but for now constants will do.
 
DEFAULT_START_DATE: datetime.date = datetime.date(2020, 3, 1)
 
# The default stop date leaves a little room after so it's easy to test
 
# dates past the far end of the range.
 
DEFAULT_STOP_DATE: datetime.date = datetime.date(datetime.MAXYEAR, 1, 1)
 

	
 
### TYPE DEFINITIONS
 

	
 
HookName = str
 

	
 
Entry = TypeVar('Entry', bound=Directive)
 
class Hook(Generic[Entry], metaclass=abc.ABCMeta):
 
    DIRECTIVE: Type[Directive]
 
    HOOK_GROUPS: FrozenSet[HookName] = frozenset()
 

	
 
    def __init__(self, config: configmod.Config) -> None:
 
        pass
 
        # Subclasses that need configuration should override __init__ to check
 
        # and store it.
 

	
 
    @abc.abstractmethod
 
    def run(self, entry: Entry) -> errormod.Iter: ...
 

	
 
    def __init_subclass__(cls):
 
        cls.DIRECTIVE = cls.__orig_bases__[0].__args__[0]
 
    def __init_subclass__(cls) -> None:
 
        # cls.__orig_bases__ comes from the ABCMeta metaclass
 
        cls.DIRECTIVE = cls.__orig_bases__[0].__args__[0]  # type:ignore[attr-defined]
 

	
 

	
 
TransactionHook = Hook[Transaction]
 

	
 
### HELPER CLASSES
 

	
 
class LessComparable(metaclass=abc.ABCMeta):
 
    @abc.abstractmethod
 
    def __le__(self, other: Any) -> bool: ...
 

	
 
    @abc.abstractmethod
 
    def __lt__(self, other: Any) -> bool: ...
 

	
 

	
 
CT = TypeVar('CT', bound=LessComparable)
 
class _GenericRange(Generic[CT]):
 
    """Convenience class to check whether a value is within a range.
 

	
 
    `foo in generic_range` is equivalent to `start <= foo < stop`.
 
    Since we have multiple user-configurable ranges, having the check
 
    encapsulated in an object helps implement the check consistently, and
 
    makes it easier for subclasses to override.
 
    """
 

	
 
    def __init__(self, start: CT, stop: CT) -> None:
 
        self.start = start
 
        self.stop = stop
 

	
 
    def __repr__(self) -> str:
 
        return "{clsname}({self.start!r}, {self.stop!r})".format(
 
            clsname=type(self).__name__,
 
            self=self,
 
        )
 

	
 
    def __contains__(self, item: CT) -> bool:
 
        return self.start <= item < self.stop
 

	
 

	
 
class MetadataEnum:
 
    """Map acceptable metadata values to their normalized forms.
 

	
 
    When a piece of metadata uses a set of allowed values, use this class to
 
    define them. You can also specify aliases that hooks will normalize to
 
    the primary values.
 
    """
 

	
 
    def __init__(self,
 
                 key: MetaKey,
 
                 standard_values: Iterable[MetaValueEnum],
 
                 aliases_map: Optional[Mapping[MetaValueEnum, MetaValueEnum]]=None,
 
    ) -> None:
 
        """Specify allowed values and aliases for this metadata.
 

	
 
        Arguments:
 

	
 
        * key: The name of the metadata key that uses this enum.
 
        * standard_values: A sequence of strings that enumerate the standard
 
          values for this metadata.
 
        * aliases_map: A mapping of strings to strings. The keys are
 
          additional allowed metadata values. The values are standard values
 
          that each key will evaluate to. The code asserts that all values are
 
          in standard_values.
 
        """
 
        self.key = key
 
        self._stdvalues = frozenset(standard_values)
 
        self._aliases: Dict[MetaValueEnum, MetaValueEnum] = dict(aliases_map or ())
 
        assert self._stdvalues.issuperset(self._aliases.values())
 
        self._aliases.update((v, v) for v in standard_values)
 

	
 
    def __repr__(self) -> str:
 
        return "{}<{}>".format(type(self).__name__, self.key)
 

	
 
    def __contains__(self, key: MetaValueEnum) -> bool:
 
        """Returns true if `key` is a standard value or alias."""
 
        return key in self._aliases
 

	
 
    def __getitem__(self, key: MetaValueEnum) -> MetaValueEnum:
 
        """Return the standard value for `key`.
 

	
 
        Raises KeyError if `key` is not a known value or alias.
 
        """
 
        return self._aliases[key]
 

	
 
    def __iter__(self) -> Iterator[MetaValueEnum]:
 
        """Iterate over standard values."""
 
        return iter(self._stdvalues)
 

	
 
    def get(self,
 
            key: MetaValueEnum,
 
            default_key: Optional[MetaValueEnum]=None,
 
    ) -> Optional[MetaValueEnum]:
 
        """Return self[key], or a default fallback if that doesn't exist.
 

	
 
        default_key is another key to look up, *not* a default value to return.
 
        This helps ensure you always get a standard value.
 
        """
 
        try:
 
            return self[key]
 
        except KeyError:
 
            if default_key is None:
 
                return None
 
            else:
 
                return self[default_key]
 

	
 

	
 
### HOOK SUBCLASSES
 

	
 
class _PostingHook(TransactionHook, metaclass=abc.ABCMeta):
 
    TXN_DATE_RANGE: _GenericRange = _GenericRange(DEFAULT_START_DATE, DEFAULT_STOP_DATE)
 

	
 
    def __init_subclass__(cls) -> None:
 
        cls.HOOK_GROUPS = cls.HOOK_GROUPS.union(['posting'])
 

	
 
    def _run_on_txn(self, txn: Transaction) -> bool:
 
        return txn.date in self.TXN_DATE_RANGE
 

	
 
    def _run_on_post(self, txn: Transaction, post: data.Posting) -> bool:
 
        return True
 

	
 
    def run(self, txn: Transaction) -> errormod.Iter:
 
        if self._run_on_txn(txn):
 
            for post in data.iter_postings(txn):
 
                if self._run_on_post(txn, post):
 
                    yield from self.post_run(txn, post)
 

	
 
    @abc.abstractmethod
 
    def post_run(self, txn: Transaction, post: data.Posting) -> errormod.Iter: ...
 

	
 

	
 
class _NormalizePostingMetadataHook(_PostingHook):
 
    """Base class to normalize posting metadata from an enum."""
 
    # This class provides basic functionality to filter postings, normalize
 
    # metadata values, and set default values.
 
    METADATA_KEY: MetaKey
 
    VALUES_ENUM: MetadataEnum
 

	
 
    def __init_subclass__(cls) -> None:
 
        super().__init_subclass__()
 
        cls.METADATA_KEY = cls.VALUES_ENUM.key
 
        cls.HOOK_GROUPS = cls.HOOK_GROUPS.union(['metadata', cls.METADATA_KEY])
 

	
 
    # If the posting does not specify METADATA_KEY, the hook calls
 
    # _default_value to get a default. This method should either return
 
    # a value string from METADATA_ENUM, or else raise InvalidMetadataError.
 
    # This base implementation does the latter.
 
    def _default_value(self, txn: Transaction, post: data.Posting) -> MetaValueEnum:
 
        raise errormod.InvalidMetadataError(txn, self.METADATA_KEY, None, post)
 

	
 
    def post_run(self, txn: Transaction, post: data.Posting) -> errormod.Iter:
 
        source_value = post.meta.get(self.METADATA_KEY)
 
        set_value = source_value
 
        error: Optional[errormod.Error] = None
 
        if source_value is None:
 
            try:
 
                set_value = self._default_value(txn, post)
 
            except errormod.Error as error_:
 
                error = error_
 
        else:
 
            try:
 
                set_value = self.VALUES_ENUM[source_value]
 
            except KeyError:
 
                error = errormod.InvalidMetadataError(
 
                    txn, self.METADATA_KEY, source_value, post,
 
                )
 
        if error is None:
 
            post.meta[self.METADATA_KEY] = set_value
 
        else:
 
            yield error
 

	
 

	
 
class _RequireLinksPostingMetadataHook(_PostingHook):
 
    """Base class to require that posting metadata include links"""
 
    # This base class confirms that a posting's metadata has one or more links
 
    # under METADATA_KEY.
 
    # Most subclasses only need to define METADATA_KEY and _run_on_post.
 
    METADATA_KEY: str
 

	
 
    def __init_subclass__(cls) -> None:
 
        super().__init_subclass__()
 
        cls.HOOK_GROUPS = cls.HOOK_GROUPS.union(['metadata', cls.METADATA_KEY])
 

	
 
    def _check_links(self, txn: Transaction, post: data.Posting, key: MetaKey) -> None:
 
        try:
 
            problem = not post.meta.get_links(key)
 
            value = None
 
        except TypeError:
 
            problem = True
 
            value = post.meta[key]
 
        if problem:
 
            raise errormod.InvalidMetadataError(txn, key, value, post)
 

	
 
    def post_run(self, txn: Transaction, post: data.Posting) -> errormod.Iter:
 
        try:
 
            self._check_links(txn, post, self.METADATA_KEY)
 
        except errormod.Error as error:
 
            yield error
conservancy_beancount/plugin/meta_project.py
Show inline comments
 
"""meta_project - Validate project metadata"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
from pathlib import Path
 

	
 
import yaml
 
import yaml.error
 

	
 
from . import core
 
from .. import config as configmod
 
from .. import data
 
from .. import errors as errormod
 
from ..beancount_types import (
 
    MetaValueEnum,
 
    Transaction,
 
)
 

	
 
from typing import (
 
    Any,
 
    Dict,
 
    NoReturn,
 
    Optional,
 
    Set,
 
)
 

	
 
class MetaProject(core._NormalizePostingMetadataHook):
 
    DEFAULT_PROJECT = 'Conservancy'
 
    PROJECT_DATA_PATH = Path('Projects', 'project-data.yml')
 
    VALUES_ENUM = core.MetadataEnum('project', {DEFAULT_PROJECT})
 

	
 
    def __init__(self, config: configmod.Config, source_path: Path=PROJECT_DATA_PATH) -> None:
 
        repo_path = config.repository_path()
 
        if repo_path is None:
 
            raise self._config_error("no repository configured")
 
            self._config_error("no repository configured")
 
        project_data_path = repo_path / source_path
 
        source = {'filename': str(project_data_path)}
 
        try:
 
            with project_data_path.open() as yaml_file:
 
                project_data: Dict[str, Dict[str, Any]] = yaml.safe_load(yaml_file)
 
            names: Set[MetaValueEnum] = {self.DEFAULT_PROJECT}
 
            aliases: Dict[MetaValueEnum, MetaValueEnum] = {}
 
            for key, params in project_data.items():
 
                name = params.get('accountName', key)
 
                names.add(name)
 
                human_name = params.get('humanName', name)
 
                if name != human_name:
 
                    aliases[human_name] = name
 
                if name != key:
 
                    aliases[key] = name
 
        except AttributeError:
 
            self._config_error("loaded YAML data not in project-data format", project_data_path)
 
        except OSError as error:
 
            self._config_error(error.strerror, project_data_path)
 
        except yaml.error.YAMLError as error:
 
            self._config_error(error.args[0] or "YAML load error", project_data_path)
 
        else:
 
            self.VALUES_ENUM = core.MetadataEnum(self.METADATA_KEY, names, aliases)
 

	
 
    def _config_error(self, msg: str, filename: Optional[Path]=None):
 
    def _config_error(self, msg: str, filename: Optional[Path]=None) -> NoReturn:
 
        source = {}
 
        if filename is not None:
 
            source['filename'] = str(filename)
 
        raise errormod.ConfigurationError(
 
            "cannot load project data: " + msg,
 
            source=source,
 
        )
 

	
 
    def _run_on_post(self, txn: Transaction, post: data.Posting) -> bool:
 
        return post.account.is_under('Assets', 'Liabilities') is None
 

	
 
    def _default_value(self, txn: Transaction, post: data.Posting) -> MetaValueEnum:
 
        if post.account.is_under(
 
                'Accrued:VacationPayable',
 
                'Expenses:Payroll',
 
        ):
 
            return self.DEFAULT_PROJECT
 
        else:
 
            raise errormod.InvalidMetadataError(txn, self.METADATA_KEY, None, post)
conservancy_beancount/rtutil.py
Show inline comments
 
"""RT client utilities"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import functools
 
import mimetypes
 
import re
 
import sqlite3
 
import urllib.parse as urlparse
 

	
 
import rt
 

	
 
from pathlib import Path
 

	
 
from typing import (
 
    Callable,
 
    Iterator,
 
    MutableMapping,
 
    Optional,
 
    Set,
 
    Tuple,
 
    Union,
 
)
 

	
 
RTId = Union[int, str]
 
TicketAttachmentIds = Tuple[str, Optional[str]]
 
_LinkCache = MutableMapping[TicketAttachmentIds, Optional[str]]
 
_URLLookup = Callable[..., Optional[str]]
 

	
 
class RTLinkCache(_LinkCache):
 
    """Cache RT links to disk
 

	
 
    This class provides a dict-like interface to a cache of RT links.
 
    Once an object is in RT, a link to it should never change.
 
    The only exception is when objects get shredded, and those objects
 
    shouldn't be referenced in books anyway.
 

	
 
    This implementation is backed by a sqlite database. You can call::
 

	
 
        db = RTLinkCache.setup(path)
 

	
 
    This method will try to open a sqlite database at the given path,
 
    and set up necessary tables, etc.
 
    If it succeeds, it returns a database connection you can use to
 
    initialize the cache.
 
    If it fails, it returns None, and the caller should use some other
 
    dict-like object (like a normal dict) for caching.
 
    You can give the result to the RT utility class either way,
 
    and it will do the right thing for itself::
 

	
 
        rt = RT(rt_client, db)
 
    """
 

	
 
    CREATE_TABLE_SQL = """CREATE TABLE IF NOT EXISTS RTLinkCache(
 
 ticket_id TEXT NOT NULL,
 
 attachment_id TEXT,
 
 url TEXT NOT NULL,
 
 PRIMARY KEY (ticket_id, attachment_id)
 
)"""
 

	
 
    @classmethod
 
    def setup(cls, cache_path: Path) -> Optional[sqlite3.Connection]:
 
        try:
 
            db = sqlite3.connect(cache_path, isolation_level=None)
 
            cursor = db.cursor()
 
            cursor.execute(cls.CREATE_TABLE_SQL)
 
            cursor.execute('SELECT url FROM RTLinkCache LIMIT 1')
 
            have_data = cursor.fetchone() is not None
 
        except sqlite3.OperationalError:
 
            # If we couldn't get this far, sqlite provides no benefit.
 
            return None
 
        try:
 
            # There shouldn't be any records where url is NULL, so running this
 
            # DELETE pulls double duty for us: it tells us whether or not we
 
            # can write to the database and it enforces database integrity.
 
            cursor.execute('DELETE FROM RTLinkCache WHERE url IS NULL')
 
        except sqlite3.OperationalError:
 
            can_write = False
 
        else:
 
            can_write = True
 
        print(cache_path, have_data, can_write)
 
        if not (can_write or have_data):
 
            # If there's nothing to read and no way to write, sqlite provides
 
            # no benefit.
 
            return None
 
        elif not can_write:
 
            # Set up an in-memory database that we can write to, seeded with
 
            # the data available to read.
 
            try:
 
                cursor.close()
 
                db.close()
 
                db = sqlite3.connect(':memory:', isolation_level=None)
 
                cursor = db.cursor()
 
                cursor.execute('ATTACH DATABASE ? AS readsource',
 
                               ('{}?mode=ro'.format(cache_path.as_uri()),))
 
                cursor.execute(cls.CREATE_TABLE_SQL)
 
                cursor.execute('INSERT INTO RTLinkCache SELECT * FROM readsource.RTLinkCache')
 
                cursor.execute('DETACH DATABASE readsource')
 
            except sqlite3.OperationalError as error:
 
                # We're back to the case of having nothing to read and no way
 
                # to write.
 
                return None
 
        cursor.close()
 
        db.commit()
 
        return db
 

	
 
    def __init__(self, cache_db: sqlite3.Connection) -> None:
 
        self._db = cache_db
 
        self._nourls: Set[TicketAttachmentIds] = set()
 

	
 
    def __iter__(self) -> Iterator[TicketAttachmentIds]:
 
        yield from self._db.execute('SELECT ticket_id, attachment_id FROM RTLinkCache')
 
        yield from self._nourls
 

	
 
    def __len__(self) -> int:
 
        cursor = self._db.execute('SELECT COUNT(*) FROM RTLinkCache')
 
        count: int = cursor.fetchone()[0]
 
        return count + len(self._nourls)
 

	
 
    def __getitem__(self, key: TicketAttachmentIds) -> Optional[str]:
 
        if key in self._nourls:
 
            return None
 
        cursor = self._db.execute(
 
            'SELECT url FROM RTLinkCache WHERE ticket_id = ? AND attachment_id IS ?',
 
            key,
 
        )
 
        row = cursor.fetchone()
 
        if row is None:
 
            raise KeyError(key)
 
        else:
 
            retval: str = row[0]
 
            return retval
 

	
 
    def __setitem__(self, key: TicketAttachmentIds, value: Optional[str]) -> None:
 
        if value is None:
 
            self._nourls.add(key)
 
        else:
 
            ticket_id, attachment_id = key
 
            self._db.execute(
 
                'INSERT INTO RTLinkCache VALUES(?, ?, ?)',
 
                (ticket_id, attachment_id, value),
 
            )
 

	
 
    def __delitem__(self, key: TicketAttachmentIds) -> None:
 
        raise NotImplementedError("RTLinkCache.__delitem__")
 

	
 

	
 
class RT:
 
    """RT utility wrapper class
 

	
 
    Given an RT client object, this class provides common functionality for
 
    working with RT links in Beancount metadata:
 

	
 
    * Parse links
 
    * Verify that they refer to extant objects in RT
 
    * Convert metadata links to RT web links
 
    * Cache results, to reduce network requests.
 
      You can set up an RTLinkCache to cache links to disks over multiple runs.
 
      Refer to RTLinkCache's docstring for details and instructions.
 
    """
 

	
 
    PARSE_REGEXPS = [
 
        re.compile(r'^rt:([0-9]+)(?:/([0-9]+))?/?$'),
 
        re.compile(r'^rt://ticket/([0-9]+)(?:/attachments?/([0-9]+))?/?$'),
 
    ]
 

	
 
    def __init__(self, rt_client: rt.Rt, cache_db: Optional[sqlite3.Connection]=None) -> None:
 
        urlparts = urlparse.urlparse(rt_client.url)
 
        try:
 
            index = urlparts.path.rindex('/REST/')
 
        except ValueError:
 
            base_path = urlparts.path.rstrip('/') + '/'
 
        else:
 
            base_path = urlparts.path[:index + 1]
 
        self.url_base = urlparts._replace(path=base_path)
 
        self.rt = rt_client
 
        self._cache: _LinkCache
 
        if cache_db is None:
 
            self._cache = {}
 
        else:
 
            self._cache = RTLinkCache(cache_db)
 

	
 
    # mypy complains that the first argument isn't self, but this isn't meant
 
    # to be a method, it's just an internal decrator.
 
    def _cache_method(func: _URLLookup) -> _URLLookup:  # type:ignore[misc]
 
        @functools.wraps(func)
 
        def caching_wrapper(self,
 
        def caching_wrapper(self: 'RT',
 
                            ticket_id: RTId,
 
                            attachment_id: Optional[RTId]=None,
 
        ) -> Optional[str]:
 
            cache_key = (str(ticket_id), attachment_id and str(attachment_id))
 
            cache_key = (str(ticket_id),
 
                         None if attachment_id is None else str(attachment_id))
 
            url: Optional[str]
 
            try:
 
                url = self._cache[cache_key]
 
            except KeyError:
 
                if attachment_id is None:
 
                    url = func(self, ticket_id)
 
                else:
 
                    url = func(self, ticket_id, attachment_id)
 
                self._cache[cache_key] = url
 
            return url
 
        return caching_wrapper
 

	
 
    def _extend_url(self,
 
                    path_tail: str,
 
                    fragment: Optional[str]=None,
 
                    **query: str,
 
    ) -> str:
 
        if fragment is None:
 
            fragment = self.url_base.fragment
 
        else:
 
            fragment = urlparse.quote(fragment)
 
        if query:
 
            query_s = urlparse.urlencode(query)
 
        else:
 
            query_s = self.url_base.query
 
        urlparts = self.url_base._replace(
 
            path=self.url_base.path + urlparse.quote(path_tail),
 
            query=query_s,
 
            fragment=fragment,
 
        )
 
        return urlparse.urlunparse(urlparts)
 

	
 
    def _ticket_url(self, ticket_id: RTId, txn_id: Optional[RTId]=None) -> str:
 
        if txn_id is None:
 
            fragment = None
 
        else:
 
            fragment = 'txn-{}'.format(txn_id)
 
        return self._extend_url('Ticket/Display.html', fragment, id=str(ticket_id))
 

	
 
    @_cache_method
 
    def attachment_url(self, ticket_id: RTId, attachment_id: RTId) -> Optional[str]:
 
        attachment = self.rt.get_attachment(ticket_id, attachment_id)
 
        if attachment is None:
 
            return None
 
        mimetype = attachment.get('ContentType', '')
 
        if mimetype.startswith('text/'):
 
            return self._ticket_url(ticket_id, attachment['Transaction'])
 
        else:
 
            filename = attachment.get('Filename', '')
 
            if not filename:
 
                filename = 'RT{} attachment {}{}'.format(
 
                    ticket_id,
 
                    attachment_id,
 
                    mimetypes.guess_extension(mimetype) or '.bin',
 
                )
 
            path_tail = 'Ticket/Attachment/{0[Transaction]}/{0[id]}/{1}'.format(
 
                attachment,
 
                filename,
 
            )
 
            return self._extend_url(path_tail)
 

	
 
    def exists(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> bool:
 
        return self.url(ticket_id, attachment_id) is not None
 

	
 
    @classmethod
 
    def parse(cls, s: str) -> Optional[Tuple[str, Optional[str]]]:
 
        for regexp in cls.PARSE_REGEXPS:
 
            match = regexp.match(s)
 
            if match is not None:
 
                ticket_id, attachment_id = match.groups()
 
                return (ticket_id, attachment_id)
 
        return None
 

	
 
    @_cache_method
 
    def ticket_url(self, ticket_id: RTId) -> Optional[str]:
 
        if self.rt.get_ticket(ticket_id) is None:
 
            return None
 
        return self._ticket_url(ticket_id)
 

	
 
    def url(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> Optional[str]:
 
        if attachment_id is None:
 
            return self.ticket_url(ticket_id)
 
        else:
 
            return self.attachment_url(ticket_id, attachment_id)
setup.cfg
Show inline comments
 
[aliases]
 
test=pytest
 
typecheck=pytest --addopts="--mypy conservancy_beancount"
 

	
 
[mypy]
 
disallow_any_unimported = True
 
disallow_untyped_defs = True
 
show_error_codes = True
 
strict_equality = True
 
warn_redundant_casts = True
 
warn_return_any = True
 
warn_unreachable = True
 
warn_unused_configs = True
0 comments (0 inline, 0 general)