Changeset - 747ef25da618
[Not reviewed]
0 5 0
Brett Smith - 4 years ago 2020-03-30 03:18:40
brettcsmith@brettcsmith.org
setup: Disallow untyped defs.

Mostly this meant giving annotations to low-value functions like
the error classes and __init_subclass__, but it's worth it for
the future strictness+documentation value.
5 files changed with 71 insertions and 14 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/errors.py
Show inline comments
 
"""Error classes for plugins to report problems in the books"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import beancount.core.data as bc_data
 

	
 
from typing import (
 
    Any,
 
    Iterable,
 
    Optional,
 
)
 

	
 
from .beancount_types import (
 
    Directive,
 
    MetaKey,
 
    MetaValue,
 
    Posting,
 
    Transaction,
 
    Type,
 
)
 

	
 
Meta = Optional[bc_data.Meta]
 

	
 
class Error(Exception):
 
    def __init__(self, message, entry, source=None):
 
    def __init__(self,
 
                 message: str,
 
                 entry: Optional[Directive],
 
                 source: Meta=None,
 
    ) -> None:
 
        self.message = message
 
        self.entry = entry
 
        self.source = entry.meta if source is None else source
 
        if source:
 
            self.source = source
 
        elif entry is not None:
 
            self.source = entry.meta
 
        else:
 
            self.source = {}
 
            self._fill_source(self.source, '<unknown>')
 

	
 
    def __repr__(self):
 
    def __repr__(self) -> str:
 
        return "{clsname}<{source[filename]}:{source[lineno]}: {message}>".format(
 
            clsname=type(self).__name__,
 
            message=self.message,
 
            source=self.source,
 
        )
 

	
 
    def _fill_source(self, source, filename='conservancy_beancount', lineno=0):
 
    def _fill_source(self,
 
                     source: bc_data.Meta,
 
                     filename: str='conservancy_beancount',
 
                     lineno: int=0,
 
    ) -> None:
 
        source.setdefault('filename', filename)
 
        source.setdefault('lineno', lineno)
 

	
 

	
 
Iter = Iterable[Error]
 

	
 
class BrokenLinkError(Error):
 
    def __init__(self, txn, key, link, source=None):
 
    def __init__(self,
 
                 txn: Transaction,
 
                 key: MetaKey,
 
                 link: str,
 
                 source: Meta=None,
 
    ) -> None:
 
        super().__init__(
 
            "{} not found in repository: {}".format(key, link),
 
            txn,
 
            source,
 
        )
 

	
 

	
 
class BrokenRTLinkError(Error):
 
    def __init__(self, txn, key, link, parsed=True, source=None):
 
    def __init__(self,
 
                 txn: Transaction,
 
                 key: MetaKey,
 
                 link: str,
 
                 parsed: Any=True,
 
                 source: Meta=None,
 
    ) -> None:
 
        if parsed:
 
            msg_fmt = "{} not found in RT: {}"
 
        else:
 
            msg_fmt = "{} link is malformed: {}"
 
        super().__init__(
 
            msg_fmt.format(key, link),
 
            txn,
 
            source,
 
        )
 

	
 

	
 
class ConfigurationError(Error):
 
    def __init__(self, message, entry=None, source=None):
 
    def __init__(self,
 
                 message: str,
 
                 entry: Optional[Directive]=None,
 
                 source: Meta=None,
 
    ) -> None:
 
        if source is None:
 
            source = {}
 
        self._fill_source(source)
 
        super().__init__(message, entry, source)
 

	
 

	
 
class InvalidMetadataError(Error):
 
    def __init__(self, txn, key, value=None, post=None, need_type=str, source=None):
 
    def __init__(self,
 
                 txn: Transaction,
 
                 key: MetaKey,
 
                 value: Optional[MetaValue]=None,
 
                 post: Optional[bc_data.Posting]=None,
 
                 need_type: Type=str,
 
                 source: Meta=None,
 
    ) -> None:
 
        if post is None:
 
            srcname = 'transaction'
 
        else:
 
            srcname = post.account
 
        if value is None:
 
            msg = "{} missing {}".format(srcname, key)
 
        elif isinstance(value, need_type):
 
            msg = "{} has invalid {}: {}".format(srcname, key, value)
 
        else:
 
            msg = "{} has wrong type of {}: expected {} but is a {}".format(
 
                srcname, key, need_type.__name__, type(value).__name__,
 
            )
 
        super().__init__(msg, txn, source)
conservancy_beancount/plugin/core.py
Show inline comments
 
"""Base classes for plugin checks"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import abc
 
import datetime
 
import re
 

	
 
from .. import config as configmod
 
from .. import data
 
from .. import errors as errormod
 

	
 
from typing import (
 
    Any,
 
    Dict,
 
    FrozenSet,
 
    Generic,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    Optional,
 
    TypeVar,
 
)
 
from ..beancount_types import (
 
    Account,
 
    Directive,
 
    MetaKey,
 
    MetaValue,
 
    MetaValueEnum,
 
    Transaction,
 
    Type,
 
)
 

	
 
### CONSTANTS
 

	
 
# I expect these will become configurable in the future, which is why I'm
 
# keeping them outside of a class, but for now constants will do.
 
DEFAULT_START_DATE: datetime.date = datetime.date(2020, 3, 1)
 
# The default stop date leaves a little room after so it's easy to test
 
# dates past the far end of the range.
 
DEFAULT_STOP_DATE: datetime.date = datetime.date(datetime.MAXYEAR, 1, 1)
 

	
 
### TYPE DEFINITIONS
 

	
 
HookName = str
 

	
 
Entry = TypeVar('Entry', bound=Directive)
 
class Hook(Generic[Entry], metaclass=abc.ABCMeta):
 
    DIRECTIVE: Type[Directive]
 
    HOOK_GROUPS: FrozenSet[HookName] = frozenset()
 

	
 
    def __init__(self, config: configmod.Config) -> None:
 
        pass
 
        # Subclasses that need configuration should override __init__ to check
 
        # and store it.
 

	
 
    @abc.abstractmethod
 
    def run(self, entry: Entry) -> errormod.Iter: ...
 

	
 
    def __init_subclass__(cls):
 
        cls.DIRECTIVE = cls.__orig_bases__[0].__args__[0]
 
    def __init_subclass__(cls) -> None:
 
        # cls.__orig_bases__ comes from the ABCMeta metaclass
 
        cls.DIRECTIVE = cls.__orig_bases__[0].__args__[0]  # type:ignore[attr-defined]
 

	
 

	
 
TransactionHook = Hook[Transaction]
 

	
 
### HELPER CLASSES
 

	
 
class LessComparable(metaclass=abc.ABCMeta):
 
    @abc.abstractmethod
 
    def __le__(self, other: Any) -> bool: ...
 

	
 
    @abc.abstractmethod
 
    def __lt__(self, other: Any) -> bool: ...
 

	
 

	
 
CT = TypeVar('CT', bound=LessComparable)
 
class _GenericRange(Generic[CT]):
 
    """Convenience class to check whether a value is within a range.
 

	
 
    `foo in generic_range` is equivalent to `start <= foo < stop`.
 
    Since we have multiple user-configurable ranges, having the check
 
    encapsulated in an object helps implement the check consistently, and
 
    makes it easier for subclasses to override.
 
    """
 

	
 
    def __init__(self, start: CT, stop: CT) -> None:
 
        self.start = start
 
        self.stop = stop
 

	
 
    def __repr__(self) -> str:
 
        return "{clsname}({self.start!r}, {self.stop!r})".format(
 
            clsname=type(self).__name__,
 
            self=self,
 
        )
 

	
 
    def __contains__(self, item: CT) -> bool:
 
        return self.start <= item < self.stop
 

	
 

	
 
class MetadataEnum:
 
    """Map acceptable metadata values to their normalized forms.
 

	
 
    When a piece of metadata uses a set of allowed values, use this class to
 
    define them. You can also specify aliases that hooks will normalize to
 
    the primary values.
 
    """
 

	
 
    def __init__(self,
 
                 key: MetaKey,
 
                 standard_values: Iterable[MetaValueEnum],
 
                 aliases_map: Optional[Mapping[MetaValueEnum, MetaValueEnum]]=None,
 
    ) -> None:
 
        """Specify allowed values and aliases for this metadata.
 

	
 
        Arguments:
 

	
 
        * key: The name of the metadata key that uses this enum.
 
        * standard_values: A sequence of strings that enumerate the standard
 
          values for this metadata.
 
        * aliases_map: A mapping of strings to strings. The keys are
 
          additional allowed metadata values. The values are standard values
 
          that each key will evaluate to. The code asserts that all values are
 
          in standard_values.
 
        """
 
        self.key = key
 
        self._stdvalues = frozenset(standard_values)
 
        self._aliases: Dict[MetaValueEnum, MetaValueEnum] = dict(aliases_map or ())
 
        assert self._stdvalues.issuperset(self._aliases.values())
 
        self._aliases.update((v, v) for v in standard_values)
 

	
 
    def __repr__(self) -> str:
 
        return "{}<{}>".format(type(self).__name__, self.key)
 

	
 
    def __contains__(self, key: MetaValueEnum) -> bool:
 
        """Returns true if `key` is a standard value or alias."""
 
        return key in self._aliases
 

	
 
    def __getitem__(self, key: MetaValueEnum) -> MetaValueEnum:
 
        """Return the standard value for `key`.
 

	
 
        Raises KeyError if `key` is not a known value or alias.
 
        """
 
        return self._aliases[key]
 

	
 
    def __iter__(self) -> Iterator[MetaValueEnum]:
 
        """Iterate over standard values."""
 
        return iter(self._stdvalues)
 

	
 
    def get(self,
 
            key: MetaValueEnum,
 
            default_key: Optional[MetaValueEnum]=None,
 
    ) -> Optional[MetaValueEnum]:
 
        """Return self[key], or a default fallback if that doesn't exist.
 

	
 
        default_key is another key to look up, *not* a default value to return.
 
        This helps ensure you always get a standard value.
 
        """
conservancy_beancount/plugin/meta_project.py
Show inline comments
 
"""meta_project - Validate project metadata"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
from pathlib import Path
 

	
 
import yaml
 
import yaml.error
 

	
 
from . import core
 
from .. import config as configmod
 
from .. import data
 
from .. import errors as errormod
 
from ..beancount_types import (
 
    MetaValueEnum,
 
    Transaction,
 
)
 

	
 
from typing import (
 
    Any,
 
    Dict,
 
    NoReturn,
 
    Optional,
 
    Set,
 
)
 

	
 
class MetaProject(core._NormalizePostingMetadataHook):
 
    DEFAULT_PROJECT = 'Conservancy'
 
    PROJECT_DATA_PATH = Path('Projects', 'project-data.yml')
 
    VALUES_ENUM = core.MetadataEnum('project', {DEFAULT_PROJECT})
 

	
 
    def __init__(self, config: configmod.Config, source_path: Path=PROJECT_DATA_PATH) -> None:
 
        repo_path = config.repository_path()
 
        if repo_path is None:
 
            raise self._config_error("no repository configured")
 
            self._config_error("no repository configured")
 
        project_data_path = repo_path / source_path
 
        source = {'filename': str(project_data_path)}
 
        try:
 
            with project_data_path.open() as yaml_file:
 
                project_data: Dict[str, Dict[str, Any]] = yaml.safe_load(yaml_file)
 
            names: Set[MetaValueEnum] = {self.DEFAULT_PROJECT}
 
            aliases: Dict[MetaValueEnum, MetaValueEnum] = {}
 
            for key, params in project_data.items():
 
                name = params.get('accountName', key)
 
                names.add(name)
 
                human_name = params.get('humanName', name)
 
                if name != human_name:
 
                    aliases[human_name] = name
 
                if name != key:
 
                    aliases[key] = name
 
        except AttributeError:
 
            self._config_error("loaded YAML data not in project-data format", project_data_path)
 
        except OSError as error:
 
            self._config_error(error.strerror, project_data_path)
 
        except yaml.error.YAMLError as error:
 
            self._config_error(error.args[0] or "YAML load error", project_data_path)
 
        else:
 
            self.VALUES_ENUM = core.MetadataEnum(self.METADATA_KEY, names, aliases)
 

	
 
    def _config_error(self, msg: str, filename: Optional[Path]=None):
 
    def _config_error(self, msg: str, filename: Optional[Path]=None) -> NoReturn:
 
        source = {}
 
        if filename is not None:
 
            source['filename'] = str(filename)
 
        raise errormod.ConfigurationError(
 
            "cannot load project data: " + msg,
 
            source=source,
 
        )
 

	
 
    def _run_on_post(self, txn: Transaction, post: data.Posting) -> bool:
 
        return post.account.is_under('Assets', 'Liabilities') is None
 

	
 
    def _default_value(self, txn: Transaction, post: data.Posting) -> MetaValueEnum:
 
        if post.account.is_under(
 
                'Accrued:VacationPayable',
 
                'Expenses:Payroll',
 
        ):
 
            return self.DEFAULT_PROJECT
 
        else:
 
            raise errormod.InvalidMetadataError(txn, self.METADATA_KEY, None, post)
conservancy_beancount/rtutil.py
Show inline comments
...
 
@@ -103,185 +103,186 @@ class RTLinkCache(_LinkCache):
 
                db.close()
 
                db = sqlite3.connect(':memory:', isolation_level=None)
 
                cursor = db.cursor()
 
                cursor.execute('ATTACH DATABASE ? AS readsource',
 
                               ('{}?mode=ro'.format(cache_path.as_uri()),))
 
                cursor.execute(cls.CREATE_TABLE_SQL)
 
                cursor.execute('INSERT INTO RTLinkCache SELECT * FROM readsource.RTLinkCache')
 
                cursor.execute('DETACH DATABASE readsource')
 
            except sqlite3.OperationalError as error:
 
                # We're back to the case of having nothing to read and no way
 
                # to write.
 
                return None
 
        cursor.close()
 
        db.commit()
 
        return db
 

	
 
    def __init__(self, cache_db: sqlite3.Connection) -> None:
 
        self._db = cache_db
 
        self._nourls: Set[TicketAttachmentIds] = set()
 

	
 
    def __iter__(self) -> Iterator[TicketAttachmentIds]:
 
        yield from self._db.execute('SELECT ticket_id, attachment_id FROM RTLinkCache')
 
        yield from self._nourls
 

	
 
    def __len__(self) -> int:
 
        cursor = self._db.execute('SELECT COUNT(*) FROM RTLinkCache')
 
        count: int = cursor.fetchone()[0]
 
        return count + len(self._nourls)
 

	
 
    def __getitem__(self, key: TicketAttachmentIds) -> Optional[str]:
 
        if key in self._nourls:
 
            return None
 
        cursor = self._db.execute(
 
            'SELECT url FROM RTLinkCache WHERE ticket_id = ? AND attachment_id IS ?',
 
            key,
 
        )
 
        row = cursor.fetchone()
 
        if row is None:
 
            raise KeyError(key)
 
        else:
 
            retval: str = row[0]
 
            return retval
 

	
 
    def __setitem__(self, key: TicketAttachmentIds, value: Optional[str]) -> None:
 
        if value is None:
 
            self._nourls.add(key)
 
        else:
 
            ticket_id, attachment_id = key
 
            self._db.execute(
 
                'INSERT INTO RTLinkCache VALUES(?, ?, ?)',
 
                (ticket_id, attachment_id, value),
 
            )
 

	
 
    def __delitem__(self, key: TicketAttachmentIds) -> None:
 
        raise NotImplementedError("RTLinkCache.__delitem__")
 

	
 

	
 
class RT:
 
    """RT utility wrapper class
 

	
 
    Given an RT client object, this class provides common functionality for
 
    working with RT links in Beancount metadata:
 

	
 
    * Parse links
 
    * Verify that they refer to extant objects in RT
 
    * Convert metadata links to RT web links
 
    * Cache results, to reduce network requests.
 
      You can set up an RTLinkCache to cache links to disks over multiple runs.
 
      Refer to RTLinkCache's docstring for details and instructions.
 
    """
 

	
 
    PARSE_REGEXPS = [
 
        re.compile(r'^rt:([0-9]+)(?:/([0-9]+))?/?$'),
 
        re.compile(r'^rt://ticket/([0-9]+)(?:/attachments?/([0-9]+))?/?$'),
 
    ]
 

	
 
    def __init__(self, rt_client: rt.Rt, cache_db: Optional[sqlite3.Connection]=None) -> None:
 
        urlparts = urlparse.urlparse(rt_client.url)
 
        try:
 
            index = urlparts.path.rindex('/REST/')
 
        except ValueError:
 
            base_path = urlparts.path.rstrip('/') + '/'
 
        else:
 
            base_path = urlparts.path[:index + 1]
 
        self.url_base = urlparts._replace(path=base_path)
 
        self.rt = rt_client
 
        self._cache: _LinkCache
 
        if cache_db is None:
 
            self._cache = {}
 
        else:
 
            self._cache = RTLinkCache(cache_db)
 

	
 
    # mypy complains that the first argument isn't self, but this isn't meant
 
    # to be a method, it's just an internal decrator.
 
    def _cache_method(func: _URLLookup) -> _URLLookup:  # type:ignore[misc]
 
        @functools.wraps(func)
 
        def caching_wrapper(self,
 
        def caching_wrapper(self: 'RT',
 
                            ticket_id: RTId,
 
                            attachment_id: Optional[RTId]=None,
 
        ) -> Optional[str]:
 
            cache_key = (str(ticket_id), attachment_id and str(attachment_id))
 
            cache_key = (str(ticket_id),
 
                         None if attachment_id is None else str(attachment_id))
 
            url: Optional[str]
 
            try:
 
                url = self._cache[cache_key]
 
            except KeyError:
 
                if attachment_id is None:
 
                    url = func(self, ticket_id)
 
                else:
 
                    url = func(self, ticket_id, attachment_id)
 
                self._cache[cache_key] = url
 
            return url
 
        return caching_wrapper
 

	
 
    def _extend_url(self,
 
                    path_tail: str,
 
                    fragment: Optional[str]=None,
 
                    **query: str,
 
    ) -> str:
 
        if fragment is None:
 
            fragment = self.url_base.fragment
 
        else:
 
            fragment = urlparse.quote(fragment)
 
        if query:
 
            query_s = urlparse.urlencode(query)
 
        else:
 
            query_s = self.url_base.query
 
        urlparts = self.url_base._replace(
 
            path=self.url_base.path + urlparse.quote(path_tail),
 
            query=query_s,
 
            fragment=fragment,
 
        )
 
        return urlparse.urlunparse(urlparts)
 

	
 
    def _ticket_url(self, ticket_id: RTId, txn_id: Optional[RTId]=None) -> str:
 
        if txn_id is None:
 
            fragment = None
 
        else:
 
            fragment = 'txn-{}'.format(txn_id)
 
        return self._extend_url('Ticket/Display.html', fragment, id=str(ticket_id))
 

	
 
    @_cache_method
 
    def attachment_url(self, ticket_id: RTId, attachment_id: RTId) -> Optional[str]:
 
        attachment = self.rt.get_attachment(ticket_id, attachment_id)
 
        if attachment is None:
 
            return None
 
        mimetype = attachment.get('ContentType', '')
 
        if mimetype.startswith('text/'):
 
            return self._ticket_url(ticket_id, attachment['Transaction'])
 
        else:
 
            filename = attachment.get('Filename', '')
 
            if not filename:
 
                filename = 'RT{} attachment {}{}'.format(
 
                    ticket_id,
 
                    attachment_id,
 
                    mimetypes.guess_extension(mimetype) or '.bin',
 
                )
 
            path_tail = 'Ticket/Attachment/{0[Transaction]}/{0[id]}/{1}'.format(
 
                attachment,
 
                filename,
 
            )
 
            return self._extend_url(path_tail)
 

	
 
    def exists(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> bool:
 
        return self.url(ticket_id, attachment_id) is not None
 

	
 
    @classmethod
 
    def parse(cls, s: str) -> Optional[Tuple[str, Optional[str]]]:
 
        for regexp in cls.PARSE_REGEXPS:
 
            match = regexp.match(s)
 
            if match is not None:
 
                ticket_id, attachment_id = match.groups()
 
                return (ticket_id, attachment_id)
 
        return None
 

	
 
    @_cache_method
 
    def ticket_url(self, ticket_id: RTId) -> Optional[str]:
 
        if self.rt.get_ticket(ticket_id) is None:
 
            return None
 
        return self._ticket_url(ticket_id)
 

	
 
    def url(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> Optional[str]:
 
        if attachment_id is None:
 
            return self.ticket_url(ticket_id)
 
        else:
 
            return self.attachment_url(ticket_id, attachment_id)
setup.cfg
Show inline comments
 
[aliases]
 
test=pytest
 
typecheck=pytest --addopts="--mypy conservancy_beancount"
 

	
 
[mypy]
 
disallow_any_unimported = True
 
disallow_untyped_defs = True
 
show_error_codes = True
 
strict_equality = True
 
warn_redundant_casts = True
 
warn_return_any = True
 
warn_unreachable = True
 
warn_unused_configs = True
0 comments (0 inline, 0 general)