"""test_reports_ledger.py - Unit tests for general ledger report"""
# Copyright © 2020  Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import collections
import copy
import datetime
import io
import re

import pytest

from . import testutil

# odf.opendocument.load() is called below, so import the submodule explicitly
# instead of relying on another module having imported it first.
import odf.opendocument
import odf.table
import odf.text

from beancount.core import data as bc_data
from beancount import loader as bc_loader
from conservancy_beancount import data
from conservancy_beancount.reports import core
from conservancy_beancount.reports import ledger

Acct = data.Account

# Loaded once at import time; tests get their own copy via the
# ledger_entries fixture.
_ledger_load = bc_loader.load_file(testutil.test_path('books/ledger.beancount'))

# Sheet names (with ' ' normalized to ':') expected from a default report.
DEFAULT_REPORT_SHEETS = [
    'Balance',
    'Income',
    'Expenses',
    'Equity',
    'Assets:Receivable',
    'Liabilities:Payable',
    'Assets:PayPal',
    'Assets',
    'Liabilities',
]

# Sheet names expected when the report is limited to a single project.
PROJECT_REPORT_SHEETS = [
    'Balance',
    'Income',
    'Expenses',
    'Assets:Receivable',
    'Liabilities:Payable',
]

# Matches the log message checked by check_oversize_logs():
# group 1 is the account, group 2 the row count, group 3 the size limit.
OVERSIZE_RE = re.compile(
    r'^([A-Za-z0-9:]+) has ([0-9,]+) rows, over size ([0-9,]+)$'
)
START_DATE = datetime.date(2018, 3, 1)
MID_DATE = datetime.date(2019, 3, 1)
STOP_DATE = datetime.date(2020, 3, 1)


@pytest.fixture
def ledger_entries():
    """Return a deep copy of the entries loaded from the test ledger."""
    return copy.deepcopy(_ledger_load[0])


class NotFound(Exception):
    """Base class for "expected report content is missing" errors."""


class NoSheet(NotFound):
    """No sheet in the spreadsheet covers the account."""


class NoHeader(NotFound):
    """The account's sheet has no header row for the account."""


class ExpectedPostings(core.RelatedPostings):
    """Postings for one account, able to check themselves against a report."""

    def slice_date_range(self, start_date, end_date):
        """Split these postings around the range [start_date, end_date).

        Returns a 2-tuple: the balance at cost of every posting before the
        first one dated on or after ``start_date``, and the slice of postings
        from that one up to (not including) the next one dated on or after
        ``end_date``.

        NOTE(review): this presumably relies on the postings being in date
        order, and on there being at least one posting -- confirm with
        callers.
        """
        postings = enumerate(self)
        for start_index, post in postings:
            if start_date <= post.meta.date:
                break
        else:
            # Every posting predates start_date: start the slice past the end.
            start_index += 1
        if end_date <= post.meta.date:
            # The first candidate is already at or past end_date: empty slice.
            end_index = start_index
        else:
            # Keep consuming the *same* iterator, so the search for the end
            # resumes right after the starting posting.
            for end_index, post in postings:
                if end_date <= post.meta.date:
                    break
            else:
                end_index = None
        return (self[:start_index].balance_at_cost(),
                self[start_index:end_index])

    def check_report(self, ods, start_date, end_date):
        """Assert that ``ods`` reports these postings for the date range.

        Finds the first sheet whose name (with ' ' replaced by ':') is an
        account this one is under, then the two-cell header row whose last
        cell starts with the account name, and checks the rows that follow
        against the expected postings and balances.

        Raises NoSheet when no sheet covers the account, and NoHeader when
        the sheet has no header row although postings are expected in the
        range. Returns silently when there is no header row and no postings
        are expected.
        """
        account = self[0].account
        norm_func = core.normalize_amount_func(account)
        open_bal, expect_posts = self.slice_date_range(start_date, end_date)
        open_bal = norm_func(open_bal)
        for sheet in ods.getElementsByType(odf.table.Table):
            sheet_account = sheet.getAttribute('name').replace(' ', ':')
            if sheet_account and account.is_under(sheet_account):
                break
        else:
            raise NoSheet(account)
        # A single iterator is shared by every step below so that each
        # next(rows) picks up directly after the previously checked row.
        rows = iter(sheet.getElementsByType(odf.table.TableRow))
        for row in rows:
            cells = row.childNodes
            if len(cells) == 2 and cells[-1].text.startswith(f'{account} '):
                break
        else:
            if expect_posts:
                raise NoHeader(account)
            else:
                return
        closing_bal = norm_func(expect_posts.balance_at_cost())
        if account.is_under('Assets', 'Equity', 'Liabilities'):
            # These accounts get an opening balance row, and their closing
            # balance carries that opening balance forward.
            opening_row = testutil.ODSCell.from_row(next(rows))
            assert opening_row[0].value == start_date
            assert opening_row[4].text == open_bal.format(None, empty='0', sep='\0')
            closing_bal += open_bal
        for expected in expect_posts:
            cells = iter(testutil.ODSCell.from_row(next(rows)))
            assert next(cells).value == expected.meta.date
            assert next(cells).text == (expected.meta.get('entity') or '')
            assert next(cells).text == (expected.meta.txn.narration or '')
            if expected.cost is None:
                # Without a cost, the next cell is empty and the units
                # number appears in the cell after it.
                assert not next(cells).text
                assert next(cells).value == norm_func(expected.units.number)
            else:
                assert next(cells).value == norm_func(expected.units.number)
                assert next(cells).value == norm_func(expected.at_cost().number)
        closing_row = testutil.ODSCell.from_row(next(rows))
        assert closing_row[0].value == end_date
        assert closing_row[4].text == closing_bal.format(None, empty='0', sep='\0')


def get_sheet_names(ods):
    """Return the name of every sheet in ``ods``, with ' ' replaced by ':'."""
    return [sheet.getAttribute('name').replace(' ', ':')
            for sheet in ods.getElementsByType(odf.table.Table)]


def check_oversize_logs(caplog, accounts, sheet_size):
    """Assert the oversize log messages match the accounts over sheet_size.

    ``accounts`` maps account names to row counts. Every captured message
    matching OVERSIZE_RE must report ``sheet_size`` as the limit, and the
    set of (account, rows) pairs logged must be exactly the entries of
    ``accounts`` whose count exceeds ``sheet_size``.
    """
    actual = {}
    for log in caplog.records:
        match = OVERSIZE_RE.match(log.message)
        if match:
            assert int(match.group(3).replace(',', '')) == sheet_size
            actual[match.group(1)] = int(match.group(2).replace(',', ''))
    expected = {name: size for name, size in accounts.items() if size > sheet_size}
    assert actual == expected


def test_plan_sheets_no_change():
    """Accounts that fit within the size limit leave the plan unchanged."""
    have = {
        Acct('Assets:Cash'): 10,
        Acct('Income:Donations'): 20,
    }
    want = ['Assets', 'Income']
    actual = ledger.LedgerODS.plan_sheets(have, want.copy(), 100)
    assert actual == want


@pytest.mark.parametrize('have', [
    {},
    {Acct('Income:Other'): 10},
    {Acct('Assets:Checking'): 20, Acct('Expenses:Other'): 15},
])
def test_plan_sheets_includes_accounts_without_transactions(have):
    """Requested sheets are planned even when no account falls under them."""
    want = ['Assets', 'Income', 'Expenses']
    actual = ledger.LedgerODS.plan_sheets(have, want.copy(), 100)
    assert actual == want


def test_plan_sheets_single_split():
    """A sheet over the limit has its largest account split out before it."""
    have = {
        Acct('Assets:Cash'): 60,
        Acct('Assets:Checking'): 80,
        Acct('Income:Donations'): 50,
        Acct('Expenses:Travel'): 90,
        Acct('Expenses:FilingFees'): 25,
    }
    want = ['Assets', 'Income', 'Expenses']
    actual = ledger.LedgerODS.plan_sheets(have, want, 100)
    assert actual == [
        'Assets:Checking',
        'Assets',
        'Income',
        'Expenses:Travel',
        'Expenses',
    ]


def test_plan_sheets_split_subtree():
    """A subtree that fits within the limit is split out as a whole."""
    have = {
        Acct('Assets:Bank1:Checking'): 80,
        Acct('Assets:Bank1:Savings'): 10,
        Acct('Assets:Cash:USD'): 20,
        Acct('Assets:Cash:EUR'): 15,
    }
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
    assert actual == ['Assets:Bank1', 'Assets']


def test_plan_sheets_ambiguous_split():
    """The split names the exact account when its subtree is too large."""
    have = {
        Acct('Assets:Bank1:Checking'): 80,
        Acct('Assets:Bank1:Savings'): 40,
        Acct('Assets:Receivable:Accounts'): 40,
        Acct('Assets:Cash'): 10,
    }
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
    # :Savings cannot fit with :Checking, so it's important that the return
    # value disambiguate that.
    assert actual == ['Assets:Bank1:Checking', 'Assets']


def test_plan_sheets_oversize(caplog):
    """An account over the limit gets its own sheet and is logged."""
    have = {
        Acct('Assets:Checking'): 150,
        Acct('Assets:Cash'): 50,
    }
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
    assert actual == ['Assets:Checking', 'Assets']
    check_oversize_logs(caplog, have, 100)


def test_plan_sheets_all_oversize(caplog):
    """When every account is over the limit, each gets its own sheet."""
    have = {
        Acct('Assets:Checking'): 150,
        Acct('Assets:Cash'): 150,
    }
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
    # In this case, each account should appear in alphabetical order.
    assert actual == ['Assets:Cash', 'Assets:Checking']
    check_oversize_logs(caplog, have, 100)


def test_plan_sheets_full_split_required(caplog):
    """Accounts under the limit are split apart without oversize logs."""
    have = {
        Acct('Assets:Bank:Savings'): 98,
        Acct('Assets:Bank:Checking'): 96,
        Acct('Assets:Bank:Investment'): 94,
    }
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
    assert actual == ['Assets:Bank:Checking', 'Assets:Bank:Savings', 'Assets']
    assert not caplog.records


@pytest.mark.parametrize('start_date,stop_date', [
    (START_DATE, STOP_DATE),
    (START_DATE, MID_DATE),
    (MID_DATE, STOP_DATE),
    (START_DATE.replace(month=6), START_DATE.replace(month=12)),
    (STOP_DATE, STOP_DATE.replace(month=12)),
])
def test_date_range_report(ledger_entries, start_date, stop_date):
    """Every account is reported correctly for a variety of date ranges."""
    postings = list(data.Posting.from_entries(ledger_entries))
    report = ledger.LedgerODS(start_date, stop_date)
    report.write(iter(postings))
    for _, expected in ExpectedPostings.group_by_account(postings):
        expected.check_report(report.document, start_date, stop_date)


@pytest.mark.parametrize('sheet_names', [
    ('Income', 'Expenses'),
    ('Assets:Receivable', 'Liabilities:Payable'),
])
def test_account_names_report(ledger_entries, sheet_names):
    """Only accounts under the requested sheet names appear in the report."""
    postings = list(data.Posting.from_entries(ledger_entries))
    report = ledger.LedgerODS(START_DATE, STOP_DATE, sheet_names=sheet_names)
    report.write(iter(postings))
    for key, expected in ExpectedPostings.group_by_account(postings):
        # sheet_names is a tuple, so this is True if the account name starts
        # with any of the requested names.
        should_find = key.startswith(sheet_names)
        try:
            expected.check_report(report.document, START_DATE, STOP_DATE)
        except NotFound:
            assert not should_find
        else:
            assert should_find


def run_main(arglist, config=None):
    """Run ledger.main() and return ``(retcode, output, errors)``.

    ``--output-file=-`` is prepended to the arguments. ``output`` is a
    BytesIO rewound to the start and ``errors`` is a StringIO. When
    ``config`` is None, a test configuration for the test ledger is used.
    The caller's ``arglist`` is not modified.
    """
    if config is None:
        config = testutil.TestConfig(
            books_path=testutil.test_path('books/ledger.beancount'),
            rt_client=testutil.RTClient(),
        )
    # Build a new list rather than inserting into the caller's list.
    arglist = ['--output-file=-', *arglist]
    output = io.BytesIO()
    errors = io.StringIO()
    retcode = ledger.main(arglist, output, errors, config)
    output.seek(0)
    return retcode, output, errors


def test_main(ledger_entries):
    """The command line tool writes a full report with the default sheets."""
    retcode, output, errors = run_main([
        '-b', START_DATE.isoformat(),
        '-e', STOP_DATE.isoformat(),
    ])
    assert not errors.getvalue()
    assert retcode == 0
    ods = odf.opendocument.load(output)
    assert get_sheet_names(ods) == DEFAULT_REPORT_SHEETS
    postings = data.Posting.from_entries(ledger_entries)
    for _, expected in ExpectedPostings.group_by_account(postings):
        expected.check_report(ods, START_DATE, STOP_DATE)


@pytest.mark.parametrize('project,start_date,stop_date', [
    ('eighteen', START_DATE, MID_DATE.replace(day=30)),
    ('nineteen', MID_DATE, STOP_DATE),
])
def test_main_project_report(ledger_entries, project, start_date, stop_date):
    """Naming a project limits the report to that project's postings."""
    postings = data.Posting.from_entries(ledger_entries)
    for key, related in ExpectedPostings.group_by_meta(postings, 'project'):
        if key == project:
            break
    else:
        # Fail clearly, including when there are no posting groups at all.
        pytest.fail(f"no postings found for project {project!r}")
    retcode, output, errors = run_main([
        f'--begin={start_date.isoformat()}',
        f'--end={stop_date.isoformat()}',
        project,
    ])
    assert not errors.getvalue()
    assert retcode == 0
    ods = odf.opendocument.load(output)
    assert get_sheet_names(ods) == PROJECT_REPORT_SHEETS
    for _, expected in ExpectedPostings.group_by_account(related):
        expected.check_report(ods, start_date, stop_date)


def test_main_no_postings(caplog):
    """A project with no postings exits with code 24 and logs a warning."""
    retcode, output, errors = run_main(['NonexistentProject'])
    assert retcode == 24
    assert any(log.levelname == 'WARNING' for log in caplog.records)