Changeset - 3fbc14d377ac
[Not reviewed]
2 5 2
Brett Smith - 4 years ago 2020-03-15 20:03:57
brettcsmith@brettcsmith.org
Improve organization between modules.

* Rename _typing to beancount_types to better reflect what it is.
* LessComparable isn't a Beancount type, so move that to
plugin.core with its dependent helper classes.
* Errors are a core Beancount concept, so move that module to the
top level and have it include appropriate type definitions.
7 files changed with 35 insertions and 32 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/beancount_types.py
Show inline comments
 
file renamed from conservancy_beancount/_typing.py to conservancy_beancount/beancount_types.py
 
"""Type definitions for Conservancy Beancount code"""
 
"""Type definitions for Beancount data structures"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import abc
 
import datetime
 

	
 
import beancount.core.data as bc_data
 
from .plugin import errors
 

	
 
from typing import (
 
    Any,
 
    FrozenSet,
 
    Iterable,
 
    List,
 
    NamedTuple,
 
    Optional,
 
    Set,
 
    Tuple,
 
    Type,
 
)
 

	
 
Account = bc_data.Account
 
Error = errors._BaseError
 
ErrorIter = Iterable[Error]
 
MetaKey = str
 
MetaValue = Any
 
MetaValueEnum = str
 
Posting = bc_data.Posting
 

	
 
class LessComparable(metaclass=abc.ABCMeta):
 
    @abc.abstractmethod
 
    def __le__(self, other: Any) -> bool: ...
 

	
 
    @abc.abstractmethod
 
    def __lt__(self, other: Any) -> bool: ...
 

	
 

	
 
class Directive(NamedTuple):
 
    meta: bc_data.Meta
 
    date: datetime.date
 

	
 

	
 
class Transaction(Directive):
 
    flag: bc_data.Flag
 
    payee: Optional[str]
 
    narration: str
 
    tags: Set
 
    links: Set
 
    postings: List[Posting]
 

	
 

	
 
ALL_DIRECTIVES: FrozenSet[Type[Directive]] = frozenset([
 
    Transaction,
 
])
conservancy_beancount/errors.py
Show inline comments
 
file renamed from conservancy_beancount/plugin/errors.py to conservancy_beancount/errors.py
 
"""Error classes for plugins to report problems in the books"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
class _BaseError(Exception):
 
from typing import (
 
    Iterable,
 
)
 

	
 
class Error(Exception):
 
    def __init__(self, message, entry, source=None):
 
        self.message = message
 
        self.entry = entry
 
        self.source = entry.meta if source is None else source
 

	
 
    def __repr__(self):
 
        return "{clsname}<{source[filename]}:{source[lineno]}: {message}>".format(
 
            clsname=type(self).__name__,
 
            message=self.message,
 
            source=self.source,
 
        )
 

	
 

	
 
class InvalidMetadataError(_BaseError):
 
Iter = Iterable[Error]
 

	
 
class InvalidMetadataError(Error):
 
    def __init__(self, txn, post, key, value=None, source=None):
 
        if value is None:
 
            msg_fmt = "{post.account} missing {key}"
 
        else:
 
            msg_fmt = "{post.account} has invalid {key}: {value}"
 
        super().__init__(
 
            msg_fmt.format(post=post, key=key, value=value),
 
            txn,
 
            source,
 
        )
conservancy_beancount/plugin/__init__.py
Show inline comments
 
"""Beancount plugin entry point for Conservancy"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import importlib
 

	
 
import beancount.core.data as bc_data
 

	
 
from typing import (
 
    AbstractSet,
 
    Any,
 
    Dict,
 
    List,
 
    Mapping,
 
    Set,
 
    Tuple,
 
    Type,
 
)
 
from .._typing import (
 
from ..beancount_types import (
 
    ALL_DIRECTIVES,
 
    Directive,
 
    Error,
 
)
 
from .core import (
 
    Hook,
 
    HookName,
 
)
 
from ..errors import (
 
    Error,
 
)
 

	
 
__plugins__ = ['run']
 

	
 
class HookRegistry:
 
    def __init__(self) -> None:
 
        self.group_name_map: Dict[HookName, Set[Type[Hook]]] = {
 
            t.__name__: set() for t in ALL_DIRECTIVES
 
        }
 
        self.group_name_map['all'] = set()
 

	
 
    def add_hook(self, hook_cls: Type[Hook]) -> Type[Hook]:
 
        self.group_name_map['all'].add(hook_cls)
 
        self.group_name_map[hook_cls.DIRECTIVE.__name__].add(hook_cls)
 
        for key in hook_cls.HOOK_GROUPS:
 
            self.group_name_map.setdefault(key, set()).add(hook_cls)
 
        return hook_cls  # to allow use as a decorator
 

	
 
    def import_hooks(self, mod_name, *hook_names, package=__module__):
 
        module = importlib.import_module(mod_name, package)
 
        for hook_name in hook_names:
 
            self.add_hook(getattr(module, hook_name))
 

	
 
    def group_by_directive(self, config_str: str='') -> Mapping[HookName, List[Hook]]:
 
        config_str = config_str.strip()
 
        if not config_str:
 
            config_str = 'all'
 
        elif config_str.startswith('-'):
 
            config_str = 'all ' + config_str
 
        available_hooks: Set[Type[Hook]] = set()
 
        for token in config_str.split():
 
            if token.startswith('-'):
 
                update_available = available_hooks.difference_update
 
                key = token[1:]
 
            else:
 
                update_available = available_hooks.update
 
                key = token
 
            try:
 
                update_set = self.group_name_map[key]
 
            except KeyError:
 
                raise ValueError("configuration refers to unknown hooks {!r}".format(key)) from None
 
            else:
 
                update_available(update_set)
 
        return {
 
            t.__name__: [hook() for hook in self.group_name_map[t.__name__] & available_hooks]
 
            for t in ALL_DIRECTIVES
 
        }
 

	
 

	
conservancy_beancount/plugin/core.py
Show inline comments
 
"""Base classes for plugin checks"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import abc
 
import datetime
 
import re
 

	
 
from . import errors as errormod
 
from .. import errors as errormod
 

	
 
from typing import (
 
    Any,
 
    FrozenSet,
 
    Generic,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    Optional,
 
    TypeVar,
 
)
 
from .._typing import (
 
from ..beancount_types import (
 
    Account,
 
    Directive,
 
    Error,
 
    ErrorIter,
 
    LessComparable,
 
    MetaKey,
 
    MetaValue,
 
    MetaValueEnum,
 
    Posting,
 
    Transaction,
 
    Type,
 
)
 

	
 
### CONSTANTS
 

	
 
# I expect these will become configurable in the future, which is why I'm
 
# keeping them outside of a class, but for now constants will do.
 
DEFAULT_START_DATE: datetime.date = datetime.date(2020, 3, 1)
 
# The default stop date leaves a little room after so it's easy to test
 
# dates past the far end of the range.
 
DEFAULT_STOP_DATE: datetime.date = datetime.date(datetime.MAXYEAR, 1, 1)
 

	
 
### TYPE DEFINITIONS
 

	
 
HookName = str
 

	
 
Entry = TypeVar('Entry', bound=Directive)
 
class Hook(Generic[Entry], metaclass=abc.ABCMeta):
 
    DIRECTIVE: Type[Directive]
 
    HOOK_GROUPS: FrozenSet[HookName] = frozenset()
 

	
 
    @abc.abstractmethod
 
    def run(self, entry: Entry) -> ErrorIter: ...
 
    def run(self, entry: Entry) -> errormod.Iter: ...
 

	
 
    def __init_subclass__(cls):
 
        cls.DIRECTIVE = cls.__orig_bases__[0].__args__[0]
 

	
 

	
 
TransactionHook = Hook[Transaction]
 

	
 
### HELPER CLASSES
 

	
 
class LessComparable(metaclass=abc.ABCMeta):
 
    @abc.abstractmethod
 
    def __le__(self, other: Any) -> bool: ...
 

	
 
    @abc.abstractmethod
 
    def __lt__(self, other: Any) -> bool: ...
 

	
 

	
 
CT = TypeVar('CT', bound=LessComparable)
 
class _GenericRange(Generic[CT]):
 
    """Convenience class to check whether a value is within a range.
 

	
 
    `foo in generic_range` is equivalent to `start <= foo < stop`.
 
    Since we have multiple user-configurable ranges, having the check
 
    encapsulated in an object helps implement the check consistently, and
 
    makes it easier for subclasses to override.
 
    """
 

	
 
    def __init__(self, start: CT, stop: CT) -> None:
 
        self.start = start
 
        self.stop = stop
 

	
 
    def __repr__(self) -> str:
 
        return "{clsname}({self.start!r}, {self.stop!r})".format(
 
            clsname=type(self).__name__,
 
            self=self,
 
        )
 

	
 
    def __contains__(self, item: CT) -> bool:
 
        return self.start <= item < self.stop
 

	
 

	
 
class MetadataEnum:
 
    """Map acceptable metadata values to their normalized forms.
 

	
 
    When a piece of metadata uses a set of allowed values, use this class to
 
    define them. You can also specify aliases that hooks will normalize to
 
    the primary values.
 
    """
 

	
 
    def __init__(self,
 
                 key: MetaKey,
 
                 standard_values: Iterable[MetaValueEnum],
 
                 aliases_map: Mapping[MetaValueEnum, MetaValueEnum],
 
    ) -> None:
 
        """Specify allowed values and aliases for this metadata.
 

	
 
        Arguments:
 

	
 
        * key: The name of the metadata key that uses this enum.
 
        * standard_values: A sequence of strings that enumerate the standard
 
          values for this metadata.
 
        * aliases_map: A mapping of strings to strings. The keys are
 
          additional allowed metadata values. The values are standard values
 
          that each key will evaluate to. The code asserts that all values are
 
          in standard_values.
...
 
@@ -155,97 +161,97 @@ class MetadataEnum:
 
        This helps ensure you always get a standard value.
 
        """
 
        try:
 
            return self[key]
 
        except KeyError:
 
            if default_key is None:
 
                return None
 
            else:
 
                return self[default_key]
 

	
 

	
 
### HOOK SUBCLASSES
 

	
 
class _PostingHook(TransactionHook, metaclass=abc.ABCMeta):
 
    TXN_DATE_RANGE: _GenericRange = _GenericRange(DEFAULT_START_DATE, DEFAULT_STOP_DATE)
 

	
 
    def __init_subclass__(cls) -> None:
 
        cls.HOOK_GROUPS = cls.HOOK_GROUPS.union(['posting'])
 

	
 
    def _meta_get(self,
 
                  txn: Transaction,
 
                  post: Posting,
 
                  key: MetaKey,
 
                  default: MetaValue=None,
 
    ) -> MetaValue:
 
        if post.meta and key in post.meta:
 
            return post.meta[key]
 
        else:
 
            return txn.meta.get(key, default)
 

	
 
    def _meta_set(self,
 
                  txn: Transaction,
 
                  post: Posting,
 
                  post_index: int,
 
                  key: MetaKey,
 
                  value: MetaValue,
 
    ) -> None:
 
        if post.meta is None:
 
            txn.postings[post_index] = Posting(*post[:5], {key: value})
 
        else:
 
            post.meta[key] = value
 

	
 
    def _run_on_txn(self, txn: Transaction) -> bool:
 
        return txn.date in self.TXN_DATE_RANGE
 

	
 
    def _run_on_post(self, txn: Transaction, post: Posting) -> bool:
 
        return True
 

	
 
    def run(self, txn: Transaction) -> ErrorIter:
 
    def run(self, txn: Transaction) -> errormod.Iter:
 
        if self._run_on_txn(txn):
 
            for index, post in enumerate(txn.postings):
 
                if self._run_on_post(txn, post):
 
                    yield from self.post_run(txn, post, index)
 

	
 
    @abc.abstractmethod
 
    def post_run(self, txn: Transaction, post: Posting, post_index: int) -> ErrorIter: ...
 
    def post_run(self, txn: Transaction, post: Posting, post_index: int) -> errormod.Iter: ...
 

	
 

	
 
class _NormalizePostingMetadataHook(_PostingHook):
 
    """Base class to normalize posting metadata from an enum."""
 
    # This class provides basic functionality to filter postings, normalize
 
    # metadata values, and set default values.
 
    METADATA_KEY: MetaKey
 
    VALUES_ENUM: MetadataEnum
 

	
 
    def __init_subclass__(cls) -> None:
 
        super().__init_subclass__()
 
        cls.METADATA_KEY = cls.VALUES_ENUM.key
 
        cls.HOOK_GROUPS = cls.HOOK_GROUPS.union(['metadata', cls.METADATA_KEY])
 

	
 
    # If the posting does not specify METADATA_KEY, the hook calls
 
    # _default_value to get a default. This method should either return
 
    # a value string from METADATA_ENUM, or else raise InvalidMetadataError.
 
    # This base implementation does the latter.
 
    def _default_value(self, txn: Transaction, post: Posting) -> MetaValueEnum:
 
        raise errormod.InvalidMetadataError(txn, post, self.METADATA_KEY)
 

	
 
    def post_run(self, txn: Transaction, post: Posting, post_index: int) -> ErrorIter:
 
    def post_run(self, txn: Transaction, post: Posting, post_index: int) -> errormod.Iter:
 
        source_value = self._meta_get(txn, post, self.METADATA_KEY)
 
        set_value = source_value
 
        error: Optional[Error] = None
 
        error: Optional[errormod.Error] = None
 
        if source_value is None:
 
            try:
 
                set_value = self._default_value(txn, post)
 
            except errormod._BaseError as error_:
 
            except errormod.Error as error_:
 
                error = error_
 
        else:
 
            try:
 
                set_value = self.VALUES_ENUM[source_value]
 
            except KeyError:
 
                error = errormod.InvalidMetadataError(
 
                    txn, post, self.METADATA_KEY, source_value,
 
                )
 
        if error is None:
 
            self._meta_set(txn, post, post_index, self.METADATA_KEY, set_value)
 
        else:
 
            yield error
conservancy_beancount/plugin/meta_expense_allocation.py
Show inline comments
 
"""meta_expense_allocation - Validate expense-allocation metadata"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
from . import core
 
from .._typing import (
 
from ..beancount_types import (
 
    MetaValueEnum,
 
    Posting,
 
    Transaction,
 
)
 

	
 
class MetaExpenseAllocation(core._NormalizePostingMetadataHook):
 
    VALUES_ENUM = core.MetadataEnum('expense-allocation', {
 
        'administration',
 
        'fundraising',
 
        'program',
 
    }, {
 
        'admin': 'administration',
 
    })
 
    DEFAULT_VALUES = {
 
        'Expenses:Services:Accounting': VALUES_ENUM['administration'],
 
        'Expenses:Services:Administration': VALUES_ENUM['administration'],
 
        'Expenses:Services:Fundraising': VALUES_ENUM['fundraising'],
 
    }
 

	
 
    def _run_on_post(self, txn: Transaction, post: Posting) -> bool:
 
        return post.account.startswith('Expenses:')
 

	
 
    def _default_value(self, txn: Transaction, post: Posting) -> MetaValueEnum:
 
        return self.DEFAULT_VALUES.get(post.account, 'program')
conservancy_beancount/plugin/meta_tax_implication.py
Show inline comments
 
"""meta_tax_implication - Validate tax-implication metadata"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import decimal
 

	
 
from . import core
 
from .._typing import (
 
from ..beancount_types import (
 
    Posting,
 
    Transaction,
 
)
 

	
 
DEFAULT_STOP_AMOUNT = decimal.Decimal(0)
 

	
 
class MetaTaxImplication(core._NormalizePostingMetadataHook):
 
    VALUES_ENUM = core.MetadataEnum('tax-implication', [
 
        '1099',
 
        'Accountant-Advises-No-1099',
 
        'Bank-Transfer',
 
        'Foreign-Corporation',
 
        'Foreign-Individual-Contractor',
 
        'Fraud',
 
        'HSA-Contribution',
 
        'Loan',
 
        'Payroll',
 
        'Refund',
 
        'Reimbursement',
 
        'Retirement-Pretax',
 
        'Tax-Payment',
 
        'USA-501c3',
 
        'USA-Corporation',
 
        'USA-LLC-No-1099',
 
        'W2',
 
    ], {})
 

	
 
    def _run_on_post(self, txn: Transaction, post: Posting) -> bool:
 
        return bool(
 
            post.account.startswith('Assets:')
 
            and post.units.number
 
            and post.units.number < DEFAULT_STOP_AMOUNT
 
        )
tests/test_plugin_run.py
Show inline comments
 
"""Test main plugin run loop"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount import plugin, _typing
 
from conservancy_beancount import beancount_types, plugin
 

	
 
CONFIG_MAP = {}
 
HOOK_REGISTRY = plugin.HookRegistry()
 

	
 
@HOOK_REGISTRY.add_hook
 
class TransactionCounter:
 
    DIRECTIVE = _typing.Transaction
 
    DIRECTIVE = beancount_types.Transaction
 
    HOOK_GROUPS = frozenset()
 

	
 
    def run(self, txn):
 
        return ['txn:{}'.format(id(txn))]
 

	
 

	
 
@HOOK_REGISTRY.add_hook
 
class PostingCounter(TransactionCounter):
 
    DIRECTIVE = _typing.Transaction
 
    DIRECTIVE = beancount_types.Transaction
 
    HOOK_GROUPS = frozenset(['posting'])
 

	
 
    def run(self, txn):
 
        return ['post:{}'.format(id(post)) for post in txn.postings]
 

	
 

	
 
def map_errors(errors):
 
    retval = {}
 
    for errkey in errors:
 
        key, _, errid = errkey.partition(':')
 
        retval.setdefault(key, set()).add(errid)
 
    return retval
 

	
 
def test_with_multiple_hooks():
 
    in_entries = [
 
        testutil.Transaction(postings=[
 
            ('Income:Donations', -25),
 
            ('Assets:Cash', 25),
 
        ]),
 
        testutil.Transaction(postings=[
 
            ('Expenses:General', 10),
 
            ('Liabilites:CreditCard', -10),
 
        ]),
 
    ]
 
    out_entries, errors = plugin.run(in_entries, CONFIG_MAP, '', HOOK_REGISTRY)
 
    assert len(out_entries) == 2
 
    errmap = map_errors(errors)
 
    assert len(errmap.get('txn', '')) == 2
 
    assert len(errmap.get('post', '')) == 4
 

	
 
def test_with_posting_hooks_only():
 
    in_entries = [
 
        testutil.Transaction(postings=[
 
            ('Income:Donations', -25),
 
            ('Assets:Cash', 25),
 
        ]),
 
        testutil.Transaction(postings=[
 
            ('Expenses:General', 10),
 
            ('Liabilites:CreditCard', -10),
 
        ]),
 
    ]
 
    out_entries, errors = plugin.run(in_entries, CONFIG_MAP, 'posting', HOOK_REGISTRY)
 
    assert len(out_entries) == 2
 
    errmap = map_errors(errors)
 
    assert len(errmap.get('txn', '')) == 0
 
    assert len(errmap.get('post', '')) == 4
0 comments (0 inline, 0 general)