Changeset - 6c7603fa6c73
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-07-21 02:45:14
brettcsmith@brettcsmith.org
ledger: Add options to control account totals display.
3 files changed with 140 insertions and 52 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/ledger.py
Show inline comments
...
 
@@ -118,12 +118,20 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
 
                 accounts: Optional[Sequence[str]]=None,
 
                 rt_wrapper: Optional[rtutil.RT]=None,
 
                 sheet_size: Optional[int]=None,
 
                 totals_with_entries: Optional[Sequence[str]]=None,
 
                 totals_without_entries: Optional[Sequence[str]]=None,
 
    ) -> None:
 
        if sheet_size is None:
 
            sheet_size = self.SHEET_SIZE
 
        if totals_with_entries is None:
 
            totals_with_entries = [s for s in self.ACCOUNT_COLUMNS if ':' not in s]
 
        if totals_without_entries is None:
 
            totals_without_entries = totals_with_entries
 
        super().__init__(rt_wrapper)
 
        self.date_range = ranges.DateRange(start_date, stop_date)
 
        self.sheet_size = sheet_size
 
        self.totals_with_entries = totals_with_entries
 
        self.totals_without_entries = totals_without_entries
 

	
 
        if accounts is None:
 
            self.accounts = set(data.Account.iter_accounts())
...
 
@@ -312,7 +320,7 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
 
            self.balance_cell(self.norm_func(balance), stylename=self.style_bold),
 
        )
 

	
 
    def start_section(self, key: data.Account) -> None:
 
    def start_section(self, key: data.Account, *, force_total: bool=False) -> None:
 
        self.add_row()
 
        self.add_row(
 
            odf.table.TableCell(),
...
 
@@ -325,7 +333,8 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
 
            ),
 
        )
 
        self.norm_func = core.normalize_amount_func(key)
 
        self._report_section_balance(key, 'start')
 
        if force_total or key.is_under(*self.totals_with_entries):
 
            self._report_section_balance(key, 'start')
 

	
 
    def end_section(self, key: data.Account) -> None:
 
        self._report_section_balance(key, 'stop')
...
 
@@ -409,8 +418,10 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
 
            postings = self.account_groups[account]
 
            if postings:
 
                super().write(postings)
 
            elif account.is_open_on_date(self.date_range.start):
 
                self.start_section(account)
 
            elif not account.is_open_on_date(self.date_range.start):
 
                pass
 
            elif account.is_under(*self.totals_without_entries):
 
                self.start_section(account, force_total=True)
 
                self.end_section(account)
 
        for index in range(using_sheet_index + 1, len(sheet_names)):
 
            self.start_sheet(sheet_names[index])
...
 
@@ -450,6 +461,23 @@ date was also not specified.
 
multiple times. You can specify a part of the account hierarchy, or an account
 
classification from metadata. If not specified, the default set adapts to your
 
search criteria.
 
""")
 
    parser.add_argument(
 
        '--show-totals', '-S',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""When entries for this account appear in the report, include
 
account balance(s) as well. You can specify this option multiple times. Pass in
 
a part of the account hierarchy. The default is all accounts.
 
""")
 
    parser.add_argument(
 
        '--add-totals', '-T',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""When an account could be included in the report but does not
 
have any entries in the date range, include a header and account balance(s) for
 
it. You can specify this option multiple times. Pass in a part of the account
 
hierarchy. The default set adapts to your search criteria.
 
""")
 
    parser.add_argument(
 
        '--sheet-size', '--size',
...
 
@@ -479,6 +507,8 @@ metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other word is a shortcut for `project=TERM`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.add_totals is None and args.search_terms:
 
        args.add_totals = []
 
    if args.accounts is None:
 
        if any(term.meta_key == 'project' for term in args.search_terms):
 
            args.accounts = [
...
 
@@ -548,6 +578,8 @@ def main(arglist: Optional[Sequence[str]]=None,
 
            args.accounts,
 
            rt_wrapper,
 
            args.sheet_size,
 
            args.show_totals,
 
            args.add_totals,
 
        )
 
    except ValueError as error:
 
        logger.error("%s: %r", *error.args)
setup.py
Show inline comments
...
 
@@ -5,7 +5,7 @@ from setuptools import setup
 
setup(
 
    name='conservancy_beancount',
 
    description="Plugin, library, and reports for reading Conservancy's books",
 
    version='1.5.11',
 
    version='1.5.12',
 
    author='Software Freedom Conservancy',
 
    author_email='info@sfconservancy.org',
 
    license='GNU AGPLv3+',
tests/test_reports_ledger.py
Show inline comments
...
 
@@ -67,11 +67,48 @@ STOP_DATE = datetime.date(2020, 3, 1)
 
def ledger_entries():
 
    return copy.deepcopy(_ledger_load[0])
 

	
 
def iter_accounts(entries):
 
    for entry in entries:
 
        if isinstance(entry, bc_data.Open):
 
            yield entry.account
 

	
 
class NotFound(Exception): pass
 
class NoSheet(NotFound): pass
 
class NoHeader(NotFound): pass
 

	
 
class ExpectedPostings(core.RelatedPostings):
 
    @classmethod
 
    def find_section(cls, ods, account):
 
        for sheet in ods.getElementsByType(odf.table.Table):
 
            sheet_account = sheet.getAttribute('name').replace(' ', ':')
 
            if sheet_account and account.is_under(sheet_account):
 
                break
 
        else:
 
            raise NoSheet(account)
 
        rows = iter(sheet.getElementsByType(odf.table.TableRow))
 
        for row in rows:
 
            cells = row.childNodes
 
            if len(cells) == 2 and cells[-1].text.startswith(f'{account} '):
 
                break
 
        else:
 
            raise NoHeader(account)
 
        return rows
 

	
 
    @classmethod
 
    def check_not_in_report(cls, ods, *accounts):
 
        for account in accounts:
 
            with pytest.raises(NotFound):
 
                cls.find_section(ods, data.Account(account))
 

	
 
    @classmethod
 
    def check_in_report(cls, ods, account, start_date=START_DATE, end_date=STOP_DATE):
 
        date = end_date + datetime.timedelta(days=1)
 
        txn = testutil.Transaction(date=date, postings=[
 
            (account, 0),
 
        ])
 
        related = cls(data.Posting.from_txn(txn))
 
        related.check_report(ods, start_date, end_date)
 

	
 
    def slice_date_range(self, start_date, end_date):
 
        postings = enumerate(self)
 
        for start_index, post in postings:
...
 
@@ -90,29 +127,14 @@ class ExpectedPostings(core.RelatedPostings):
 
        return (self[:start_index].balance_at_cost(),
 
                self[start_index:end_index])
 

	
 
    def check_report(self, ods, start_date, end_date):
 
    def check_report(self, ods, start_date, end_date, expect_totals=True):
 
        account = self[0].account
 
        norm_func = core.normalize_amount_func(account)
 
        open_bal, expect_posts = self.slice_date_range(start_date, end_date)
 
        open_bal = norm_func(open_bal)
 
        for sheet in ods.getElementsByType(odf.table.Table):
 
            sheet_account = sheet.getAttribute('name').replace(' ', ':')
 
            if sheet_account and account.is_under(sheet_account):
 
                break
 
        else:
 
            raise NoSheet(account)
 
        rows = iter(sheet.getElementsByType(odf.table.TableRow))
 
        for row in rows:
 
            cells = row.childNodes
 
            if len(cells) == 2 and cells[-1].text.startswith(f'{account} '):
 
                break
 
        else:
 
            if expect_posts:
 
                raise NoHeader(account)
 
            else:
 
                return
 
        closing_bal = norm_func(expect_posts.balance_at_cost())
 
        if account.is_under('Assets', 'Liabilities'):
 
        rows = self.find_section(ods, account)
 
        if expect_totals and account.is_under('Assets', 'Liabilities'):
 
            opening_row = testutil.ODSCell.from_row(next(rows))
 
            assert opening_row[0].value == start_date
 
            assert opening_row[4].text == open_bal.format(None, empty='0', sep='\0')
...
 
@@ -128,10 +150,11 @@ class ExpectedPostings(core.RelatedPostings):
 
            else:
 
                assert next(cells).value == norm_func(expected.units.number)
 
                assert next(cells).value == norm_func(expected.at_cost().number)
 
        closing_row = testutil.ODSCell.from_row(next(rows))
 
        assert closing_row[0].value == end_date
 
        empty = '$0.00' if expect_posts else '0'
 
        assert closing_row[4].text == closing_bal.format(None, empty=empty, sep='\0')
 
        if expect_totals:
 
            closing_row = testutil.ODSCell.from_row(next(rows))
 
            assert closing_row[0].value == end_date
 
            empty = '$0.00' if expect_posts else '0'
 
            assert closing_row[4].text == closing_bal.format(None, empty=empty, sep='\0')
 

	
 

	
 
def get_sheet_names(ods):
...
 
@@ -236,6 +259,14 @@ def test_plan_sheets_full_split_required(caplog):
 
    assert actual == ['Assets:Bank:Checking', 'Assets:Bank:Savings', 'Assets']
 
    assert not caplog.records
 

	
 
def build_report(ledger_entries, start_date, stop_date, *args, **kwargs):
 
    postings = list(data.Posting.from_entries(iter(ledger_entries)))
 
    with clean_account_meta():
 
        data.Account.load_openings_and_closings(iter(ledger_entries))
 
        report = ledger.LedgerODS(start_date, stop_date, *args, **kwargs)
 
        report.write(iter(postings))
 
    return postings, report
 

	
 
@pytest.mark.parametrize('start_date,stop_date', [
 
    (START_DATE, STOP_DATE),
 
    (START_DATE, MID_DATE),
...
 
@@ -244,32 +275,49 @@ def test_plan_sheets_full_split_required(caplog):
 
    (STOP_DATE, STOP_DATE.replace(month=12)),
 
])
 
def test_date_range_report(ledger_entries, start_date, stop_date):
 
    postings = list(data.Posting.from_entries(iter(ledger_entries)))
 
    with clean_account_meta():
 
        data.Account.load_openings_and_closings(iter(ledger_entries))
 
        report = ledger.LedgerODS(start_date, stop_date)
 
        report.write(iter(postings))
 
    for _, expected in ExpectedPostings.group_by_account(postings):
 
        expected.check_report(report.document, start_date, stop_date)
 
    postings, report = build_report(ledger_entries, start_date, stop_date)
 
    expected = dict(ExpectedPostings.group_by_account(postings))
 
    for account in iter_accounts(ledger_entries):
 
        try:
 
            related = expected[account]
 
        except KeyError:
 
            ExpectedPostings.check_in_report(report.document, account, start_date, stop_date)
 
        else:
 
            related.check_report(report.document, start_date, stop_date)
 

	
 
@pytest.mark.parametrize('tot_accts', [
 
    (),
 
    ('Assets', 'Liabilities'),
 
    ('Income', 'Expenses'),
 
    ('Assets', 'Liabilities', 'Income', 'Expenses'),
 
])
 
def test_report_filter_totals(ledger_entries, tot_accts):
 
    postings, report = build_report(ledger_entries, START_DATE, STOP_DATE,
 
                                    totals_with_entries=tot_accts,
 
                                    totals_without_entries=tot_accts)
 
    expected = dict(ExpectedPostings.group_by_account(postings))
 
    for account in iter_accounts(ledger_entries):
 
        expect_totals = account.startswith(tot_accts)
 
        if account in expected and expected[account][-1].meta.date >= START_DATE:
 
            expected[account].check_report(report.document, START_DATE, STOP_DATE,
 
                                           expect_totals=expect_totals)
 
        elif expect_totals:
 
            ExpectedPostings.check_in_report(report.document, account)
 
        else:
 
            ExpectedPostings.check_not_in_report(report.document, account)
 

	
 
@pytest.mark.parametrize('accounts', [
 
    ('Income', 'Expenses'),
 
    ('Assets:Receivable', 'Liabilities:Payable'),
 
])
 
def test_account_names_report(ledger_entries, accounts):
 
    postings = list(data.Posting.from_entries(iter(ledger_entries)))
 
    with clean_account_meta():
 
        data.Account.load_openings_and_closings(iter(ledger_entries))
 
        report = ledger.LedgerODS(START_DATE, STOP_DATE, accounts=accounts)
 
        report.write(iter(postings))
 
    for key, expected in ExpectedPostings.group_by_account(postings):
 
        should_find = key.startswith(accounts)
 
        try:
 
            expected.check_report(report.document, START_DATE, STOP_DATE)
 
        except NotFound:
 
            assert not should_find
 
    postings, report = build_report(ledger_entries, START_DATE, STOP_DATE, accounts)
 
    expected = dict(ExpectedPostings.group_by_account(postings))
 
    for account in iter_accounts(ledger_entries):
 
        if account.startswith(accounts):
 
            expected[account].check_report(report.document, START_DATE, STOP_DATE)
 
        else:
 
            assert should_find
 
            ExpectedPostings.check_not_in_report(report.document, account)
 

	
 
def run_main(arglist, config=None):
 
    if config is None:
...
 
@@ -295,9 +343,13 @@ def test_main(ledger_entries):
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    assert get_sheet_names(ods) == DEFAULT_REPORT_SHEETS[:]
 
    postings = data.Posting.from_entries(ledger_entries)
 
    for _, expected in ExpectedPostings.group_by_account(postings):
 
        expected.check_report(ods, START_DATE, STOP_DATE)
 
    postings = data.Posting.from_entries(iter(ledger_entries))
 
    expected = dict(ExpectedPostings.group_by_account(postings))
 
    for account in iter_accounts(ledger_entries):
 
        try:
 
            expected[account].check_report(ods, START_DATE, STOP_DATE)
 
        except KeyError:
 
            ExpectedPostings.check_in_report(ods, account)
 

	
 
@pytest.mark.parametrize('acct_arg', [
 
    'Liabilities',
...
 
@@ -351,7 +403,7 @@ def test_main_account_classification_splits_hierarchy(ledger_entries):
 
    ('nineteen', MID_DATE, STOP_DATE),
 
])
 
def test_main_project_report(ledger_entries, project, start_date, stop_date):
 
    postings = data.Posting.from_entries(ledger_entries)
 
    postings = data.Posting.from_entries(iter(ledger_entries))
 
    for key, related in ExpectedPostings.group_by_meta(postings, 'project'):
 
        if key == project:
 
            break
...
 
@@ -365,8 +417,12 @@ def test_main_project_report(ledger_entries, project, start_date, stop_date):
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    assert get_sheet_names(ods) == PROJECT_REPORT_SHEETS[:]
 
    for _, expected in ExpectedPostings.group_by_account(related):
 
        expected.check_report(ods, start_date, stop_date)
 
    expected = dict(ExpectedPostings.group_by_account(related))
 
    for account in iter_accounts(ledger_entries):
 
        try:
 
            expected[account].check_report(ods, start_date, stop_date)
 
        except KeyError:
 
            ExpectedPostings.check_not_in_report(ods, account)
 

	
 
@pytest.mark.parametrize('arg', [
 
    'Assets:NoneSuchBank',
0 comments (0 inline, 0 general)