Changeset - d9420ac2b62f
[Not reviewed]
0 1 2
Brett Smith - 4 years ago 2020-03-28 18:31:17
brettcsmith@brettcsmith.org
meta_invoice: Start hook.
3 files changed with 185 insertions and 0 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/plugin/meta_invoice.py
Show inline comments
 
new file 100644
 
"""meta_invoice - Validate invoice metadata"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
from . import core
 
from .. import data
 
from .. import errors as errormod
 
from ..beancount_types import (
 
    MetaValueEnum,
 
    Transaction,
 
)
 

	
 
class MetaInvoice(core._PostingHook):
 
    METADATA_KEY = 'invoice'
 
    HOOK_GROUPS = frozenset(['metadata', METADATA_KEY])
 

	
 
    def _run_on_post(self, txn: Transaction, post: data.Posting) -> bool:
 
        return post.account.is_under('Accrued') is not None
 

	
 
    def post_run(self, txn: Transaction, post: data.Posting) -> errormod.Iter:
 
        try:
 
            problem = not post.meta.get_links(self.METADATA_KEY)
 
            value = None
 
        except TypeError:
 
            problem = True
 
            value = post.meta[self.METADATA_KEY]
 
        if problem:
 
            yield errormod.InvalidMetadataError(txn, self.METADATA_KEY, value, post)
tests/test_meta_invoice.py
Show inline comments
 
new file 100644
 
"""Test validation of invoice metadata"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount.plugin import meta_invoice
 

	
 
VALID_VALUES = {
 
    'Invoices/304321.pdf',
 
    'rt:123/456',
 
    'rt://ticket/234',
 
}
 

	
 
INVALID_VALUES = {
 
    '',
 
    ' ',
 
    '     ',
 
}
 

	
 
REQUIRED_ACCOUNTS = {
 
    'Accrued:AccountsPayable',
 
    'Accrued:AccountsReceivable',
 
}
 

	
 
NON_REQUIRED_ACCOUNTS = {
 
    'Assets:Cash',
 
    'Expenses:Other',
 
    'Income:Other',
 
    'Liabilities:CreditCard',
 
    'UnearnedIncome:Donations',
 
}
 

	
 
TEST_KEY = 'invoice'
 

	
 
@pytest.fixture(scope='module')
 
def hook():
 
    config = testutil.TestConfig()
 
    return meta_invoice.MetaInvoice(config)
 

	
 
@pytest.mark.parametrize('acct1,acct2,value', testutil.combine_values(
 
    REQUIRED_ACCOUNTS,
 
    NON_REQUIRED_ACCOUNTS,
 
    VALID_VALUES,
 
))
 
def test_valid_values_on_postings(hook, acct1, acct2, value):
 
    txn = testutil.Transaction(postings=[
 
        (acct2, -25),
 
        (acct1, 25, {TEST_KEY: value}),
 
    ])
 
    assert not list(hook.run(txn))
 

	
 
@pytest.mark.parametrize('acct1,acct2,value', testutil.combine_values(
 
    REQUIRED_ACCOUNTS,
 
    NON_REQUIRED_ACCOUNTS,
 
    INVALID_VALUES,
 
))
 
def test_invalid_values_on_postings(hook, acct1, acct2, value):
 
    txn = testutil.Transaction(postings=[
 
        (acct2, -25),
 
        (acct1, 25, {TEST_KEY: value}),
 
    ])
 
    actual = {error.message for error in hook.run(txn)}
 
    assert actual == {"{} missing {}".format(acct1, TEST_KEY)}
 

	
 
@pytest.mark.parametrize('acct1,acct2,value', testutil.combine_values(
 
    REQUIRED_ACCOUNTS,
 
    NON_REQUIRED_ACCOUNTS,
 
    testutil.NON_STRING_METADATA_VALUES,
 
))
 
def test_bad_type_values_on_postings(hook, acct1, acct2, value):
 
    txn = testutil.Transaction(postings=[
 
        (acct2, -25),
 
        (acct1, 25, {TEST_KEY: value}),
 
    ])
 
    expected_msg = "{} has wrong type of {}: expected str but is a {}".format(
 
        acct1,
 
        TEST_KEY,
 
        type(value).__name__,
 
    )
 
    actual = {error.message for error in hook.run(txn)}
 
    assert actual == {expected_msg}
 

	
 
@pytest.mark.parametrize('acct1,acct2,value', testutil.combine_values(
 
    REQUIRED_ACCOUNTS,
 
    NON_REQUIRED_ACCOUNTS,
 
    VALID_VALUES,
 
))
 
def test_valid_values_on_transaction(hook, acct1, acct2, value):
 
    txn = testutil.Transaction(**{TEST_KEY: value}, postings=[
 
        (acct2, -25),
 
        (acct1, 25),
 
    ])
 
    assert not list(hook.run(txn))
 

	
 
@pytest.mark.parametrize('acct1,acct2,value', testutil.combine_values(
 
    REQUIRED_ACCOUNTS,
 
    NON_REQUIRED_ACCOUNTS,
 
    INVALID_VALUES,
 
))
 
def test_invalid_values_on_transaction(hook, acct1, acct2, value):
 
    txn = testutil.Transaction(**{TEST_KEY: value}, postings=[
 
        (acct2, -25),
 
        (acct1, 25),
 
    ])
 
    actual = {error.message for error in hook.run(txn)}
 
    assert actual == {"{} missing {}".format(acct1, TEST_KEY)}
 

	
 
@pytest.mark.parametrize('acct1,acct2,value', testutil.combine_values(
 
    REQUIRED_ACCOUNTS,
 
    NON_REQUIRED_ACCOUNTS,
 
    testutil.NON_STRING_METADATA_VALUES,
 
))
 
def test_bad_type_values_on_transaction(hook, acct1, acct2, value):
 
    txn = testutil.Transaction(**{TEST_KEY: value}, postings=[
 
        (acct2, -25),
 
        (acct1, 25),
 
    ])
 
    expected_msg = "{} has wrong type of {}: expected str but is a {}".format(
 
        acct1,
 
        TEST_KEY,
 
        type(value).__name__,
 
    )
 
    actual = {error.message for error in hook.run(txn)}
 
    assert actual == {expected_msg}
tests/testutil.py
Show inline comments
 
"""Mock Beancount objects for testing"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import datetime
 
import itertools
 

	
 
import beancount.core.amount as bc_amount
 
import beancount.core.data as bc_data
 

	
 
from decimal import Decimal
 
from pathlib import Path
 

	
 
from conservancy_beancount import rtutil
 

	
 
EXTREME_FUTURE_DATE = datetime.date(datetime.MAXYEAR, 12, 30)
 
FUTURE_DATE = datetime.date.today() + datetime.timedelta(days=365 * 99)
 
FY_START_DATE = datetime.date(2020, 3, 1)
 
FY_MID_DATE = datetime.date(2020, 9, 1)
 
PAST_DATE = datetime.date(2000, 1, 1)
 
TESTS_DIR = Path(__file__).parent
 

	
 
def check_post_meta(txn, *expected_meta, default=None):
 
    assert len(txn.postings) == len(expected_meta)
 
    for post, expected in zip(txn.postings, expected_meta):
 
        if not expected:
 
            assert not post.meta
 
        else:
 
            actual = None if post.meta is None else {
 
                key: post.meta.get(key, default) for key in expected
 
            }
 
            assert actual == expected
 

	
 
def combine_values(*value_seqs):
 
    return itertools.islice(
 
        zip(*(itertools.cycle(seq) for seq in value_seqs)),
 
        max(len(seq) for seq in value_seqs),
 
    )
 

	
 
def parse_date(s, fmt='%Y-%m-%d'):
 
    return datetime.datetime.strptime(s, fmt).date()
 

	
 
def test_path(s):
 
    if s is None:
 
        return s
 
    s = Path(s)
 
    if not s.is_absolute():
 
        s = TESTS_DIR / s
 
    return s
 

	
 
def Amount(number, currency='USD'):
 
    return bc_amount.Amount(Decimal(number), currency)
 

	
 
def Posting(account, number,
 
            currency='USD', cost=None, price=None, flag=None,
 
            **meta):
 
    if not meta:
 
        meta = None
 
    return bc_data.Posting(
 
        account,
 
        bc_amount.Amount(Decimal(number), currency),
 
        cost,
 
        price,
 
        flag,
 
        meta,
 
    )
 

	
 
NON_STRING_METADATA_VALUES = [
 
    Decimal(5),
 
    FY_MID_DATE,
 
    Amount(50),
 
    Amount(500, None),
 
]
 

	
 
class Transaction:
 
    def __init__(self,
 
                 date=FY_MID_DATE, flag='*', payee=None,
 
                 narration='', tags=None, links=None, postings=None,
 
                 **meta):
 
        if isinstance(date, str):
 
            date = parse_date(date)
 
        self.date = date
 
        self.flag = flag
 
        self.payee = payee
 
        self.narration = narration
 
        self.tags = set(tags or '')
 
        self.links = set(links or '')
 
        self.postings = []
 
        self.meta = {
 
            'filename': '<test>',
 
            'lineno': 0,
 
        }
 
        self.meta.update(meta)
 
        for posting in postings:
 
            self.add_posting(*posting)
 

	
 
    def add_posting(self, arg, *args, **kwargs):
 
        """Add a posting to this transaction. Use any of these forms:
 

	
 
           txn.add_posting(account, number, …, kwarg=value, …)
 
           txn.add_posting(account, number, …, posting_kwargs_dict)
 
           txn.add_posting(posting_object)
 
        """
 
        if kwargs:
 
            posting = Posting(arg, *args, **kwargs)
 
        elif args:
 
            if isinstance(args[-1], dict):
 
                kwargs = args[-1]
 
                args = args[:-1]
 
            posting = Posting(arg, *args, **kwargs)
 
        else:
 
            posting = arg
 
        self.postings.append(posting)
 

	
 

	
 
class TestConfig:
 
    def __init__(self,
 
                 repo_path=None,
 
                 rt_client=None,
 
    ):
 
        self.repo_path = test_path(repo_path)
 
        self._rt_client = rt_client
 
        if rt_client is None:
 
            self._rt_wrapper = None
 
        else:
 
            self._rt_wrapper = rtutil.RT(rt_client)
 

	
 
    def repository_path(self):
 
        return self.repo_path
 

	
 
    def rt_client(self):
 
        return self._rt_client
 

	
 
    def rt_wrapper(self):
 
        return self._rt_wrapper
 

	
 

	
 
class _TicketBuilder:
 
    MESSAGE_ATTACHMENTS = [
 
        ('(Unnamed)', 'multipart/alternative', '0b'),
 
        ('(Unnamed)', 'text/plain', '1.2k'),
 
        ('(Unnamed)', 'text/html', '1.4k'),
 
    ]
 
    MISC_ATTACHMENTS = [
 
        ('Forwarded Message.eml', 'message/rfc822', '3.1k'),
 
        ('photo.jpg', 'image/jpeg', '65.2k'),
 
        ('document.pdf', 'application/pdf', '326k'),
 
        ('screenshot.png', 'image/png', '1.9m'),
 
        ('statement.txt', 'text/plain', '652b'),
 
    ]
 

	
 
    def __init__(self):
 
        self.id_seq = itertools.count(1)
 
        self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS)
 

	
 
    def new_attch(self, attch):
 
        return (str(next(self.id_seq)), *attch)
 

	
 
    def new_msg_with_attachments(self, attachments_count=1):
 
        for attch in self.MESSAGE_ATTACHMENTS:
 
            yield self.new_attch(attch)
 
        for _ in range(attachments_count):
 
            yield self.new_attch(next(self.misc_attchs))
 

	
 
    def new_messages(self, messages_count, attachments_count=None):
 
        for n in range(messages_count):
 
            if attachments_count is None:
 
                att_count = messages_count - n
 
            else:
 
                att_count = attachments_count
 
            yield from self.new_msg_with_attachments(att_count)
 

	
 

	
 
class RTClient:
 
    _builder = _TicketBuilder()
 
    DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/'
 
    TICKET_DATA = {
 
        '1': list(_builder.new_messages(1, 3)),
 
        '2': list(_builder.new_messages(2, 1)),
 
        '3': list(_builder.new_messages(3, 0)),
 
    }
 
    del _builder
 

	
 
    def __init__(self,
 
                 url=DEFAULT_URL,
 
                 default_login=None,
 
                 default_password=None,
 
                 proxy=None,
 
                 default_queue='General',
 
                 skip_login=False,
 
                 verify_cert=True,
 
                 http_auth=None,
 
    ):
 
        self.url = url
 
        if http_auth is None:
 
            self.user = default_login
 
            self.password = default_password
 
            self.auth_method = 'login'
 
            self.login_result = skip_login or None
 
        else:
 
            self.user = http_auth.username
 
            self.password = http_auth.password
 
            self.auth_method = type(http_auth).__name__
 
            self.login_result = True
 
        self.last_login = None
 

	
 
    def login(self, login=None, password=None):
 
        if login is None and password is None:
 
            login = self.user
 
            password = self.password
 
        self.login_result = bool(login and password and not password.startswith('bad'))
 
        self.last_login = (login, password, self.login_result)
 
        return self.login_result
 

	
 
    def get_attachments(self, ticket_id):
 
        try:
 
            return list(self.TICKET_DATA[str(ticket_id)])
 
        except KeyError:
 
            return None
 

	
 
    def get_attachment(self, ticket_id, attachment_id):
 
        try:
 
            att_seq = iter(self.TICKET_DATA[str(ticket_id)])
 
        except KeyError:
 
            return None
 
        att_id = str(attachment_id)
 
        multipart_id = None
 
        for attch in att_seq:
 
            if attch[0] == att_id:
 
                break
 
            elif attch[2].startswith('multipart/'):
 
                multipart_id = attch[0]
 
        else:
 
            return None
 
        tx_id = multipart_id or att_id
 
        if attch[1] == '(Unnamed)':
 
            filename = ''
 
        else:
 
            filename = attch[1]
 
        return {
 
            'id': att_id,
 
            'ContentType': attch[2],
 
            'Filename': filename,
 
            'Transaction': tx_id,
 
        }
 

	
 
    def get_ticket(self, ticket_id):
 
        ticket_id_s = str(ticket_id)
 
        if ticket_id_s not in self.TICKET_DATA:
 
            return None
 
        return {
 
            'id': 'ticket/{}'.format(ticket_id_s),
 
            'numerical_id': ticket_id_s,
 
        }
0 comments (0 inline, 0 general)