Changeset - 747ef25da618
[Not reviewed]
0 5 0
Brett Smith - 4 years ago 2020-03-30 03:18:40
brettcsmith@brettcsmith.org
setup: Disallow untyped defs.

Mostly this meant giving annotations to low-value functions like
the error classes and __init_subclass__, but it's worth it for
the future strictness+documentation value.
5 files changed with 71 insertions and 14 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/errors.py
Show inline comments
 
"""Error classes for plugins to report problems in the books"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import beancount.core.data as bc_data
 

	
 
from typing import (
 
    Any,
 
    Iterable,
 
    Optional,
 
)
 

	
 
from .beancount_types import (
 
    Directive,
 
    MetaKey,
 
    MetaValue,
 
    Posting,
 
    Transaction,
 
    Type,
 
)
 

	
 
Meta = Optional[bc_data.Meta]
 

	
 
class Error(Exception):
 
    def __init__(self, message, entry, source=None):
 
    def __init__(self,
 
                 message: str,
 
                 entry: Optional[Directive],
 
                 source: Meta=None,
 
    ) -> None:
 
        self.message = message
 
        self.entry = entry
 
        self.source = entry.meta if source is None else source
 
        if source:
 
            self.source = source
 
        elif entry is not None:
 
            self.source = entry.meta
 
        else:
 
            self.source = {}
 
            self._fill_source(self.source, '<unknown>')
 

	
 
    def __repr__(self):
 
    def __repr__(self) -> str:
 
        return "{clsname}<{source[filename]}:{source[lineno]}: {message}>".format(
 
            clsname=type(self).__name__,
 
            message=self.message,
 
            source=self.source,
 
        )
 

	
 
    def _fill_source(self, source, filename='conservancy_beancount', lineno=0):
 
    def _fill_source(self,
 
                     source: bc_data.Meta,
 
                     filename: str='conservancy_beancount',
 
                     lineno: int=0,
 
    ) -> None:
 
        source.setdefault('filename', filename)
 
        source.setdefault('lineno', lineno)
 

	
 

	
 
Iter = Iterable[Error]
 

	
 
class BrokenLinkError(Error):
 
    def __init__(self, txn, key, link, source=None):
 
    def __init__(self,
 
                 txn: Transaction,
 
                 key: MetaKey,
 
                 link: str,
 
                 source: Meta=None,
 
    ) -> None:
 
        super().__init__(
 
            "{} not found in repository: {}".format(key, link),
 
            txn,
 
            source,
 
        )
 

	
 

	
 
class BrokenRTLinkError(Error):
 
    def __init__(self, txn, key, link, parsed=True, source=None):
 
    def __init__(self,
 
                 txn: Transaction,
 
                 key: MetaKey,
 
                 link: str,
 
                 parsed: Any=True,
 
                 source: Meta=None,
 
    ) -> None:
 
        if parsed:
 
            msg_fmt = "{} not found in RT: {}"
 
        else:
 
            msg_fmt = "{} link is malformed: {}"
 
        super().__init__(
 
            msg_fmt.format(key, link),
 
            txn,
 
            source,
 
        )
 

	
 

	
 
class ConfigurationError(Error):
 
    def __init__(self, message, entry=None, source=None):
 
    def __init__(self,
 
                 message: str,
 
                 entry: Optional[Directive]=None,
 
                 source: Meta=None,
 
    ) -> None:
 
        if source is None:
 
            source = {}
 
        self._fill_source(source)
 
        super().__init__(message, entry, source)
 

	
 

	
 
class InvalidMetadataError(Error):
 
    def __init__(self, txn, key, value=None, post=None, need_type=str, source=None):
 
    def __init__(self,
 
                 txn: Transaction,
 
                 key: MetaKey,
 
                 value: Optional[MetaValue]=None,
 
                 post: Optional[bc_data.Posting]=None,
 
                 need_type: Type=str,
 
                 source: Meta=None,
 
    ) -> None:
 
        if post is None:
 
            srcname = 'transaction'
 
        else:
 
            srcname = post.account
 
        if value is None:
 
            msg = "{} missing {}".format(srcname, key)
 
        elif isinstance(value, need_type):
 
            msg = "{} has invalid {}: {}".format(srcname, key, value)
 
        else:
 
            msg = "{} has wrong type of {}: expected {} but is a {}".format(
 
                srcname, key, need_type.__name__, type(value).__name__,
 
            )
 
        super().__init__(msg, txn, source)
conservancy_beancount/plugin/core.py
Show inline comments
...
 
@@ -48,50 +48,51 @@ from ..beancount_types import (
 
# I expect these will become configurable in the future, which is why I'm
 
# keeping them outside of a class, but for now constants will do.
 
DEFAULT_START_DATE: datetime.date = datetime.date(2020, 3, 1)
 
# The default stop date leaves a little room after so it's easy to test
 
# dates past the far end of the range.
 
DEFAULT_STOP_DATE: datetime.date = datetime.date(datetime.MAXYEAR, 1, 1)
 

	
 
### TYPE DEFINITIONS
 

	
 
HookName = str
 

	
 
Entry = TypeVar('Entry', bound=Directive)
 
class Hook(Generic[Entry], metaclass=abc.ABCMeta):
 
    DIRECTIVE: Type[Directive]
 
    HOOK_GROUPS: FrozenSet[HookName] = frozenset()
 

	
 
    def __init__(self, config: configmod.Config) -> None:
 
        pass
 
        # Subclasses that need configuration should override __init__ to check
 
        # and store it.
 

	
 
    @abc.abstractmethod
 
    def run(self, entry: Entry) -> errormod.Iter: ...
 

	
 
    def __init_subclass__(cls):
 
        cls.DIRECTIVE = cls.__orig_bases__[0].__args__[0]
 
    def __init_subclass__(cls) -> None:
 
        # cls.__orig_bases__ comes from the ABCMeta metaclass
 
        cls.DIRECTIVE = cls.__orig_bases__[0].__args__[0]  # type:ignore[attr-defined]
 

	
 

	
 
TransactionHook = Hook[Transaction]
 

	
 
### HELPER CLASSES
 

	
 
class LessComparable(metaclass=abc.ABCMeta):
 
    @abc.abstractmethod
 
    def __le__(self, other: Any) -> bool: ...
 

	
 
    @abc.abstractmethod
 
    def __lt__(self, other: Any) -> bool: ...
 

	
 

	
 
CT = TypeVar('CT', bound=LessComparable)
 
class _GenericRange(Generic[CT]):
 
    """Convenience class to check whether a value is within a range.
 

	
 
    `foo in generic_range` is equivalent to `start <= foo < stop`.
 
    Since we have multiple user-configurable ranges, having the check
 
    encapsulated in an object helps implement the check consistently, and
 
    makes it easier for subclasses to override.
 
    """
 

	
conservancy_beancount/plugin/meta_project.py
Show inline comments
...
 
@@ -10,81 +10,82 @@
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
from pathlib import Path
 

	
 
import yaml
 
import yaml.error
 

	
 
from . import core
 
from .. import config as configmod
 
from .. import data
 
from .. import errors as errormod
 
from ..beancount_types import (
 
    MetaValueEnum,
 
    Transaction,
 
)
 

	
 
from typing import (
 
    Any,
 
    Dict,
 
    NoReturn,
 
    Optional,
 
    Set,
 
)
 

	
 
class MetaProject(core._NormalizePostingMetadataHook):
 
    DEFAULT_PROJECT = 'Conservancy'
 
    PROJECT_DATA_PATH = Path('Projects', 'project-data.yml')
 
    VALUES_ENUM = core.MetadataEnum('project', {DEFAULT_PROJECT})
 

	
 
    def __init__(self, config: configmod.Config, source_path: Path=PROJECT_DATA_PATH) -> None:
 
        repo_path = config.repository_path()
 
        if repo_path is None:
 
            raise self._config_error("no repository configured")
 
            self._config_error("no repository configured")
 
        project_data_path = repo_path / source_path
 
        source = {'filename': str(project_data_path)}
 
        try:
 
            with project_data_path.open() as yaml_file:
 
                project_data: Dict[str, Dict[str, Any]] = yaml.safe_load(yaml_file)
 
            names: Set[MetaValueEnum] = {self.DEFAULT_PROJECT}
 
            aliases: Dict[MetaValueEnum, MetaValueEnum] = {}
 
            for key, params in project_data.items():
 
                name = params.get('accountName', key)
 
                names.add(name)
 
                human_name = params.get('humanName', name)
 
                if name != human_name:
 
                    aliases[human_name] = name
 
                if name != key:
 
                    aliases[key] = name
 
        except AttributeError:
 
            self._config_error("loaded YAML data not in project-data format", project_data_path)
 
        except OSError as error:
 
            self._config_error(error.strerror, project_data_path)
 
        except yaml.error.YAMLError as error:
 
            self._config_error(error.args[0] or "YAML load error", project_data_path)
 
        else:
 
            self.VALUES_ENUM = core.MetadataEnum(self.METADATA_KEY, names, aliases)
 

	
 
    def _config_error(self, msg: str, filename: Optional[Path]=None):
 
    def _config_error(self, msg: str, filename: Optional[Path]=None) -> NoReturn:
 
        source = {}
 
        if filename is not None:
 
            source['filename'] = str(filename)
 
        raise errormod.ConfigurationError(
 
            "cannot load project data: " + msg,
 
            source=source,
 
        )
 

	
 
    def _run_on_post(self, txn: Transaction, post: data.Posting) -> bool:
 
        return post.account.is_under('Assets', 'Liabilities') is None
 

	
 
    def _default_value(self, txn: Transaction, post: data.Posting) -> MetaValueEnum:
 
        if post.account.is_under(
 
                'Accrued:VacationPayable',
 
                'Expenses:Payroll',
 
        ):
 
            return self.DEFAULT_PROJECT
 
        else:
 
            raise errormod.InvalidMetadataError(txn, self.METADATA_KEY, None, post)
conservancy_beancount/rtutil.py
Show inline comments
...
 
@@ -175,53 +175,54 @@ class RT:
 
        re.compile(r'^rt:([0-9]+)(?:/([0-9]+))?/?$'),
 
        re.compile(r'^rt://ticket/([0-9]+)(?:/attachments?/([0-9]+))?/?$'),
 
    ]
 

	
 
    def __init__(self, rt_client: rt.Rt, cache_db: Optional[sqlite3.Connection]=None) -> None:
 
        urlparts = urlparse.urlparse(rt_client.url)
 
        try:
 
            index = urlparts.path.rindex('/REST/')
 
        except ValueError:
 
            base_path = urlparts.path.rstrip('/') + '/'
 
        else:
 
            base_path = urlparts.path[:index + 1]
 
        self.url_base = urlparts._replace(path=base_path)
 
        self.rt = rt_client
 
        self._cache: _LinkCache
 
        if cache_db is None:
 
            self._cache = {}
 
        else:
 
            self._cache = RTLinkCache(cache_db)
 

	
 
    # mypy complains that the first argument isn't self, but this isn't meant
 
    # to be a method, it's just an internal decrator.
 
    def _cache_method(func: _URLLookup) -> _URLLookup:  # type:ignore[misc]
 
        @functools.wraps(func)
 
        def caching_wrapper(self,
 
        def caching_wrapper(self: 'RT',
 
                            ticket_id: RTId,
 
                            attachment_id: Optional[RTId]=None,
 
        ) -> Optional[str]:
 
            cache_key = (str(ticket_id), attachment_id and str(attachment_id))
 
            cache_key = (str(ticket_id),
 
                         None if attachment_id is None else str(attachment_id))
 
            url: Optional[str]
 
            try:
 
                url = self._cache[cache_key]
 
            except KeyError:
 
                if attachment_id is None:
 
                    url = func(self, ticket_id)
 
                else:
 
                    url = func(self, ticket_id, attachment_id)
 
                self._cache[cache_key] = url
 
            return url
 
        return caching_wrapper
 

	
 
    def _extend_url(self,
 
                    path_tail: str,
 
                    fragment: Optional[str]=None,
 
                    **query: str,
 
    ) -> str:
 
        if fragment is None:
 
            fragment = self.url_base.fragment
 
        else:
 
            fragment = urlparse.quote(fragment)
 
        if query:
 
            query_s = urlparse.urlencode(query)
 
        else:
setup.cfg
Show inline comments
 
[aliases]
 
test=pytest
 
typecheck=pytest --addopts="--mypy conservancy_beancount"
 

	
 
[mypy]
 
disallow_any_unimported = True
 
disallow_untyped_defs = True
 
show_error_codes = True
 
strict_equality = True
 
warn_redundant_casts = True
 
warn_return_any = True
 
warn_unreachable = True
 
warn_unused_configs = True
0 comments (0 inline, 0 general)