Changeset - 747ef25da618
[Not reviewed]
0 5 0
Brett Smith - 4 years ago 2020-03-30 03:18:40
brettcsmith@brettcsmith.org
setup: Disallow untyped defs.

Mostly this meant giving annotations to low-value functions like
the error classes and __init_subclass__, but it's worth it for
the future strictness+documentation value.
5 files changed with 71 insertions and 14 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/errors.py
Show inline comments
 
"""Error classes for plugins to report problems in the books"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import beancount.core.data as bc_data
 

	
 
from typing import (
 
    Any,
 
    Iterable,
 
    Optional,
 
)
 

	
 
from .beancount_types import (
 
    Directive,
 
    MetaKey,
 
    MetaValue,
 
    Posting,
 
    Transaction,
 
    Type,
 
)
 

	
 
Meta = Optional[bc_data.Meta]
 

	
 
class Error(Exception):
 
    def __init__(self, message, entry, source=None):
 
    def __init__(self,
 
                 message: str,
 
                 entry: Optional[Directive],
 
                 source: Meta=None,
 
    ) -> None:
 
        self.message = message
 
        self.entry = entry
 
        self.source = entry.meta if source is None else source
 
        if source:
 
            self.source = source
 
        elif entry is not None:
 
            self.source = entry.meta
 
        else:
 
            self.source = {}
 
            self._fill_source(self.source, '<unknown>')
 

	
 
    def __repr__(self):
 
    def __repr__(self) -> str:
 
        return "{clsname}<{source[filename]}:{source[lineno]}: {message}>".format(
 
            clsname=type(self).__name__,
 
            message=self.message,
 
            source=self.source,
 
        )
 

	
 
    def _fill_source(self, source, filename='conservancy_beancount', lineno=0):
 
    def _fill_source(self,
 
                     source: bc_data.Meta,
 
                     filename: str='conservancy_beancount',
 
                     lineno: int=0,
 
    ) -> None:
 
        source.setdefault('filename', filename)
 
        source.setdefault('lineno', lineno)
 

	
 

	
 
Iter = Iterable[Error]
 

	
 
class BrokenLinkError(Error):
 
    def __init__(self, txn, key, link, source=None):
 
    def __init__(self,
 
                 txn: Transaction,
 
                 key: MetaKey,
 
                 link: str,
 
                 source: Meta=None,
 
    ) -> None:
 
        super().__init__(
 
            "{} not found in repository: {}".format(key, link),
 
            txn,
 
            source,
 
        )
 

	
 

	
 
class BrokenRTLinkError(Error):
 
    def __init__(self, txn, key, link, parsed=True, source=None):
 
    def __init__(self,
 
                 txn: Transaction,
 
                 key: MetaKey,
 
                 link: str,
 
                 parsed: Any=True,
 
                 source: Meta=None,
 
    ) -> None:
 
        if parsed:
 
            msg_fmt = "{} not found in RT: {}"
 
        else:
 
            msg_fmt = "{} link is malformed: {}"
 
        super().__init__(
 
            msg_fmt.format(key, link),
 
            txn,
 
            source,
 
        )
 

	
 

	
 
class ConfigurationError(Error):
 
    def __init__(self, message, entry=None, source=None):
 
    def __init__(self,
 
                 message: str,
 
                 entry: Optional[Directive]=None,
 
                 source: Meta=None,
 
    ) -> None:
 
        if source is None:
 
            source = {}
 
        self._fill_source(source)
 
        super().__init__(message, entry, source)
 

	
 

	
 
class InvalidMetadataError(Error):
 
    def __init__(self, txn, key, value=None, post=None, need_type=str, source=None):
 
    def __init__(self,
 
                 txn: Transaction,
 
                 key: MetaKey,
 
                 value: Optional[MetaValue]=None,
 
                 post: Optional[bc_data.Posting]=None,
 
                 need_type: Type=str,
 
                 source: Meta=None,
 
    ) -> None:
 
        if post is None:
 
            srcname = 'transaction'
 
        else:
 
            srcname = post.account
 
        if value is None:
 
            msg = "{} missing {}".format(srcname, key)
 
        elif isinstance(value, need_type):
 
            msg = "{} has invalid {}: {}".format(srcname, key, value)
 
        else:
 
            msg = "{} has wrong type of {}: expected {} but is a {}".format(
 
                srcname, key, need_type.__name__, type(value).__name__,
 
            )
 
        super().__init__(msg, txn, source)
conservancy_beancount/plugin/core.py
Show inline comments
...
 
@@ -8,130 +8,131 @@
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import abc
 
import datetime
 
import re
 

	
 
from .. import config as configmod
 
from .. import data
 
from .. import errors as errormod
 

	
 
from typing import (
 
    Any,
 
    Dict,
 
    FrozenSet,
 
    Generic,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    Optional,
 
    TypeVar,
 
)
 
from ..beancount_types import (
 
    Account,
 
    Directive,
 
    MetaKey,
 
    MetaValue,
 
    MetaValueEnum,
 
    Transaction,
 
    Type,
 
)
 

	
 
### CONSTANTS
 

	
 
# I expect these will become configurable in the future, which is why I'm
 
# keeping them outside of a class, but for now constants will do.
 
DEFAULT_START_DATE: datetime.date = datetime.date(2020, 3, 1)
 
# The default stop date leaves a little room after so it's easy to test
 
# dates past the far end of the range.
 
DEFAULT_STOP_DATE: datetime.date = datetime.date(datetime.MAXYEAR, 1, 1)
 

	
 
### TYPE DEFINITIONS
 

	
 
HookName = str
 

	
 
Entry = TypeVar('Entry', bound=Directive)
 
class Hook(Generic[Entry], metaclass=abc.ABCMeta):
 
    DIRECTIVE: Type[Directive]
 
    HOOK_GROUPS: FrozenSet[HookName] = frozenset()
 

	
 
    def __init__(self, config: configmod.Config) -> None:
 
        pass
 
        # Subclasses that need configuration should override __init__ to check
 
        # and store it.
 

	
 
    @abc.abstractmethod
 
    def run(self, entry: Entry) -> errormod.Iter: ...
 

	
 
    def __init_subclass__(cls):
 
        cls.DIRECTIVE = cls.__orig_bases__[0].__args__[0]
 
    def __init_subclass__(cls) -> None:
 
        # cls.__orig_bases__ comes from the ABCMeta metaclass
 
        cls.DIRECTIVE = cls.__orig_bases__[0].__args__[0]  # type:ignore[attr-defined]
 

	
 

	
 
TransactionHook = Hook[Transaction]
 

	
 
### HELPER CLASSES
 

	
 
class LessComparable(metaclass=abc.ABCMeta):
 
    @abc.abstractmethod
 
    def __le__(self, other: Any) -> bool: ...
 

	
 
    @abc.abstractmethod
 
    def __lt__(self, other: Any) -> bool: ...
 

	
 

	
 
CT = TypeVar('CT', bound=LessComparable)
 
class _GenericRange(Generic[CT]):
 
    """Convenience class to check whether a value is within a range.
 

	
 
    `foo in generic_range` is equivalent to `start <= foo < stop`.
 
    Since we have multiple user-configurable ranges, having the check
 
    encapsulated in an object helps implement the check consistently, and
 
    makes it easier for subclasses to override.
 
    """
 

	
 
    def __init__(self, start: CT, stop: CT) -> None:
 
        self.start = start
 
        self.stop = stop
 

	
 
    def __repr__(self) -> str:
 
        return "{clsname}({self.start!r}, {self.stop!r})".format(
 
            clsname=type(self).__name__,
 
            self=self,
 
        )
 

	
 
    def __contains__(self, item: CT) -> bool:
 
        return self.start <= item < self.stop
 

	
 

	
 
class MetadataEnum:
 
    """Map acceptable metadata values to their normalized forms.
 

	
 
    When a piece of metadata uses a set of allowed values, use this class to
 
    define them. You can also specify aliases that hooks will normalize to
 
    the primary values.
 
    """
 

	
 
    def __init__(self,
 
                 key: MetaKey,
 
                 standard_values: Iterable[MetaValueEnum],
 
                 aliases_map: Optional[Mapping[MetaValueEnum, MetaValueEnum]]=None,
 
    ) -> None:
 
        """Specify allowed values and aliases for this metadata.
 

	
 
        Arguments:
 

	
 
        * key: The name of the metadata key that uses this enum.
 
        * standard_values: A sequence of strings that enumerate the standard
 
          values for this metadata.
 
        * aliases_map: A mapping of strings to strings. The keys are
 
          additional allowed metadata values. The values are standard values
 
          that each key will evaluate to. The code asserts that all values are
 
          in standard_values.
 
        """
 
        self.key = key
conservancy_beancount/plugin/meta_project.py
Show inline comments
 
"""meta_project - Validate project metadata"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
from pathlib import Path
 

	
 
import yaml
 
import yaml.error
 

	
 
from . import core
 
from .. import config as configmod
 
from .. import data
 
from .. import errors as errormod
 
from ..beancount_types import (
 
    MetaValueEnum,
 
    Transaction,
 
)
 

	
 
from typing import (
 
    Any,
 
    Dict,
 
    NoReturn,
 
    Optional,
 
    Set,
 
)
 

	
 
class MetaProject(core._NormalizePostingMetadataHook):
 
    DEFAULT_PROJECT = 'Conservancy'
 
    PROJECT_DATA_PATH = Path('Projects', 'project-data.yml')
 
    VALUES_ENUM = core.MetadataEnum('project', {DEFAULT_PROJECT})
 

	
 
    def __init__(self, config: configmod.Config, source_path: Path=PROJECT_DATA_PATH) -> None:
 
        repo_path = config.repository_path()
 
        if repo_path is None:
 
            raise self._config_error("no repository configured")
 
            self._config_error("no repository configured")
 
        project_data_path = repo_path / source_path
 
        source = {'filename': str(project_data_path)}
 
        try:
 
            with project_data_path.open() as yaml_file:
 
                project_data: Dict[str, Dict[str, Any]] = yaml.safe_load(yaml_file)
 
            names: Set[MetaValueEnum] = {self.DEFAULT_PROJECT}
 
            aliases: Dict[MetaValueEnum, MetaValueEnum] = {}
 
            for key, params in project_data.items():
 
                name = params.get('accountName', key)
 
                names.add(name)
 
                human_name = params.get('humanName', name)
 
                if name != human_name:
 
                    aliases[human_name] = name
 
                if name != key:
 
                    aliases[key] = name
 
        except AttributeError:
 
            self._config_error("loaded YAML data not in project-data format", project_data_path)
 
        except OSError as error:
 
            self._config_error(error.strerror, project_data_path)
 
        except yaml.error.YAMLError as error:
 
            self._config_error(error.args[0] or "YAML load error", project_data_path)
 
        else:
 
            self.VALUES_ENUM = core.MetadataEnum(self.METADATA_KEY, names, aliases)
 

	
 
    def _config_error(self, msg: str, filename: Optional[Path]=None):
 
    def _config_error(self, msg: str, filename: Optional[Path]=None) -> NoReturn:
 
        source = {}
 
        if filename is not None:
 
            source['filename'] = str(filename)
 
        raise errormod.ConfigurationError(
 
            "cannot load project data: " + msg,
 
            source=source,
 
        )
 

	
 
    def _run_on_post(self, txn: Transaction, post: data.Posting) -> bool:
 
        return post.account.is_under('Assets', 'Liabilities') is None
 

	
 
    def _default_value(self, txn: Transaction, post: data.Posting) -> MetaValueEnum:
 
        if post.account.is_under(
 
                'Accrued:VacationPayable',
 
                'Expenses:Payroll',
 
        ):
 
            return self.DEFAULT_PROJECT
 
        else:
 
            raise errormod.InvalidMetadataError(txn, self.METADATA_KEY, None, post)
conservancy_beancount/rtutil.py
Show inline comments
...
 
@@ -135,133 +135,134 @@ class RTLinkCache(_LinkCache):
 
        cursor = self._db.execute(
 
            'SELECT url FROM RTLinkCache WHERE ticket_id = ? AND attachment_id IS ?',
 
            key,
 
        )
 
        row = cursor.fetchone()
 
        if row is None:
 
            raise KeyError(key)
 
        else:
 
            retval: str = row[0]
 
            return retval
 

	
 
    def __setitem__(self, key: TicketAttachmentIds, value: Optional[str]) -> None:
 
        if value is None:
 
            self._nourls.add(key)
 
        else:
 
            ticket_id, attachment_id = key
 
            self._db.execute(
 
                'INSERT INTO RTLinkCache VALUES(?, ?, ?)',
 
                (ticket_id, attachment_id, value),
 
            )
 

	
 
    def __delitem__(self, key: TicketAttachmentIds) -> None:
 
        raise NotImplementedError("RTLinkCache.__delitem__")
 

	
 

	
 
class RT:
 
    """RT utility wrapper class
 

	
 
    Given an RT client object, this class provides common functionality for
 
    working with RT links in Beancount metadata:
 

	
 
    * Parse links
 
    * Verify that they refer to extant objects in RT
 
    * Convert metadata links to RT web links
 
    * Cache results, to reduce network requests.
 
      You can set up an RTLinkCache to cache links to disks over multiple runs.
 
      Refer to RTLinkCache's docstring for details and instructions.
 
    """
 

	
 
    PARSE_REGEXPS = [
 
        re.compile(r'^rt:([0-9]+)(?:/([0-9]+))?/?$'),
 
        re.compile(r'^rt://ticket/([0-9]+)(?:/attachments?/([0-9]+))?/?$'),
 
    ]
 

	
 
    def __init__(self, rt_client: rt.Rt, cache_db: Optional[sqlite3.Connection]=None) -> None:
 
        urlparts = urlparse.urlparse(rt_client.url)
 
        try:
 
            index = urlparts.path.rindex('/REST/')
 
        except ValueError:
 
            base_path = urlparts.path.rstrip('/') + '/'
 
        else:
 
            base_path = urlparts.path[:index + 1]
 
        self.url_base = urlparts._replace(path=base_path)
 
        self.rt = rt_client
 
        self._cache: _LinkCache
 
        if cache_db is None:
 
            self._cache = {}
 
        else:
 
            self._cache = RTLinkCache(cache_db)
 

	
 
    # mypy complains that the first argument isn't self, but this isn't meant
 
    # to be a method, it's just an internal decrator.
 
    def _cache_method(func: _URLLookup) -> _URLLookup:  # type:ignore[misc]
 
        @functools.wraps(func)
 
        def caching_wrapper(self,
 
        def caching_wrapper(self: 'RT',
 
                            ticket_id: RTId,
 
                            attachment_id: Optional[RTId]=None,
 
        ) -> Optional[str]:
 
            cache_key = (str(ticket_id), attachment_id and str(attachment_id))
 
            cache_key = (str(ticket_id),
 
                         None if attachment_id is None else str(attachment_id))
 
            url: Optional[str]
 
            try:
 
                url = self._cache[cache_key]
 
            except KeyError:
 
                if attachment_id is None:
 
                    url = func(self, ticket_id)
 
                else:
 
                    url = func(self, ticket_id, attachment_id)
 
                self._cache[cache_key] = url
 
            return url
 
        return caching_wrapper
 

	
 
    def _extend_url(self,
 
                    path_tail: str,
 
                    fragment: Optional[str]=None,
 
                    **query: str,
 
    ) -> str:
 
        if fragment is None:
 
            fragment = self.url_base.fragment
 
        else:
 
            fragment = urlparse.quote(fragment)
 
        if query:
 
            query_s = urlparse.urlencode(query)
 
        else:
 
            query_s = self.url_base.query
 
        urlparts = self.url_base._replace(
 
            path=self.url_base.path + urlparse.quote(path_tail),
 
            query=query_s,
 
            fragment=fragment,
 
        )
 
        return urlparse.urlunparse(urlparts)
 

	
 
    def _ticket_url(self, ticket_id: RTId, txn_id: Optional[RTId]=None) -> str:
 
        if txn_id is None:
 
            fragment = None
 
        else:
 
            fragment = 'txn-{}'.format(txn_id)
 
        return self._extend_url('Ticket/Display.html', fragment, id=str(ticket_id))
 

	
 
    @_cache_method
 
    def attachment_url(self, ticket_id: RTId, attachment_id: RTId) -> Optional[str]:
 
        attachment = self.rt.get_attachment(ticket_id, attachment_id)
 
        if attachment is None:
 
            return None
 
        mimetype = attachment.get('ContentType', '')
 
        if mimetype.startswith('text/'):
 
            return self._ticket_url(ticket_id, attachment['Transaction'])
 
        else:
 
            filename = attachment.get('Filename', '')
 
            if not filename:
 
                filename = 'RT{} attachment {}{}'.format(
 
                    ticket_id,
 
                    attachment_id,
 
                    mimetypes.guess_extension(mimetype) or '.bin',
 
                )
 
            path_tail = 'Ticket/Attachment/{0[Transaction]}/{0[id]}/{1}'.format(
 
                attachment,
 
                filename,
 
            )
 
            return self._extend_url(path_tail)
 

	
 
    def exists(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> bool:
 
        return self.url(ticket_id, attachment_id) is not None
 

	
setup.cfg
Show inline comments
 
[aliases]
 
test=pytest
 
typecheck=pytest --addopts="--mypy conservancy_beancount"
 

	
 
[mypy]
 
disallow_any_unimported = True
 
disallow_untyped_defs = True
 
show_error_codes = True
 
strict_equality = True
 
warn_redundant_casts = True
 
warn_return_any = True
 
warn_unreachable = True
 
warn_unused_configs = True
0 comments (0 inline, 0 general)