Changeset - 0d370c445b9e
[Not reviewed]
0 8 0
Brett Smith - 4 years ago 2020-03-19 21:23:27
brettcsmith@brettcsmith.org
plugin: User configuration is passed to hooks on initialization.
8 files changed with 62 insertions and 9 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/errors.py
Show inline comments
 
"""Error classes for plugins to report problems in the books"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
from typing import (
 
    Iterable,
 
)
 

	
 
class Error(Exception):
 
    def __init__(self, message, entry, source=None):
 
        self.message = message
 
        self.entry = entry
 
        self.source = entry.meta if source is None else source
 

	
 
    def __repr__(self):
 
        return "{clsname}<{source[filename]}:{source[lineno]}: {message}>".format(
 
            clsname=type(self).__name__,
 
            message=self.message,
 
            source=self.source,
 
        )
 

	
 
    def _fill_source(self, source, filename='conservancy_beancount', lineno=0):
 
        source.setdefault('filename', filename)
 
        source.setdefault('lineno', lineno)
 

	
 

	
 
Iter = Iterable[Error]
 

	
 
class ConfigurationError(Error):
 
    def __init__(self, message, entry=None, source=None):
 
        if source is None:
 
            source = {}
 
        self._fill_source(source)
 
        super().__init__(message, entry, source)
 

	
 

	
 
class InvalidMetadataError(Error):
 
    def __init__(self, txn, post, key, value=None, source=None):
 
        if value is None:
 
            msg_fmt = "{post.account} missing {key}"
 
        else:
 
            msg_fmt = "{post.account} has invalid {key}: {value}"
 
        super().__init__(
 
            msg_fmt.format(post=post, key=key, value=value),
 
            txn,
 
            source,
 
        )
conservancy_beancount/plugin/__init__.py
Show inline comments
 
"""Beancount plugin entry point for Conservancy"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import importlib
 

	
 
import beancount.core.data as bc_data
 

	
 
from typing import (
 
    AbstractSet,
 
    Any,
 
    Dict,
 
    Iterable,
 
    List,
 
    Set,
 
    Tuple,
 
    Type,
 
)
 
from ..beancount_types import (
 
    ALL_DIRECTIVES,
 
    Directive,
 
)
 
from .. import config as configmod
 
from .core import (
 
    Hook,
 
    HookName,
 
)
 
from ..errors import (
 
    Error,
 
)
 

	
 
__plugins__ = ['run']
 

	
 
class HookRegistry:
 
    def __init__(self) -> None:
 
        self.group_name_map: Dict[HookName, Set[Type[Hook]]] = {
 
            t.__name__: set() for t in ALL_DIRECTIVES
 
        }
 
        self.group_name_map['all'] = set()
 

	
 
    def add_hook(self, hook_cls: Type[Hook]) -> Type[Hook]:
 
        self.group_name_map['all'].add(hook_cls)
 
        self.group_name_map[hook_cls.DIRECTIVE.__name__].add(hook_cls)
 
        for key in hook_cls.HOOK_GROUPS:
 
            self.group_name_map.setdefault(key, set()).add(hook_cls)
 
        return hook_cls  # to allow use as a decorator
 

	
 
    # This method is too dynamic to typecheck.
 
    def import_hooks(self, mod_name, *hook_names, package=__module__):  # type:ignore
 
        module = importlib.import_module(mod_name, package)
 
        for hook_name in hook_names:
 
            self.add_hook(getattr(module, hook_name))
 

	
 
    def group_by_directive(self, config_str: str='') -> Iterable[Tuple[HookName, Type[Hook]]]:
 
        config_str = config_str.strip()
 
        if not config_str:
 
            config_str = 'all'
 
        elif config_str.startswith('-'):
 
            config_str = 'all ' + config_str
 
        available_hooks: Set[Type[Hook]] = set()
 
        for token in config_str.split():
 
            if token.startswith('-'):
 
                update_available = available_hooks.difference_update
 
                key = token[1:]
 
            else:
 
                update_available = available_hooks.update
 
                key = token
 
            try:
 
                update_set = self.group_name_map[key]
 
            except KeyError:
 
                raise ValueError("configuration refers to unknown hooks {!r}".format(key)) from None
 
            else:
 
                update_available(update_set)
 
        for directive in ALL_DIRECTIVES:
 
            key = directive.__name__
 
            for hook in self.group_name_map[key] & available_hooks:
 
                yield key, hook
 

	
 

	
 
HOOK_REGISTRY = HookRegistry()
 
HOOK_REGISTRY.import_hooks('.meta_expense_allocation', 'MetaExpenseAllocation')
 
HOOK_REGISTRY.import_hooks('.meta_tax_implication', 'MetaTaxImplication')
 

	
 
def run(
 
        entries: List[Directive],
 
        options_map: Dict[str, Any],
 
        config: str='',
 
        hook_registry: HookRegistry=HOOK_REGISTRY,
 
) -> Tuple[List[Directive], List[Error]]:
 
    errors: List[Error] = []
 
    hooks: Dict[HookName, List[Hook]] = {}
 
    user_config = configmod.Config()
 
    for key, hook_type in hook_registry.group_by_directive(config):
 
        hooks.setdefault(key, []).append(hook_type())
 
        try:
 
            hook = hook_type(user_config)
 
        except Error as error:
 
            errors.append(error)
 
        else:
 
            hooks.setdefault(key, []).append(hook)
 
    for entry in entries:
 
        entry_type = type(entry).__name__
 
        for hook in hooks[entry_type]:
 
            errors.extend(hook.run(entry))
 
    return entries, errors
 

	
conservancy_beancount/plugin/core.py
Show inline comments
 
"""Base classes for plugin checks"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import abc
 
import datetime
 
import re
 

	
 
from .. import config as configmod
 
from .. import data
 
from .. import errors as errormod
 

	
 
from typing import (
 
    Any,
 
    Dict,
 
    FrozenSet,
 
    Generic,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    Optional,
 
    TypeVar,
 
)
 
from ..beancount_types import (
 
    Account,
 
    Directive,
 
    MetaKey,
 
    MetaValue,
 
    MetaValueEnum,
 
    Transaction,
 
    Type,
 
)
 

	
 
### CONSTANTS
 

	
 
# I expect these will become configurable in the future, which is why I'm
 
# keeping them outside of a class, but for now constants will do.
 
DEFAULT_START_DATE: datetime.date = datetime.date(2020, 3, 1)
 
# The default stop date leaves a little room after so it's easy to test
 
# dates past the far end of the range.
 
DEFAULT_STOP_DATE: datetime.date = datetime.date(datetime.MAXYEAR, 1, 1)
 

	
 
### TYPE DEFINITIONS
 

	
 
HookName = str
 

	
 
Entry = TypeVar('Entry', bound=Directive)
 
class Hook(Generic[Entry], metaclass=abc.ABCMeta):
 
    DIRECTIVE: Type[Directive]
 
    HOOK_GROUPS: FrozenSet[HookName] = frozenset()
 

	
 
    def __init__(self, config: configmod.Config) -> None:
 
        pass
 
        # Subclasses that need configuration should override __init__ to check
 
        # and store it.
 

	
 
    @abc.abstractmethod
 
    def run(self, entry: Entry) -> errormod.Iter: ...
 

	
 
    def __init_subclass__(cls):
 
        cls.DIRECTIVE = cls.__orig_bases__[0].__args__[0]
 

	
 

	
 
TransactionHook = Hook[Transaction]
 

	
 
### HELPER CLASSES
 

	
 
class LessComparable(metaclass=abc.ABCMeta):
 
    @abc.abstractmethod
 
    def __le__(self, other: Any) -> bool: ...
 

	
 
    @abc.abstractmethod
 
    def __lt__(self, other: Any) -> bool: ...
 

	
 

	
 
CT = TypeVar('CT', bound=LessComparable)
 
class _GenericRange(Generic[CT]):
 
    """Convenience class to check whether a value is within a range.
 

	
 
    `foo in generic_range` is equivalent to `start <= foo < stop`.
 
    Since we have multiple user-configurable ranges, having the check
 
    encapsulated in an object helps implement the check consistently, and
 
    makes it easier for subclasses to override.
 
    """
 

	
 
    def __init__(self, start: CT, stop: CT) -> None:
 
        self.start = start
 
        self.stop = stop
 

	
 
    def __repr__(self) -> str:
 
        return "{clsname}({self.start!r}, {self.stop!r})".format(
 
            clsname=type(self).__name__,
 
            self=self,
 
        )
 

	
 
    def __contains__(self, item: CT) -> bool:
 
        return self.start <= item < self.stop
 

	
 

	
 
class MetadataEnum:
 
    """Map acceptable metadata values to their normalized forms.
 

	
 
    When a piece of metadata uses a set of allowed values, use this class to
 
    define them. You can also specify aliases that hooks will normalize to
 
    the primary values.
 
    """
 

	
 
    def __init__(self,
 
                 key: MetaKey,
 
                 standard_values: Iterable[MetaValueEnum],
 
                 aliases_map: Optional[Mapping[MetaValueEnum, MetaValueEnum]]=None,
 
    ) -> None:
 
        """Specify allowed values and aliases for this metadata.
 

	
 
        Arguments:
 

	
 
        * key: The name of the metadata key that uses this enum.
 
        * standard_values: A sequence of strings that enumerate the standard
 
          values for this metadata.
 
        * aliases_map: A mapping of strings to strings. The keys are
 
          additional allowed metadata values. The values are standard values
 
          that each key will evaluate to. The code asserts that all values are
 
          in standard_values.
 
        """
 
        self.key = key
 
        self._stdvalues = frozenset(standard_values)
 
        self._aliases: Dict[MetaValueEnum, MetaValueEnum] = dict(aliases_map or ())
 
        assert self._stdvalues.issuperset(self._aliases.values())
 
        self._aliases.update((v, v) for v in standard_values)
 

	
 
    def __repr__(self) -> str:
 
        return "{}<{}>".format(type(self).__name__, self.key)
 

	
 
    def __contains__(self, key: MetaValueEnum) -> bool:
 
        """Returns true if `key` is a standard value or alias."""
 
        return key in self._aliases
 

	
 
    def __getitem__(self, key: MetaValueEnum) -> MetaValueEnum:
 
        """Return the standard value for `key`.
 

	
 
        Raises KeyError if `key` is not a known value or alias.
 
        """
 
        return self._aliases[key]
 

	
 
    def __iter__(self) -> Iterator[MetaValueEnum]:
 
        """Iterate over standard values."""
 
        return iter(self._stdvalues)
 

	
 
    def get(self,
 
            key: MetaValueEnum,
 
            default_key: Optional[MetaValueEnum]=None,
 
    ) -> Optional[MetaValueEnum]:
 
        """Return self[key], or a default fallback if that doesn't exist.
 

	
 
        default_key is another key to look up, *not* a default value to return.
 
        This helps ensure you always get a standard value.
 
        """
 
        try:
 
            return self[key]
 
        except KeyError:
 
            if default_key is None:
 
                return None
 
            else:
 
                return self[default_key]
 

	
 

	
 
### HOOK SUBCLASSES
 

	
 
class _PostingHook(TransactionHook, metaclass=abc.ABCMeta):
 
    TXN_DATE_RANGE: _GenericRange = _GenericRange(DEFAULT_START_DATE, DEFAULT_STOP_DATE)
 

	
 
    def __init_subclass__(cls) -> None:
 
        cls.HOOK_GROUPS = cls.HOOK_GROUPS.union(['posting'])
 

	
 
    def _run_on_txn(self, txn: Transaction) -> bool:
 
        return txn.date in self.TXN_DATE_RANGE
 

	
 
    def _run_on_post(self, txn: Transaction, post: data.Posting) -> bool:
 
        return True
 

	
 
    def run(self, txn: Transaction) -> errormod.Iter:
 
        if self._run_on_txn(txn):
 
            for post in data.iter_postings(txn):
 
                if self._run_on_post(txn, post):
tests/test_meta_expense_allocation.py
Show inline comments
 
"""Test handling of expense-allocation metadata"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount.plugin import meta_expense_allocation
 

	
 
VALID_VALUES = {
 
    'program': 'program',
 
    'administration': 'administration',
 
    'fundraising': 'fundraising',
 
    'admin': 'administration',
 
}
 

	
 
INVALID_VALUES = {
 
    'invalid',
 
    'porgram',
 
    'adimn',
 
    'fundrasing',
 
    '',
 
}
 

	
 
TEST_KEY = 'expense-allocation'
 

	
 
@pytest.fixture(scope='module')
 
def hook():
 
    return meta_expense_allocation.MetaExpenseAllocation()
 
    config = testutil.TestConfig()
 
    return meta_expense_allocation.MetaExpenseAllocation(config)
 

	
 
@pytest.mark.parametrize('src_value,set_value', VALID_VALUES.items())
 
def test_valid_values_on_postings(hook, src_value, set_value):
 
    txn = testutil.Transaction(postings=[
 
        ('Assets:Cash', -25),
 
        ('Expenses:General', 25, {TEST_KEY: src_value}),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    testutil.check_post_meta(txn, None, {TEST_KEY: set_value})
 

	
 
@pytest.mark.parametrize('src_value', INVALID_VALUES)
 
def test_invalid_values_on_postings(hook, src_value):
 
    txn = testutil.Transaction(postings=[
 
        ('Assets:Cash', -25),
 
        ('Expenses:General', 25, {TEST_KEY: src_value}),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert errors
 
    testutil.check_post_meta(txn, None, {TEST_KEY: src_value})
 

	
 
@pytest.mark.parametrize('src_value,set_value', VALID_VALUES.items())
 
def test_valid_values_on_transactions(hook, src_value, set_value):
 
    txn = testutil.Transaction(**{TEST_KEY: src_value}, postings=[
 
        ('Assets:Cash', -25),
 
        ('Expenses:General', 25),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    testutil.check_post_meta(txn, None, {TEST_KEY: set_value})
 

	
 
@pytest.mark.parametrize('src_value', INVALID_VALUES)
 
def test_invalid_values_on_transactions(hook, src_value):
 
    txn = testutil.Transaction(**{TEST_KEY: src_value}, postings=[
 
        ('Assets:Cash', -25),
 
        ('Expenses:General', 25),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert errors
 
    testutil.check_post_meta(txn, None, None)
 

	
 
@pytest.mark.parametrize('account', [
 
    'Accrued:AccountsReceivable',
 
    'Assets:Cash',
 
    'Income:Donations',
 
    'Liabilities:CreditCard',
 
    'UnearnedIncome:Donations',
 
])
 
def test_non_expense_accounts_skipped(hook, account):
 
    meta = {TEST_KEY: 'program'}
 
    txn = testutil.Transaction(postings=[
 
        (account, -25),
 
        ('Expenses:General', 25, meta.copy()),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    testutil.check_post_meta(txn, None, meta)
 

	
 
@pytest.mark.parametrize('account,set_value', [
 
    ('Expenses:Services:Accounting', 'administration'),
 
    ('Expenses:Services:Administration', 'administration'),
 
    ('Expenses:Services:Advocacy', 'program'),
 
    ('Expenses:Services:Development', 'program'),
 
    ('Expenses:Services:Fundraising', 'fundraising'),
 
])
 
def test_default_values(hook, account, set_value):
 
    txn = testutil.Transaction(postings=[
 
        ('Liabilites:CreditCard', -25),
 
        (account, 25),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    testutil.check_post_meta(txn, None, {TEST_KEY: set_value})
 

	
 
@pytest.mark.parametrize('date,set_value', [
 
    (testutil.EXTREME_FUTURE_DATE, None),
 
    (testutil.FUTURE_DATE, 'program'),
 
    (testutil.FY_START_DATE, 'program'),
 
    (testutil.FY_MID_DATE, 'program'),
 
    (testutil.PAST_DATE, None),
 
])
 
def test_default_value_set_in_date_range(hook, date, set_value):
 
    txn = testutil.Transaction(date=date, postings=[
 
        ('Liabilites:CreditCard', -25),
 
        ('Expenses:General', 25),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    expect_meta = None if set_value is None else {TEST_KEY: set_value}
 
    testutil.check_post_meta(txn, None, expect_meta)
tests/test_meta_income_type.py
Show inline comments
 
"""Test handling of income-type metadata"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount.plugin import meta_income_type
 

	
 
VALID_VALUES = {
 
    'Donations': 'Donations',
 
    'Payable-Derecognition': 'Payable-Derecognition',
 
    'RBI': 'RBI',
 
    'UBTI': 'UBTI',
 
}
 

	
 
INVALID_VALUES = {
 
    'Dontion',
 
    'Payble-Derecognitoin',
 
    'RIB',
 
    'UTBI',
 
    '',
 
}
 

	
 
TEST_KEY = 'income-type'
 

	
 
@pytest.fixture(scope='module')
 
def hook():
 
    return meta_income_type.MetaIncomeType()
 
    config = testutil.TestConfig()
 
    return meta_income_type.MetaIncomeType(config)
 

	
 
@pytest.mark.parametrize('src_value,set_value', VALID_VALUES.items())
 
def test_valid_values_on_postings(hook, src_value, set_value):
 
    txn = testutil.Transaction(postings=[
 
        ('Assets:Cash', 25),
 
        ('Income:Other', -25, {TEST_KEY: src_value}),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    testutil.check_post_meta(txn, None, {TEST_KEY: set_value})
 

	
 
@pytest.mark.parametrize('src_value', INVALID_VALUES)
 
def test_invalid_values_on_postings(hook, src_value):
 
    txn = testutil.Transaction(postings=[
 
        ('Assets:Cash', 25),
 
        ('Income:Other', -25, {TEST_KEY: src_value}),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert errors
 
    testutil.check_post_meta(txn, None, {TEST_KEY: src_value})
 

	
 
@pytest.mark.parametrize('src_value,set_value', VALID_VALUES.items())
 
def test_valid_values_on_transactions(hook, src_value, set_value):
 
    txn = testutil.Transaction(**{TEST_KEY: src_value}, postings=[
 
        ('Assets:Cash', 25),
 
        ('Income:Other', -25),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    testutil.check_post_meta(txn, None, {TEST_KEY: set_value})
 

	
 
@pytest.mark.parametrize('src_value', INVALID_VALUES)
 
def test_invalid_values_on_transactions(hook, src_value):
 
    txn = testutil.Transaction(**{TEST_KEY: src_value}, postings=[
 
        ('Assets:Cash', 25),
 
        ('Income:Other', -25),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert errors
 
    testutil.check_post_meta(txn, None, None)
 

	
 
@pytest.mark.parametrize('account', [
 
    'Accrued:AccountsReceivable',
 
    'Assets:Cash',
 
    'Expenses:General',
 
    'Liabilities:CreditCard',
 
])
 
def test_non_income_accounts_skipped(hook, account):
 
    meta = {TEST_KEY: 'RBI'}
 
    txn = testutil.Transaction(postings=[
 
        (account, 25),
 
        ('Income:Other', -25, meta.copy()),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    testutil.check_post_meta(txn, None, meta)
 

	
 
@pytest.mark.parametrize('account,set_value', [
 
    ('Income:Donations', 'Donations'),
 
    ('Income:Honoraria', 'RBI'),
 
    ('Income:Interest', 'RBI'),
 
    ('Income:Interest:Dividend', 'RBI'),
 
    ('Income:Royalties', 'RBI'),
 
    ('Income:Sales', 'RBI'),
 
    ('Income:SoftwareDevelopment', 'RBI'),
 
    ('Income:TrademarkLicensing', 'RBI'),
 
    ('UnearnedIncome:Conferences:Registrations', 'RBI'),
 
    ('UnearnedIncome:MatchPledges', 'Donations'),
 
])
 
def test_default_values(hook, account, set_value):
 
    txn = testutil.Transaction(postings=[
 
        ('Assets:Cash', 25),
 
        (account, -25),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    testutil.check_post_meta(txn, None, {TEST_KEY: set_value})
 

	
 
@pytest.mark.parametrize('account', [
 
    'Income:Other',
 
])
 
def test_no_default_value(hook, account):
 
    txn = testutil.Transaction(postings=[
 
        ('Assets:Cash', 25),
 
        (account, -25),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert errors
 
    testutil.check_post_meta(txn, None, None)
 

	
 
@pytest.mark.parametrize('date,set_value', [
 
    (testutil.EXTREME_FUTURE_DATE, None),
 
    (testutil.FUTURE_DATE, 'Donations'),
 
    (testutil.FY_START_DATE, 'Donations'),
 
    (testutil.FY_MID_DATE, 'Donations'),
 
    (testutil.PAST_DATE, None),
 
])
 
def test_default_value_set_in_date_range(hook, date, set_value):
 
    txn = testutil.Transaction(date=date, postings=[
 
        ('Assets:Cash', 25),
 
        ('Income:Donations', -25),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    expect_meta = None if set_value is None else {TEST_KEY: set_value}
 
    testutil.check_post_meta(txn, None, expect_meta)
tests/test_meta_tax_implication.py
Show inline comments
 
"""Test handling of tax-implication metadata"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount.plugin import meta_tax_implication
 

	
 
VALID_VALUES = {
 
    '1099': '1099',
 
    'Accountant-Advises-No-1099': 'Accountant-Advises-No-1099',
 
    'Bank-Transfer': 'Bank-Transfer',
 
    'Foreign-Corporation': 'Foreign-Corporation',
 
    'Foreign-Individual-Contractor': 'Foreign-Individual-Contractor',
 
    'Fraud': 'Fraud',
 
    'HSA-Contribution': 'HSA-Contribution',
 
    'Loan': 'Loan',
 
    'Payroll': 'Payroll',
 
    'Refund': 'Refund',
 
    'Reimbursement': 'Reimbursement',
 
    'Retirement-Pretax': 'Retirement-Pretax',
 
    'Tax-Payment': 'Tax-Payment',
 
    'USA-501c3': 'USA-501c3',
 
    'USA-Corporation': 'USA-Corporation',
 
    'USA-LLC-No-1099': 'USA-LLC-No-1099',
 
    'W2': 'W2',
 
}
 

	
 
INVALID_VALUES = {
 
    '199',
 
    'W3',
 
    'Payrol',
 
    '',
 
}
 

	
 
TEST_KEY = 'tax-implication'
 

	
 
@pytest.fixture(scope='module')
 
def hook():
 
    return meta_tax_implication.MetaTaxImplication()
 
    config = testutil.TestConfig()
 
    return meta_tax_implication.MetaTaxImplication(config)
 

	
 
@pytest.mark.parametrize('src_value,set_value', VALID_VALUES.items())
 
def test_valid_values_on_postings(hook, src_value, set_value):
 
    txn = testutil.Transaction(postings=[
 
        ('Accrued:AccountsPayable', 25),
 
        ('Assets:Cash', -25, {TEST_KEY: src_value}),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    testutil.check_post_meta(txn, None, {TEST_KEY: set_value})
 

	
 
@pytest.mark.parametrize('src_value', INVALID_VALUES)
 
def test_invalid_values_on_postings(hook, src_value):
 
    txn = testutil.Transaction(postings=[
 
        ('Accrued:AccountsPayable', 25),
 
        ('Assets:Cash', -25, {TEST_KEY: src_value}),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert errors
 
    testutil.check_post_meta(txn, None, {TEST_KEY: src_value})
 

	
 
@pytest.mark.parametrize('src_value,set_value', VALID_VALUES.items())
 
def test_valid_values_on_transactions(hook, src_value, set_value):
 
    txn = testutil.Transaction(**{TEST_KEY: src_value}, postings=[
 
        ('Accrued:AccountsPayable', 25),
 
        ('Assets:Cash', -25),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    testutil.check_post_meta(txn, None, {TEST_KEY: set_value})
 

	
 
@pytest.mark.parametrize('src_value', INVALID_VALUES)
 
def test_invalid_values_on_transactions(hook, src_value):
 
    txn = testutil.Transaction(**{TEST_KEY: src_value}, postings=[
 
        ('Accrued:AccountsPayable', 25),
 
        ('Assets:Cash', -25),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert errors
 
    testutil.check_post_meta(txn, None, None)
 

	
 
@pytest.mark.parametrize('account', [
 
    'Accrued:AccountsPayable',
 
    'Expenses:General',
 
    'Liabilities:CreditCard',
 
])
 
def test_non_asset_accounts_skipped(hook, account):
 
    meta = {TEST_KEY: 'USA-Corporation'}
 
    txn = testutil.Transaction(postings=[
 
        (account, 25),
 
        ('Assets:Cash', -25, meta.copy()),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    testutil.check_post_meta(txn, None, meta)
 

	
 
def test_prepaid_expenses_skipped(hook, ):
 
    txn = testutil.Transaction(postings=[
 
        ('Expenses:General', 25),
 
        ('Assets:PrepaidExpenses', -25),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    testutil.check_post_meta(txn, None, None)
 

	
 
def test_asset_credits_skipped(hook, ):
 
    txn = testutil.Transaction(postings=[
 
        ('Income:Donations', -25),
 
        ('Assets:Cash', 25),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert not errors
 
    testutil.check_post_meta(txn, None, None)
 

	
 
@pytest.mark.parametrize('date,need_value', [
 
    (testutil.EXTREME_FUTURE_DATE, False),
 
    (testutil.FUTURE_DATE, True),
 
    (testutil.FY_START_DATE, True),
 
    (testutil.FY_MID_DATE, True),
 
    (testutil.PAST_DATE, False),
 
])
 
def test_validation_only_in_date_range(hook, date, need_value):
 
    txn = testutil.Transaction(date=date, postings=[
 
        ('Liabilites:CreditCard', 25),
 
        ('Assets:Cash', -25),
 
    ])
 
    errors = list(hook.run(txn))
 
    assert bool(errors) == bool(need_value)
 
    testutil.check_post_meta(txn, None, None)
tests/test_plugin.py
Show inline comments
 
"""Test main plugin"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount import beancount_types, plugin
 
from conservancy_beancount import beancount_types, errors as errormod, plugin
 

	
 
HOOK_REGISTRY = plugin.HookRegistry()
 

	
 
class NonError(errormod.Error):
 
    pass
 

	
 

	
 
class TransactionHook:
 
    DIRECTIVE = beancount_types.Transaction
 
    HOOK_GROUPS = frozenset()
 

	
 
    def __init__(self, config):
 
        self.config = config
 

	
 
    def run(self, txn):
 
        assert False, "something called base class run method"
 

	
 

	
 
@HOOK_REGISTRY.add_hook
 
class ConfigurationError(TransactionHook):
 
    HOOK_GROUPS = frozenset(['unconfigured'])
 

	
 
    def __init__(self, config):
 
        raise errormod.ConfigurationError("testing error")
 

	
 

	
 
@HOOK_REGISTRY.add_hook
 
class TransactionError(TransactionHook):
 
    HOOK_GROUPS = frozenset(['configured'])
 

	
 
    def run(self, txn):
 
        return ['txn:{}'.format(id(txn))]
 
        return [NonError('txn:{}'.format(id(txn)), txn)]
 

	
 

	
 
@HOOK_REGISTRY.add_hook
 
class PostingError(TransactionHook):
 
    HOOK_GROUPS = frozenset(['configured', 'posting'])
 

	
 
    def run(self, txn):
 
        return ['post:{}'.format(id(post)) for post in txn.postings]
 
        return [NonError('post:{}'.format(id(post)), txn)
 
                for post in txn.postings]
 

	
 

	
 
@pytest.fixture
 
def config_map():
 
    return {}
 

	
 
@pytest.fixture
 
def easy_entries():
 
    return [
 
        testutil.Transaction(postings=[
 
            ('Income:Donations', -25),
 
            ('Assets:Cash', 25),
 
        ]),
 
        testutil.Transaction(postings=[
 
            ('Expenses:General', 10),
 
            ('Liabilites:CreditCard', -10),
 
        ]),
 
    ]
 

	
 
def map_errors(errors):
 
    retval = {}
 
    for errkey in errors:
 
        key, _, errid = errkey.partition(':')
 
    for error in errors:
 
        key, _, errid = error.message.partition(':')
 
        retval.setdefault(key, set()).add(errid)
 
    return retval
 

	
 
@pytest.mark.parametrize('group_str,expected', [
 
    (None, [TransactionError, PostingError]),
 
    ('', [TransactionError, PostingError]),
 
    ('all', [TransactionError, PostingError]),
 
    ('Transaction', [TransactionError, PostingError]),
 
    ('-posting', [TransactionError]),
 
    ('-configured posting', [PostingError]),
 
    ('configured -posting', [TransactionError]),
 
])
 
def test_registry_group_by_directive(group_str, expected):
 
    args = () if group_str is None else (group_str,)
 
    actual = {hook for _, hook in HOOK_REGISTRY.group_by_directive(*args)}
 
    assert actual.issuperset(expected)
 
    if len(expected) == 1:
 
        assert not (TransactionError in actual and PostingError in actual)
 

	
 
def test_registry_unknown_group_name():
 
    with pytest.raises(ValueError):
 
        next(HOOK_REGISTRY.group_by_directive('UnKnownTestGroup'))
 

	
 
def test_run_with_multiple_hooks(easy_entries, config_map):
 
    out_entries, errors = plugin.run(easy_entries, config_map, '', HOOK_REGISTRY)
 
    assert len(out_entries) == 2
 
    errmap = map_errors(errors)
 
    assert len(errmap.get('txn', '')) == 2
 
    assert len(errmap.get('post', '')) == 4
 

	
 
def test_run_with_one_hook(easy_entries, config_map):
 
    out_entries, errors = plugin.run(easy_entries, config_map, 'posting', HOOK_REGISTRY)
 
    assert len(out_entries) == 2
 
    errmap = map_errors(errors)
 
    assert len(errmap.get('txn', '')) == 0
 
    assert len(errmap.get('post', '')) == 4
tests/testutil.py
Show inline comments
 
"""Mock Beancount objects for testing"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import datetime
 

	
 
import beancount.core.amount as bc_amount
 
import beancount.core.data as bc_data
 

	
 
from decimal import Decimal
 
from pathlib import Path
 

	
 
EXTREME_FUTURE_DATE = datetime.date(datetime.MAXYEAR, 12, 30)
 
FUTURE_DATE = datetime.date.today() + datetime.timedelta(days=365 * 99)
 
FY_START_DATE = datetime.date(2020, 3, 1)
 
FY_MID_DATE = datetime.date(2020, 9, 1)
 
PAST_DATE = datetime.date(2000, 1, 1)
 

	
 
def check_post_meta(txn, *expected_meta, default=None):
 
    assert len(txn.postings) == len(expected_meta)
 
    for post, expected in zip(txn.postings, expected_meta):
 
        if not expected:
 
            assert not post.meta
 
        else:
 
            actual = None if post.meta is None else {
 
                key: post.meta.get(key, default) for key in expected
 
            }
 
            assert actual == expected
 

	
 
def parse_date(s, fmt='%Y-%m-%d'):
 
    return datetime.datetime.strptime(s, fmt).date()
 

	
 
def Posting(account, number,
 
            currency='USD', cost=None, price=None, flag=None,
 
            **meta):
 
    if not meta:
 
        meta = None
 
    return bc_data.Posting(
 
        account,
 
        bc_amount.Amount(Decimal(number), currency),
 
        cost,
 
        price,
 
        flag,
 
        meta,
 
    )
 

	
 
class Transaction:
 
    def __init__(self,
 
                 date=FY_MID_DATE, flag='*', payee=None,
 
                 narration='', tags=None, links=None, postings=None,
 
                 **meta):
 
        if isinstance(date, str):
 
            date = parse_date(date)
 
        self.date = date
 
        self.flag = flag
 
        self.payee = payee
 
        self.narration = narration
 
        self.tags = set(tags or '')
 
        self.links = set(links or '')
 
        self.postings = []
 
        self.meta = {
 
            'filename': '<test>',
 
            'lineno': 0,
 
        }
 
        self.meta.update(meta)
 
        for posting in postings:
 
            self.add_posting(*posting)
 

	
 
    def add_posting(self, arg, *args, **kwargs):
 
        """Add a posting to this transaction. Use any of these forms:
 

	
 
           txn.add_posting(account, number, …, kwarg=value, …)
 
           txn.add_posting(account, number, …, posting_kwargs_dict)
 
           txn.add_posting(posting_object)
 
        """
 
        if kwargs:
 
            posting = Posting(arg, *args, **kwargs)
 
        elif args:
 
            if isinstance(args[-1], dict):
 
                kwargs = args[-1]
 
                args = args[:-1]
 
            posting = Posting(arg, *args, **kwargs)
 
        else:
 
            posting = arg
 
        self.postings.append(posting)
 

	
 

	
 
class TestConfig:
 
    def __init__(self, repo_path=None):
 
        self.repo_path = None if repo_path is None else Path(repo_path)
 

	
 
    def repository_path(self):
 
        return self.repo_path
0 comments (0 inline, 0 general)