Changeset - 547ae657808f
[Not reviewed]
0 6 0
Brett Smith - 4 years ago 2020-03-08 15:32:03
brettcsmith@brettcsmith.org
plugin.core: _meta_set properly handles when post.meta is None.

post is a NamedTuple, so attribute assignment is not allowed.
Instead we have to construct a whole new Posting.
6 files changed with 23 insertions and 20 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/plugin/__init__.py
Show inline comments
...
 
@@ -41,53 +41,53 @@ class HookRegistry:
 
            else:
 
                hook_groups.append(hook_name)
 
                break
 
        for key in hook_groups:
 
            self.group_hooks_map.setdefault(key, set()).add(hook_cls)
 
        return hook_cls  # to allow use as a decorator
 

	
 
    def import_hooks(self, mod_name, *hook_names, package=__module__):
 
        module = importlib.import_module(mod_name, package)
 
        for hook_name in hook_names:
 
            self.add_hook(getattr(module, hook_name))
 

	
 
    def group_by_directive(self, config_str=''):
 
        config_str = config_str.strip()
 
        if not config_str:
 
            config_str = 'all'
 
        elif config_str.startswith('-'):
 
            config_str = 'all ' + config_str
 
        available_hooks = set()
 
        for token in config_str.split():
 
            if token.startswith('-'):
 
                update_available = available_hooks.difference_update
 
                key = token[1:]
 
            else:
 
                update_available = available_hooks.update
 
                key = token
 
            try:
 
                update_set = self.group_hooks_map[key]
 
            except KeyError:
 
                raise ValueError("configuration refers to unknown hooks {!r}".format(key)) from None
 
            else:
 
                update_available(update_set)
 
        return {key: [hook() for hook in self.group_hooks_map[key] & available_hooks]
 
                for key in self.DIRECTIVES}
 

	
 

	
 
HOOK_REGISTRY = HookRegistry()
 
HOOK_REGISTRY.import_hooks('.meta_expense_allocation', 'MetaExpenseAllocation')
 
HOOK_REGISTRY.import_hooks('.meta_tax_implication', 'MetaTaxImplication')
 

	
 
def run(entries, options_map, config='', hook_registry=HOOK_REGISTRY):
 
    errors = []
 
    hooks = hook_registry.group_by_directive(config)
 
    for entry in entries:
 
        entry_type = type(entry).__name__
 
        for hook in hooks[entry_type]:
 
            errors.extend(hook.run(entry))
 
        if entry_type == 'Transaction':
 
            for post in entry.postings:
 
            for index, post in enumerate(entry.postings):
 
                for hook in hooks['Posting']:
 
                    errors.extend(hook.run(entry, post))
 
                    errors.extend(hook.run(entry, post, index))
 
    return entries, errors
 

	
conservancy_beancount/plugin/core.py
Show inline comments
...
 
@@ -87,92 +87,93 @@ class MetadataEnum:
 
        """Return the standard value for `key`.
 

	
 
        Raises KeyError if `key` is not a known value or alias.
 
        """
 
        return self._aliases[key]
 

	
 
    def __iter__(self):
 
        """Iterate over standard values."""
 
        return iter(self._stdvalues)
 

	
 
    def get(self, key, default_key=None):
 
        """Return self[key], or a default fallback if that doesn't exist.
 

	
 
        default_key is another key to look up, *not* a default value to return.
 
        This helps ensure you always get a standard value.
 
        """
 
        try:
 
            return self[key]
 
        except KeyError:
 
            if default_key is None:
 
                return None
 
            else:
 
                return self[default_key]
 

	
 

	
 
class PostingChecker:
 
    """Base class to normalize posting metadata from an enum."""
 
    # This class provides basic functionality to filter postings, normalize
 
    # metadata values, and set default values.
 
    # Subclasses should set:
 
    # * METADATA_KEY: A string with the name of the metadata key to normalize.
 
    # * ACCOUNTS: Only check postings that match these account names.
 
    #   Can be a tuple of account prefix strings, or a regexp.
 
    # * VALUES_ENUM: A MetadataEnum with allowed values and aliases.
 
    # Subclasses may wish to override _default_value and _should_check.
 
    # See below.
 

	
 
    HOOK_GROUPS = frozenset(['Posting', 'metadata'])
 
    ACCOUNTS = ('',)
 
    TXN_DATE_RANGE = _GenericRange(DEFAULT_START_DATE, DEFAULT_STOP_DATE)
 
    VALUES_ENUM = {}
 

	
 
    def _meta_get(self, txn, post, key, default=None):
 
        try:
 
            return post.meta[key]
 
        except (KeyError, TypeError):
 
            return txn.meta.get(key, default)
 

	
 
    def _meta_set(self, post, key, value):
 
    def _meta_set(self, txn, post, post_index, key, value):
 
        if post.meta is None:
 
            post.meta = {}
 
        post.meta[key] = value
 
            txn.postings[post_index] = Posting(*post[:5], {key: value})
 
        else:
 
            post.meta[key] = value
 

	
 
    # If the posting does not specify METADATA_KEY, the hook calls
 
    # _default_value to get a default. This method should either return
 
    # a value string from METADATA_ENUM, or else raise InvalidMetadataError.
 
    # This base implementation does the latter.
 
    def _default_value(self, txn, post):
 
        raise errormod.InvalidMetadataError(txn, post, self.METADATA_KEY)
 

	
 
    # The hook calls _should_check on every posting and only checks postings
 
    # when the method returns true. This base method checks the transaction
 
    # date is in TXN_DATE_RANGE, and the posting account name matches ACCOUNTS.
 
    def _should_check(self, txn, post):
 
        ok = txn.date in self.TXN_DATE_RANGE
 
        if isinstance(self.ACCOUNTS, tuple):
 
            ok = ok and post.account.startswith(self.ACCOUNTS)
 
        else:
 
            ok = ok and re.search(self.ACCOUNTS, post.account)
 
        return ok
 

	
 
    def run(self, txn, post):
 
        errors = []
 
        if not self._should_check(txn, post):
 
            return errors
 
        source_value = self._meta_get(txn, post, self.METADATA_KEY)
 
        set_value = source_value
 
        if source_value is None:
 
            try:
 
                set_value = self._default_value(txn, post)
 
            except errormod._BaseError as error:
 
                errors.append(error)
 
        else:
 
            try:
 
                set_value = self.VALUES_ENUM[source_value]
 
            except KeyError:
 
                errors.append(errormod.InvalidMetadataError(
 
                    txn, post, self.METADATA_KEY, source_value,
 
                ))
 
        if not errors:
 
            self._meta_set(post, self.METADATA_KEY, set_value)
 
        return errors
tests/test_meta_expenseAllocation.py
Show inline comments
 
"""Test handling of expenseAllocation metadata"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount.plugin import meta_expense_allocation
 

	
 
VALID_VALUES = {
 
    'program': 'program',
 
    'administration': 'administration',
 
    'fundraising': 'fundraising',
 
    'admin': 'administration',
 
}
 

	
 
INVALID_VALUES = {
 
    'invalid',
 
    'porgram',
 
    'adimn',
 
    'fundrasing',
 
    '',
 
}
 

	
 
@pytest.mark.parametrize('src_value,set_value', VALID_VALUES.items())
 
def test_valid_values_on_postings(src_value, set_value):
 
    txn = testutil.Transaction(postings=[
 
        ('Assets:Cash', -25),
 
        ('Expenses:General', 25, {'expenseAllocation': src_value}),
 
    ])
 
    checker = meta_expense_allocation.MetaExpenseAllocation()
 
    errors = checker.run(txn, txn.postings[-1])
 
    errors = checker.run(txn, txn.postings[-1], -1)
 
    assert not errors
 
    assert txn.postings[-1].meta.get('expenseAllocation') == set_value
 

	
 
@pytest.mark.parametrize('src_value', INVALID_VALUES)
 
def test_invalid_values_on_postings(src_value):
 
    txn = testutil.Transaction(postings=[
 
        ('Assets:Cash', -25),
 
        ('Expenses:General', 25, {'expenseAllocation': src_value}),
 
    ])
 
    checker = meta_expense_allocation.MetaExpenseAllocation()
 
    errors = checker.run(txn, txn.postings[-1])
 
    errors = checker.run(txn, txn.postings[-1], -1)
 
    assert errors
 

	
 
@pytest.mark.parametrize('src_value,set_value', VALID_VALUES.items())
 
def test_valid_values_on_transactions(src_value, set_value):
 
    txn = testutil.Transaction(expenseAllocation=src_value, postings=[
 
        ('Assets:Cash', -25),
 
        ('Expenses:General', 25),
 
    ])
 
    checker = meta_expense_allocation.MetaExpenseAllocation()
 
    errors = checker.run(txn, txn.postings[-1])
 
    errors = checker.run(txn, txn.postings[-1], -1)
 
    assert not errors
 
    assert txn.postings[-1].meta.get('expenseAllocation') == set_value
 

	
 
@pytest.mark.parametrize('src_value', INVALID_VALUES)
 
def test_invalid_values_on_transactions(src_value):
 
    txn = testutil.Transaction(expenseAllocation=src_value, postings=[
 
        ('Assets:Cash', -25),
 
        ('Expenses:General', 25),
 
    ])
 
    checker = meta_expense_allocation.MetaExpenseAllocation()
 
    errors = checker.run(txn, txn.postings[-1])
 
    errors = checker.run(txn, txn.postings[-1], -1)
 
    assert errors
 

	
 
@pytest.mark.parametrize('account', [
 
    'Accrued:AccountsReceivable',
 
    'Assets:Cash',
 
    'Income:Donations',
 
    'Liabilities:CreditCard',
 
    'UnearnedIncome:Donations',
 
])
 
def test_non_expense_accounts_skipped(account):
 
    txn = testutil.Transaction(postings=[
 
        (account, -25),
 
        ('Expenses:General', 25, {'expenseAllocation': 'program'}),
 
    ])
 
    checker = meta_expense_allocation.MetaExpenseAllocation()
 
    errors = checker.run(txn, txn.postings[0])
 
    errors = checker.run(txn, txn.postings[0], 0)
 
    assert not errors
 

	
 
@pytest.mark.parametrize('account,set_value', [
 
    ('Expenses:Services:Accounting', 'administration'),
 
    ('Expenses:Services:Administration', 'administration'),
 
    ('Expenses:Services:Advocacy', 'program'),
 
    ('Expenses:Services:Development', 'program'),
 
    ('Expenses:Services:Fundraising', 'fundraising'),
 
])
 
def test_default_values(account, set_value):
 
    txn = testutil.Transaction(postings=[
 
        ('Liabilites:CreditCard', -25),
 
        (account, 25),
 
    ])
 
    checker = meta_expense_allocation.MetaExpenseAllocation()
 
    errors = checker.run(txn, txn.postings[-1])
 
    errors = checker.run(txn, txn.postings[-1], -1)
 
    assert not errors
 
    assert txn.postings[-1].meta['expenseAllocation'] == set_value
 

	
 
@pytest.mark.parametrize('date,set_value', [
 
    (testutil.EXTREME_FUTURE_DATE, False),
 
    (testutil.FUTURE_DATE, True),
 
    (testutil.FY_START_DATE, True),
 
    (testutil.FY_MID_DATE, True),
 
    (testutil.PAST_DATE, False),
 
])
 
def test_default_value_set_in_date_range(date, set_value):
 
    txn = testutil.Transaction(date=date, postings=[
 
        ('Liabilites:CreditCard', -25),
 
        ('Expenses:General', 25),
 
    ])
 
    checker = meta_expense_allocation.MetaExpenseAllocation()
 
    errors = checker.run(txn, txn.postings[-1])
 
    errors = checker.run(txn, txn.postings[-1], -1)
 
    assert not errors
 
    got_value = (txn.postings[-1].meta or {}).get('expenseAllocation')
 
    assert bool(got_value) == bool(set_value)
tests/test_meta_taxImplication.py
Show inline comments
...
 
@@ -9,122 +9,122 @@
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount.plugin import meta_tax_implication
 

	
 
VALID_VALUES = {
 
    '1099': '1099',
 
    'Accountant-Advises-No-1099': 'Accountant-Advises-No-1099',
 
    'Bank-Transfer': 'Bank-Transfer',
 
    'Foreign-Corporation': 'Foreign-Corporation',
 
    'Foreign-Individual-Contractor': 'Foreign-Individual-Contractor',
 
    'Fraud': 'Fraud',
 
    'HSA-Contribution': 'HSA-Contribution',
 
    'Loan': 'Loan',
 
    'Payroll': 'Payroll',
 
    'Refund': 'Refund',
 
    'Reimbursement': 'Reimbursement',
 
    'Retirement-Pretax': 'Retirement-Pretax',
 
    'Tax-Payment': 'Tax-Payment',
 
    'USA-501c3': 'USA-501c3',
 
    'USA-Corporation': 'USA-Corporation',
 
    'USA-LLC-No-1099': 'USA-LLC-No-1099',
 
    'W2': 'W2',
 
}
 

	
 
INVALID_VALUES = {
 
    '199',
 
    'W3',
 
    'Payrol',
 
    '',
 
}
 

	
 
@pytest.mark.parametrize('src_value,set_value', VALID_VALUES.items())
 
def test_valid_values_on_postings(src_value, set_value):
 
    txn = testutil.Transaction(postings=[
 
        ('Accrued:AccountsPayable', 25),
 
        ('Assets:Cash', -25, {'taxImplication': src_value}),
 
    ])
 
    checker = meta_tax_implication.MetaTaxImplication()
 
    errors = checker.run(txn, txn.postings[-1])
 
    errors = checker.run(txn, txn.postings[-1], -1)
 
    assert not errors
 
    assert txn.postings[-1].meta.get('taxImplication') == set_value
 

	
 
@pytest.mark.parametrize('src_value', INVALID_VALUES)
 
def test_invalid_values_on_postings(src_value):
 
    txn = testutil.Transaction(postings=[
 
        ('Accrued:AccountsPayable', 25),
 
        ('Assets:Cash', -25, {'taxImplication': src_value}),
 
    ])
 
    checker = meta_tax_implication.MetaTaxImplication()
 
    errors = checker.run(txn, txn.postings[-1])
 
    errors = checker.run(txn, txn.postings[-1], -1)
 
    assert errors
 

	
 
@pytest.mark.parametrize('src_value,set_value', VALID_VALUES.items())
 
def test_valid_values_on_transactions(src_value, set_value):
 
    txn = testutil.Transaction(taxImplication=src_value, postings=[
 
        ('Accrued:AccountsPayable', 25),
 
        ('Assets:Cash', -25),
 
    ])
 
    checker = meta_tax_implication.MetaTaxImplication()
 
    errors = checker.run(txn, txn.postings[-1])
 
    errors = checker.run(txn, txn.postings[-1], -1)
 
    assert not errors
 
    assert txn.postings[-1].meta.get('taxImplication') == set_value
 

	
 
@pytest.mark.parametrize('src_value', INVALID_VALUES)
 
def test_invalid_values_on_transactions(src_value):
 
    txn = testutil.Transaction(taxImplication=src_value, postings=[
 
        ('Accrued:AccountsPayable', 25),
 
        ('Assets:Cash', -25),
 
    ])
 
    checker = meta_tax_implication.MetaTaxImplication()
 
    errors = checker.run(txn, txn.postings[-1])
 
    errors = checker.run(txn, txn.postings[-1], -1)
 
    assert errors
 

	
 
@pytest.mark.parametrize('account', [
 
    'Accrued:AccountsPayable',
 
    'Expenses:General',
 
    'Liabilities:CreditCard',
 
])
 
def test_non_asset_accounts_skipped(account):
 
    txn = testutil.Transaction(postings=[
 
        (account, 25),
 
        ('Assets:Cash', -25, {'taxImplication': 'USA-Corporation'}),
 
    ])
 
    checker = meta_tax_implication.MetaTaxImplication()
 
    errors = checker.run(txn, txn.postings[0])
 
    errors = checker.run(txn, txn.postings[0], 0)
 
    assert not errors
 

	
 
def test_asset_credits_skipped():
 
    txn = testutil.Transaction(postings=[
 
        ('Income:Donations', -25),
 
        ('Assets:Cash', 25),
 
    ])
 
    checker = meta_tax_implication.MetaTaxImplication()
 
    errors = checker.run(txn, txn.postings[-1])
 
    errors = checker.run(txn, txn.postings[-1], -1)
 
    assert not errors
 
    assert not txn.postings[-1].meta
 

	
 
@pytest.mark.parametrize('date,need_value', [
 
    (testutil.EXTREME_FUTURE_DATE, False),
 
    (testutil.FUTURE_DATE, True),
 
    (testutil.FY_START_DATE, True),
 
    (testutil.FY_MID_DATE, True),
 
    (testutil.PAST_DATE, False),
 
])
 
def test_default_value_set_in_date_range(date, need_value):
 
    txn = testutil.Transaction(date=date, postings=[
 
        ('Liabilites:CreditCard', 25),
 
        ('Assets:Cash', -25),
 
    ])
 
    checker = meta_tax_implication.MetaTaxImplication()
 
    errors = checker.run(txn, txn.postings[-1])
 
    errors = checker.run(txn, txn.postings[-1], -1)
 
    assert bool(errors) == bool(need_value)
tests/test_plugin_run.py
Show inline comments
 
"""Test main plugin run loop"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount import plugin
 

	
 
CONFIG_MAP = {}
 
HOOK_REGISTRY = plugin.HookRegistry()
 

	
 
@HOOK_REGISTRY.add_hook
 
class TransactionCounter:
 
    HOOK_GROUPS = frozenset(['Transaction', 'counter'])
 

	
 
    def run(self, txn):
 
        return ['txn:{}'.format(id(txn))]
 

	
 

	
 
@HOOK_REGISTRY.add_hook
 
class PostingCounter(TransactionCounter):
 
    HOOK_GROUPS = frozenset(['Posting', 'counter'])
 

	
 
    def run(self, txn, post):
 
    def run(self, txn, post, post_index):
 
        return ['post:{}'.format(id(post))]
 

	
 

	
 
def map_errors(errors):
 
    retval = {}
 
    for errkey in errors:
 
        key, _, errid = errkey.partition(':')
 
        retval.setdefault(key, set()).add(errid)
 
    return retval
 

	
 
def test_with_multiple_hooks():
 
    in_entries = [
 
        testutil.Transaction(postings=[
 
            ('Income:Donations', -25),
 
            ('Assets:Cash', 25),
 
        ]),
 
        testutil.Transaction(postings=[
 
            ('Expenses:General', 10),
 
            ('Liabilites:CreditCard', -10),
 
        ]),
 
    ]
 
    out_entries, errors = plugin.run(in_entries, CONFIG_MAP, '', HOOK_REGISTRY)
 
    assert len(out_entries) == 2
 
    errmap = map_errors(errors)
 
    assert len(errmap.get('txn', '')) == 2
 
    assert len(errmap.get('post', '')) == 4
 

	
 
def test_with_posting_hooks_only():
 
    in_entries = [
 
        testutil.Transaction(postings=[
 
            ('Income:Donations', -25),
 
            ('Assets:Cash', 25),
 
        ]),
 
        testutil.Transaction(postings=[
 
            ('Expenses:General', 10),
 
            ('Liabilites:CreditCard', -10),
 
        ]),
 
    ]
 
    out_entries, errors = plugin.run(in_entries, CONFIG_MAP, 'Posting', HOOK_REGISTRY)
 
    assert len(out_entries) == 2
 
    errmap = map_errors(errors)
 
    assert len(errmap.get('txn', '')) == 0
 
    assert len(errmap.get('post', '')) == 4
tests/testutil.py
Show inline comments
 
"""Mock Beancount objects for testing"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import datetime
 

	
 
import beancount.core.amount as bc_amount
 
import beancount.core.data as bc_data
 

	
 
from decimal import Decimal
 

	
 
EXTREME_FUTURE_DATE = datetime.date(datetime.MAXYEAR, 12, 30)
 
FUTURE_DATE = datetime.date.today() + datetime.timedelta(days=365 * 99)
 
FY_START_DATE = datetime.date(2020, 3, 1)
 
FY_MID_DATE = datetime.date(2020, 9, 1)
 
PAST_DATE = datetime.date(2000, 1, 1)
 

	
 
def parse_date(s, fmt='%Y-%m-%d'):
 
    return datetime.datetime.strptime(s, fmt).date()
 

	
 
def Posting(account, number,
 
            currency='USD', cost=None, price=None, flag=None,
 
            **meta):
 
    if not meta:
 
        meta = None
 
    return bc_data.Posting(
 
        account,
 
        bc_amount.Amount(Decimal(number), currency),
 
        cost,
 
        price,
 
        flag,
 
        meta,
 
    )
 

	
 
class Transaction:
 
    def __init__(self,
 
                 date=FY_MID_DATE, flag='*', payee=None,
 
                 narration='', tags=None, links=None, postings=None,
 
                 **meta):
 
        if isinstance(date, str):
 
            date = parse_date(date)
 
        self.date = date
 
        self.flag = flag
 
        self.payee = payee
 
        self.narration = narration
 
        self.tags = set(tags or '')
 
        self.links = set(links or '')
 
        self.postings = []
 
        self.meta = {
 
            'filename': '<test>',
 
            'lineno': 0,
 
        }
 
        self.meta.update(meta)
 
        for posting in postings:
 
            self.add_posting(*posting)
 

	
 
    def add_posting(self, arg, *args, **kwargs):
 
        """Add a posting to this transaction. Use any of these forms:
 

	
 
           txn.add_posting(account, number, …, kwarg=value, …)
 
           txn.add_posting(account, number, …, posting_kwargs_dict)
 
           txn.add_posting(posting_object)
 
        """
 
        if kwargs:
 
            posting = Posting(arg, *args, **kwargs)
 
        elif args:
 
            if isinstance(args[-1], dict):
 
                kwargs = args[-1]
 
                args = args[:-1]
 
            posting = Posting(arg, *args, **kwargs)
 
        else:
 
            posting = arg
 
        self.postings.append(posting)
0 comments (0 inline, 0 general)