"""test_reports_accrual - Unit tests for accrual report""" # Copyright © 2020 Brett Smith # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . import collections import copy import datetime import io import itertools import logging import re import pytest from . import testutil from beancount import loader as bc_loader from conservancy_beancount import data from conservancy_beancount import rtutil from conservancy_beancount.reports import accrual from conservancy_beancount.reports import core _accruals_load = bc_loader.load_file(testutil.test_path('books/accruals.beancount')) ACCRUAL_TXNS = [ entry for entry in _accruals_load[0] if hasattr(entry, 'narration') and entry.narration != 'Opening balances' ] ACCRUALS_COUNT = sum( 1 for txn in ACCRUAL_TXNS for post in txn.postings if post.account.startswith(('Assets:Receivable:', 'Liabilities:Payable:')) ) ACCOUNTS = [ 'Assets:Receivable:Accounts', 'Assets:Receivable:Loans', 'Liabilities:Payable:Accounts', 'Liabilities:Payable:Vacation', ] CONSISTENT_METADATA = [ 'contract', 'entity', 'purchase-order', ] class RTClient(testutil.RTClient): TICKET_DATA = { '40': [ ('400', 'invoice feb.csv', 'text/csv', '40.4k'), ], '44': [ ('440', 'invoice feb.csv', 'text/csv', '40.4k'), ], '490': [], '505': [], '510': [ ('4000', 'contract.pdf', 'application/pdf', '1.4m'), ('5100', 'invoice april.pdf', 'application/pdf', '1.5m'), ('5105', 'payment.png', 'image/png', '51.5k'), ('6100', 'invoice may.pdf', 'application/pdf', '1.6m'), ], '515': [], '520': [], } @pytest.fixture def accrual_postings(): return data.Posting.from_entries(copy.deepcopy(ACCRUAL_TXNS)) def check_link_regexp(regexp, match_s, first_link_only=False): assert regexp assert re.search(regexp, match_s) assert re.search(regexp, match_s + ' postlink') assert re.search(regexp, match_s + '0') is None assert re.search(regexp, '1' + match_s) is None end_match = re.search(regexp, 'prelink ' + match_s) if first_link_only: assert end_match is None else: assert end_match def accruals_by_meta(postings, value, key='invoice', wrap_type=iter): return wrap_type( post for post in postings if post.meta.get(key) == value and post.account.is_under('Assets:Receivable', 'Liabilities:Payable') ) @pytest.mark.parametrize('link_fmt', [ '{}', 'rt:{}', 'rt://ticket/{}', ]) def test_search_term_parse_rt_shortcuts(link_fmt): key, regexp = accrual.SearchTerm.parse(link_fmt.format(220)) assert key == 'rt-id' check_link_regexp(regexp, 'rt:220', first_link_only=True) check_link_regexp(regexp, 'rt://ticket/220', first_link_only=True) @pytest.mark.parametrize('link_fmt', [ '{}/{}', 'rt:{}/{}', 'rt://ticket/{}/attachments/{}', ]) def test_search_term_parse_invoice_shortcuts(link_fmt): key, regexp = accrual.SearchTerm.parse(link_fmt.format(330, 660)) assert key == 'invoice' check_link_regexp(regexp, 'rt:330/660') check_link_regexp(regexp, 'rt://ticket/330/attachments/660') @pytest.mark.parametrize('key', [ 'approval', 'contract', 'invoice', ]) def test_search_term_parse_metadata_rt_shortcut(key): actual_key, regexp = accrual.SearchTerm.parse(f'{key}=440/420') assert actual_key == key check_link_regexp(regexp, 'rt:440/420') check_link_regexp(regexp, 'rt://ticket/440/attachments/420') @pytest.mark.parametrize('key', [ None, 'approval', 'contract', 'invoice', ]) def test_search_term_parse_repo_link(key): document = '1234.pdf' if key is None: key = 'invoice' search = document else: search = f'{key}={document}' actual_key, regexp = accrual.SearchTerm.parse(search) assert actual_key == key check_link_regexp(regexp, document) @pytest.mark.parametrize('search,unmatched', [ ('1234.pdf', '1234_pdf'), ]) def test_search_term_parse_regexp_escaping(search, unmatched): _, regexp = accrual.SearchTerm.parse(search) assert re.search(regexp, unmatched) is None @pytest.mark.parametrize('search_terms,expect_count,check_func', [ ([], ACCRUALS_COUNT, lambda post: post.account.is_under( 'Assets:Receivable:', 'Liabilities:Payable:', )), ([('rt-id', '^rt:505$')], 2, lambda post: post.meta['entity'] == 'DonorA'), ([('invoice', r'^rt:\D+515/')], 1, lambda post: post.meta['entity'] == 'MatchingProgram'), ([('entity', '^Lawyer$')], 3, lambda post: post.meta['rt-id'] == 'rt:510'), ([('entity', '^Lawyer$'), ('contract', '^rt:510/')], 2, lambda post: post.meta['invoice'].startswith('rt:510/')), ([('rt-id', '^rt:510$'), ('approval', '.')], 0, lambda post: False), ]) def test_filter_search(accrual_postings, search_terms, expect_count, check_func): actual = list(accrual.filter_search(accrual_postings, search_terms)) if expect_count < ACCRUALS_COUNT: assert ACCRUALS_COUNT > len(actual) >= expect_count else: assert len(actual) == ACCRUALS_COUNT for post in actual: assert check_func(post) @pytest.mark.parametrize('arg,expected', [ ('balance', accrual.BalanceReport), ('outgoing', accrual.OutgoingReport), ('bal', accrual.BalanceReport), ('out', accrual.OutgoingReport), ('outgoings', accrual.OutgoingReport), ]) def test_report_type_by_name(arg, expected): assert accrual.ReportType.by_name(arg.lower()).value is expected assert accrual.ReportType.by_name(arg.title()).value is expected assert accrual.ReportType.by_name(arg.upper()).value is expected @pytest.mark.parametrize('arg', [ 'unknown', 'blance', 'outgong', ]) def test_report_type_by_unknown_name(arg): # Raising ValueError helps argparse generate good messages. with pytest.raises(ValueError): accrual.ReportType.by_name(arg) @pytest.mark.parametrize('acct_name', ACCOUNTS) def test_accrual_postings_consistent_account(acct_name): meta = {'invoice': '{acct_name} invoice.pdf'} txn = testutil.Transaction(postings=[ (acct_name, 50, meta), (acct_name, 25, meta), ]) related = accrual.AccrualPostings(data.Posting.from_txn(txn)) assert related.account == acct_name assert related.accounts == {acct_name} @pytest.mark.parametrize('cost', [ testutil.Cost('1.2', 'USD'), None, ]) def test_accrual_postings_consistent_cost(cost): meta = {'invoice': 'FXinvoice.pdf'} txn = testutil.Transaction(postings=[ (ACCOUNTS[0], 60, 'EUR', cost, meta), (ACCOUNTS[0], 30, 'EUR', cost, meta), ]) related = accrual.AccrualPostings(data.Posting.from_txn(txn)) assert related.cost == cost assert related.costs == {cost} @pytest.mark.parametrize('meta_key,acct_name', testutil.combine_values( CONSISTENT_METADATA, ACCOUNTS, )) def test_accrual_postings_consistent_metadata(meta_key, acct_name): meta_value = f'{meta_key}.pdf' meta = { meta_key: meta_value, 'invoice': f'invoice with {meta_key}.pdf', } txn = testutil.Transaction(postings=[ (acct_name, 70, meta), (acct_name, 35, meta), ]) related = accrual.AccrualPostings(data.Posting.from_txn(txn)) attr_name = meta_key.replace('-', '_') assert getattr(related, attr_name) == meta_value assert getattr(related, f'{attr_name}s') == {meta_value} def test_accrual_postings_inconsistent_account(): meta = {'invoice': 'invoice.pdf'} txn = testutil.Transaction(postings=[ (acct_name, index, meta) for index, acct_name in enumerate(ACCOUNTS) ]) related = accrual.AccrualPostings(data.Posting.from_txn(txn)) assert related.account is related.INCONSISTENT assert related.accounts == set(ACCOUNTS) def test_accrual_postings_inconsistent_cost(): meta = {'invoice': 'FXinvoice.pdf'} costs = { testutil.Cost('1.1', 'USD'), testutil.Cost('1.2', 'USD'), } txn = testutil.Transaction(postings=[ (ACCOUNTS[0], 10, 'EUR', cost, meta) for cost in costs ]) related = accrual.AccrualPostings(data.Posting.from_txn(txn)) assert related.cost is related.INCONSISTENT assert related.costs == costs @pytest.mark.parametrize('meta_key,acct_name', testutil.combine_values( CONSISTENT_METADATA, ACCOUNTS, )) def test_accrual_postings_inconsistent_metadata(meta_key, acct_name): invoice = 'invoice with {meta_key}.pdf' meta_value = f'{meta_key}.pdf' txn = testutil.Transaction(postings=[ (acct_name, 20, {'invoice': invoice, meta_key: meta_value}), (acct_name, 35, {'invoice': invoice}), ]) related = accrual.AccrualPostings(data.Posting.from_txn(txn)) attr_name = meta_key.replace('-', '_') assert getattr(related, attr_name) is related.INCONSISTENT assert getattr(related, f'{attr_name}s') == {meta_value, None} @pytest.mark.parametrize('meta_key,account', testutil.combine_values( CONSISTENT_METADATA, ACCOUNTS, )) def test_consistency_check_when_consistent(meta_key, account): invoice = f'test-{meta_key}-invoice' meta_value = f'test-{meta_key}-value' meta = { 'invoice': invoice, meta_key: meta_value, } txn = testutil.Transaction(postings=[ (account, 100, meta), (account, -100, meta), ]) related = accrual.AccrualPostings(data.Posting.from_txn(txn)) assert not list(related.report_inconsistencies()) @pytest.mark.parametrize('meta_key,account', testutil.combine_values( ['approval', 'fx-rate', 'statement'], ACCOUNTS, )) def test_consistency_check_ignored_metadata(meta_key, account): invoice = f'test-{meta_key}-invoice' txn = testutil.Transaction(postings=[ (account, 100, {'invoice': invoice, meta_key: 'credit'}), (account, -100, {'invoice': invoice, meta_key: 'debit'}), ]) related = accrual.AccrualPostings(data.Posting.from_txn(txn)) assert not list(related.report_inconsistencies()) @pytest.mark.parametrize('meta_key,account', testutil.combine_values( CONSISTENT_METADATA, ACCOUNTS, )) def test_consistency_check_when_inconsistent(meta_key, account): invoice = f'test-{meta_key}-invoice' txn = testutil.Transaction(postings=[ (account, 100, {'invoice': invoice, meta_key: 'credit', 'lineno': 1}), (account, -100, {'invoice': invoice, meta_key: 'debit', 'lineno': 2}), ]) related = accrual.AccrualPostings(data.Posting.from_txn(txn)) errors = list(related.report_inconsistencies()) for exp_lineno, (actual, exp_msg) in enumerate(itertools.zip_longest(errors, [ f'inconsistent {meta_key} for invoice {invoice}: credit', f'inconsistent {meta_key} for invoice {invoice}: debit', ]), 1): assert actual.message == exp_msg assert actual.entry is txn assert actual.source.get('lineno') == exp_lineno def test_consistency_check_cost(): account = ACCOUNTS[0] invoice = 'test-cost-invoice' txn = testutil.Transaction(postings=[ (account, 100, 'EUR', ('1.1251', 'USD'), {'invoice': invoice, 'lineno': 1}), (account, -100, 'EUR', ('1.125', 'USD'), {'invoice': invoice, 'lineno': 2}), ]) related = accrual.AccrualPostings(data.Posting.from_txn(txn)) errors = list(related.report_inconsistencies()) for post, err in itertools.zip_longest(txn.postings, errors): assert err.message == f'inconsistent cost for invoice {invoice}: {post.cost}' assert err.entry is txn assert err.source.get('lineno') == post.meta['lineno'] def test_make_consistent_not_needed(): invoice = 'Invoices/ConsistentDoc.pdf' other_meta = {key: f'{key}.pdf' for key in CONSISTENT_METADATA} # We intentionally make inconsistencies in "minor" metadata that shouldn't # split out the group. txn = testutil.Transaction(postings=[ (ACCOUNTS[0], 20, {**other_meta, 'invoice': invoice}), (ACCOUNTS[0], 25, {'invoice': invoice}), ]) related = accrual.AccrualPostings(data.Posting.from_txn(txn)) consistent = related.make_consistent() actual_key, actual_postings = next(consistent) assert actual_key == invoice assert actual_postings is related assert next(consistent, None) is None @pytest.mark.parametrize('acct_name,invoice,day', testutil.combine_values( ACCOUNTS, ['FIXME', '', None, *testutil.NON_STRING_METADATA_VALUES], itertools.count(1), )) def test_make_consistent_bad_invoice(acct_name, invoice, day): txn = testutil.Transaction(date=datetime.date(2019, 1, day), postings=[ (acct_name, index * 10, {'invoice': invoice}) for index in range(1, 4) ]) related = accrual.AccrualPostings(data.Posting.from_txn(txn)) consistent = dict(related.make_consistent()) assert len(consistent) == 1 actual = consistent.get(f'{acct_name} None {invoice}') assert actual assert len(actual) == 3 for act_post, exp_post in zip(actual, txn.postings): assert act_post.units == exp_post.units assert act_post.meta.get('invoice') == invoice def test_make_consistent_across_accounts(): invoice = 'Invoices/CrossAccount.pdf' txn = testutil.Transaction(date=datetime.date(2019, 2, 1), postings=[ (acct_name, 100, {'invoice': invoice}) for acct_name in ACCOUNTS ]) related = accrual.AccrualPostings(data.Posting.from_txn(txn)) consistent = dict(related.make_consistent()) assert len(consistent) == len(ACCOUNTS) for acct_name in ACCOUNTS: actual = consistent[f'{invoice} {acct_name}'] assert len(actual) == 1 assert actual[0].account == acct_name def test_make_consistent_both_invoice_and_account(): txn = testutil.Transaction(date=datetime.date(2019, 2, 2), postings=[ (acct_name, 150) for acct_name in ACCOUNTS ]) related = accrual.AccrualPostings(data.Posting.from_txn(txn)) consistent = dict(related.make_consistent()) assert len(consistent) == len(ACCOUNTS) for acct_name in ACCOUNTS: actual = consistent[f'{acct_name} None None'] assert len(actual) == 1 assert actual[0].account == acct_name def check_output(output, expect_patterns): output.seek(0) testutil.check_lines_match(iter(output), expect_patterns) def run_outgoing(invoice, postings, rt_client=None): if rt_client is None: rt_client = RTClient() if not isinstance(postings, core.RelatedPostings): postings = accruals_by_meta(postings, invoice, wrap_type=accrual.AccrualPostings) output = io.StringIO() report = accrual.OutgoingReport(rt_client, output) report.run({invoice: postings}) return output @pytest.mark.parametrize('invoice,expected', [ ('rt:505/5050', "Zero balance outstanding since 2020-05-05"), ('rt:510/5100', "Zero balance outstanding since 2020-05-10"), ('rt:510/6100', "-280.00 USD outstanding since 2020-06-10"), ('rt://ticket/515/attachments/5150', "1,500.00 USD outstanding since 2020-05-15",), ]) def test_balance_report(accrual_postings, invoice, expected, caplog): related = accruals_by_meta(accrual_postings, invoice, wrap_type=accrual.AccrualPostings) output = io.StringIO() report = accrual.BalanceReport(output) report.run({invoice: related}) assert not caplog.records check_output(output, [invoice, expected]) def test_outgoing_report(accrual_postings, caplog): invoice = 'rt:510/6100' output = run_outgoing(invoice, accrual_postings) rt_url = RTClient.DEFAULT_URL[:-9] rt_id_url = rf'\b{re.escape(f"{rt_url}Ticket/Display.html?id=510")}\b' contract_url = rf'\b{re.escape(f"{rt_url}Ticket/Attachment/4000/4000/contract.pdf")}\b' assert not caplog.records check_output(output, [ r'^PAYMENT FOR APPROVAL:$', r'^REQUESTOR: Mx\. 510 $', r'^TOTAL TO PAY: \$280\.00$', fr'^AGREEMENT: {contract_url}', r'^PAYMENT TO: Hon\. Mx\. 510$', r'^PAYMENT METHOD: payment method 510$', r'^BEANCOUNT ENTRIES:$', # For each transaction, check for the date line, a metadata, and the # Expenses posting. r'^\s*2020-06-10\s', fr'^\s+rt-id: "{rt_id_url}"$', r'^\s+Expenses:Services:Legal\s+220\.00 USD$', r'^\s*2020-06-12\s', fr'^\s+contract: "{contract_url}"$', r'^\s+Expenses:FilingFees\s+60\.00 USD$', ]) def test_outgoing_report_custom_field_fallbacks(accrual_postings, caplog): invoice = 'rt:510/6100' rt_client = RTClient(want_cfs=False) output = run_outgoing(invoice, accrual_postings, rt_client) assert not caplog.records check_output(output, [ r'^PAYMENT FOR APPROVAL:$', r'^REQUESTOR: $', r'^PAYMENT TO:\s*$', r'^PAYMENT METHOD:\s*$', ]) def test_outgoing_report_fx_amounts(accrual_postings, caplog): invoice = 'rt:520/5200' output = run_outgoing(invoice, accrual_postings) assert not caplog.records check_output(output, [ r'^PAYMENT FOR APPROVAL:$', r'^REQUESTOR: Mx\. 520 $', r'^TOTAL TO PAY: 1,000\.00 EUR \(\$1,100.00\)$', ]) def test_outgoing_report_without_rt_id(accrual_postings, caplog): invoice = 'rt://ticket/515/attachments/5150' output = run_outgoing(invoice, accrual_postings) assert caplog.records log = caplog.records[0] assert log.message.startswith( f"can't generate outgoings report for {invoice} because no RT ticket available:", ) assert not output.getvalue() def run_main(arglist, config=None): if config is None: config = testutil.TestConfig( books_path=testutil.test_path('books/accruals.beancount'), rt_client=RTClient(), ) output = io.StringIO() errors = io.StringIO() retcode = accrual.main(arglist, output, errors, config) return retcode, output, errors def check_main_fails(arglist, config, error_flags, error_patterns): retcode, output, errors = run_main(arglist, config) assert retcode > 16 assert (retcode - 16) & error_flags check_output(errors, error_patterns) assert not output.getvalue() @pytest.mark.parametrize('arglist', [ ['--report-type=balance', 'entity=EarlyBird'], ['--report-type=outgoing', 'entity=EarlyBird'], ]) def test_output_excludes_payments(arglist): retcode, output, errors = run_main(arglist) assert not errors.getvalue() assert retcode == 0 output.seek(0) for line in output: assert not re.match(r'\brt:4\d\b', line) @pytest.mark.parametrize('arglist,expect_invoice', [ (['40'], 'rt:40/400'), (['44/440'], 'rt:44/440'), ]) def test_output_payments_when_only_match(arglist, expect_invoice): retcode, output, errors = run_main(arglist) assert not errors.getvalue() assert retcode == 0 check_output(output, [ rf'^{re.escape(expect_invoice)}:$', r' outstanding since ', ]) @pytest.mark.parametrize('arglist', [ ['510'], ['510/6100'], ['entity=Lawyer'], ]) def test_main_outgoing_report(arglist): retcode, output, errors = run_main(arglist) assert not errors.getvalue() assert retcode == 0 rt_url = RTClient.DEFAULT_URL[:-9] rt_id_url = re.escape(f'<{rt_url}Ticket/Display.html?id=510>') contract_url = re.escape(f'<{rt_url}Ticket/Attachment/4000/4000/contract.pdf>') check_output(output, [ r'^REQUESTOR: Mx\. 510 $', r'^TOTAL TO PAY: \$280\.00$', r'^\s*2020-06-12\s', r'^\s+Expenses:FilingFees\s+60\.00 USD$', ]) @pytest.mark.parametrize('arglist', [ ['-t', 'balance'], ['515/5150'], ['entity=MatchingProgram'], ]) def test_main_balance_report(arglist): retcode, output, errors = run_main(arglist) assert not errors.getvalue() assert retcode == 0 check_output(output, [ r'\brt://ticket/515/attachments/5150:$', r'^\s+1,500\.00 USD outstanding since 2020-05-15$', ]) def test_main_no_books(): check_main_fails([], testutil.TestConfig(), 1 | 8, [ r':1: +no books to load in configuration\b', ]) @pytest.mark.parametrize('arglist', [ ['499'], ['505/99999'], ['entity=NonExistent'], ]) def test_main_no_matches(arglist): check_main_fails(arglist, None, 8, [ r': WARNING: no matching entries found to report$', ]) def test_main_no_rt(): config = testutil.TestConfig( books_path=testutil.test_path('books/accruals.beancount'), ) check_main_fails(['-t', 'out'], config, 4, [ r': ERROR: unable to generate outgoing report: RT client is required\b', ])