diff --git a/tests/test_reports_ledger.py b/tests/test_reports_ledger.py index 20964edf8b41c6f3b726b22ff4a7b3905f74ce52..db95bfb50a27154b5812ecb4af77bcd84de5c695 100644 --- a/tests/test_reports_ledger.py +++ b/tests/test_reports_ledger.py @@ -67,11 +67,48 @@ STOP_DATE = datetime.date(2020, 3, 1) def ledger_entries(): return copy.deepcopy(_ledger_load[0]) +def iter_accounts(entries): + for entry in entries: + if isinstance(entry, bc_data.Open): + yield entry.account + class NotFound(Exception): pass class NoSheet(NotFound): pass class NoHeader(NotFound): pass class ExpectedPostings(core.RelatedPostings): + @classmethod + def find_section(cls, ods, account): + for sheet in ods.getElementsByType(odf.table.Table): + sheet_account = sheet.getAttribute('name').replace(' ', ':') + if sheet_account and account.is_under(sheet_account): + break + else: + raise NoSheet(account) + rows = iter(sheet.getElementsByType(odf.table.TableRow)) + for row in rows: + cells = row.childNodes + if len(cells) == 2 and cells[-1].text.startswith(f'{account} '): + break + else: + raise NoHeader(account) + return rows + + @classmethod + def check_not_in_report(cls, ods, *accounts): + for account in accounts: + with pytest.raises(NotFound): + cls.find_section(ods, data.Account(account)) + + @classmethod + def check_in_report(cls, ods, account, start_date=START_DATE, end_date=STOP_DATE): + date = end_date + datetime.timedelta(days=1) + txn = testutil.Transaction(date=date, postings=[ + (account, 0), + ]) + related = cls(data.Posting.from_txn(txn)) + related.check_report(ods, start_date, end_date) + def slice_date_range(self, start_date, end_date): postings = enumerate(self) for start_index, post in postings: @@ -90,29 +127,14 @@ class ExpectedPostings(core.RelatedPostings): return (self[:start_index].balance_at_cost(), self[start_index:end_index]) - def check_report(self, ods, start_date, end_date): + def check_report(self, ods, start_date, end_date, expect_totals=True): account = self[0].account norm_func = core.normalize_amount_func(account) open_bal, expect_posts = self.slice_date_range(start_date, end_date) open_bal = norm_func(open_bal) - for sheet in ods.getElementsByType(odf.table.Table): - sheet_account = sheet.getAttribute('name').replace(' ', ':') - if sheet_account and account.is_under(sheet_account): - break - else: - raise NoSheet(account) - rows = iter(sheet.getElementsByType(odf.table.TableRow)) - for row in rows: - cells = row.childNodes - if len(cells) == 2 and cells[-1].text.startswith(f'{account} '): - break - else: - if expect_posts: - raise NoHeader(account) - else: - return closing_bal = norm_func(expect_posts.balance_at_cost()) - if account.is_under('Assets', 'Liabilities'): + rows = self.find_section(ods, account) + if expect_totals and account.is_under('Assets', 'Liabilities'): opening_row = testutil.ODSCell.from_row(next(rows)) assert opening_row[0].value == start_date assert opening_row[4].text == open_bal.format(None, empty='0', sep='\0') @@ -128,10 +150,11 @@ class ExpectedPostings(core.RelatedPostings): else: assert next(cells).value == norm_func(expected.units.number) assert next(cells).value == norm_func(expected.at_cost().number) - closing_row = testutil.ODSCell.from_row(next(rows)) - assert closing_row[0].value == end_date - empty = '$0.00' if expect_posts else '0' - assert closing_row[4].text == closing_bal.format(None, empty=empty, sep='\0') + if expect_totals: + closing_row = testutil.ODSCell.from_row(next(rows)) + assert closing_row[0].value == end_date + empty = '$0.00' if expect_posts else '0' + assert closing_row[4].text == closing_bal.format(None, empty=empty, sep='\0') def get_sheet_names(ods): @@ -236,6 +259,14 @@ def test_plan_sheets_full_split_required(caplog): assert actual == ['Assets:Bank:Checking', 'Assets:Bank:Savings', 'Assets'] assert not caplog.records +def build_report(ledger_entries, start_date, stop_date, *args, **kwargs): + postings = list(data.Posting.from_entries(iter(ledger_entries))) + with clean_account_meta(): + data.Account.load_openings_and_closings(iter(ledger_entries)) + report = ledger.LedgerODS(start_date, stop_date, *args, **kwargs) + report.write(iter(postings)) + return postings, report + @pytest.mark.parametrize('start_date,stop_date', [ (START_DATE, STOP_DATE), (START_DATE, MID_DATE), @@ -244,32 +275,49 @@ def test_plan_sheets_full_split_required(caplog): (STOP_DATE, STOP_DATE.replace(month=12)), ]) def test_date_range_report(ledger_entries, start_date, stop_date): - postings = list(data.Posting.from_entries(iter(ledger_entries))) - with clean_account_meta(): - data.Account.load_openings_and_closings(iter(ledger_entries)) - report = ledger.LedgerODS(start_date, stop_date) - report.write(iter(postings)) - for _, expected in ExpectedPostings.group_by_account(postings): - expected.check_report(report.document, start_date, stop_date) + postings, report = build_report(ledger_entries, start_date, stop_date) + expected = dict(ExpectedPostings.group_by_account(postings)) + for account in iter_accounts(ledger_entries): + try: + related = expected[account] + except KeyError: + ExpectedPostings.check_in_report(report.document, account, start_date, stop_date) + else: + related.check_report(report.document, start_date, stop_date) + +@pytest.mark.parametrize('tot_accts', [ + (), + ('Assets', 'Liabilities'), + ('Income', 'Expenses'), + ('Assets', 'Liabilities', 'Income', 'Expenses'), +]) +def test_report_filter_totals(ledger_entries, tot_accts): + postings, report = build_report(ledger_entries, START_DATE, STOP_DATE, + totals_with_entries=tot_accts, + totals_without_entries=tot_accts) + expected = dict(ExpectedPostings.group_by_account(postings)) + for account in iter_accounts(ledger_entries): + expect_totals = account.startswith(tot_accts) + if account in expected and expected[account][-1].meta.date >= START_DATE: + expected[account].check_report(report.document, START_DATE, STOP_DATE, + expect_totals=expect_totals) + elif expect_totals: + ExpectedPostings.check_in_report(report.document, account) + else: + ExpectedPostings.check_not_in_report(report.document, account) @pytest.mark.parametrize('accounts', [ ('Income', 'Expenses'), ('Assets:Receivable', 'Liabilities:Payable'), ]) def test_account_names_report(ledger_entries, accounts): - postings = list(data.Posting.from_entries(iter(ledger_entries))) - with clean_account_meta(): - data.Account.load_openings_and_closings(iter(ledger_entries)) - report = ledger.LedgerODS(START_DATE, STOP_DATE, accounts=accounts) - report.write(iter(postings)) - for key, expected in ExpectedPostings.group_by_account(postings): - should_find = key.startswith(accounts) - try: - expected.check_report(report.document, START_DATE, STOP_DATE) - except NotFound: - assert not should_find + postings, report = build_report(ledger_entries, START_DATE, STOP_DATE, accounts) + expected = dict(ExpectedPostings.group_by_account(postings)) + for account in iter_accounts(ledger_entries): + if account.startswith(accounts): + expected[account].check_report(report.document, START_DATE, STOP_DATE) else: - assert should_find + ExpectedPostings.check_not_in_report(report.document, account) def run_main(arglist, config=None): if config is None: @@ -295,9 +343,13 @@ def test_main(ledger_entries): assert retcode == 0 ods = odf.opendocument.load(output) assert get_sheet_names(ods) == DEFAULT_REPORT_SHEETS[:] - postings = data.Posting.from_entries(ledger_entries) - for _, expected in ExpectedPostings.group_by_account(postings): - expected.check_report(ods, START_DATE, STOP_DATE) + postings = data.Posting.from_entries(iter(ledger_entries)) + expected = dict(ExpectedPostings.group_by_account(postings)) + for account in iter_accounts(ledger_entries): + try: + expected[account].check_report(ods, START_DATE, STOP_DATE) + except KeyError: + ExpectedPostings.check_in_report(ods, account) @pytest.mark.parametrize('acct_arg', [ 'Liabilities', @@ -351,7 +403,7 @@ def test_main_account_classification_splits_hierarchy(ledger_entries): ('nineteen', MID_DATE, STOP_DATE), ]) def test_main_project_report(ledger_entries, project, start_date, stop_date): - postings = data.Posting.from_entries(ledger_entries) + postings = data.Posting.from_entries(iter(ledger_entries)) for key, related in ExpectedPostings.group_by_meta(postings, 'project'): if key == project: break @@ -365,8 +417,12 @@ def test_main_project_report(ledger_entries, project, start_date, stop_date): assert retcode == 0 ods = odf.opendocument.load(output) assert get_sheet_names(ods) == PROJECT_REPORT_SHEETS[:] - for _, expected in ExpectedPostings.group_by_account(related): - expected.check_report(ods, start_date, stop_date) + expected = dict(ExpectedPostings.group_by_account(related)) + for account in iter_accounts(ledger_entries): + try: + expected[account].check_report(ods, start_date, stop_date) + except KeyError: + ExpectedPostings.check_not_in_report(ods, account) @pytest.mark.parametrize('arg', [ 'Assets:NoneSuchBank',