Changeset - 6c7603fa6c73
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-07-21 02:45:14
brettcsmith@brettcsmith.org
ledger: Add options to control account totals display.
3 files changed with 140 insertions and 52 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/ledger.py
Show inline comments
...
 
@@ -109,30 +109,38 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
 
    # formats (Excel 2007 supports over 1 million rows per worksheet),
 
    # keeping the default limit conservative seems good to avoid running into
 
    # other limits (like the number of hyperlinks per worksheet), plus just
 
    # better for human organization and readability.
 
    SHEET_SIZE = 65500
 

	
 
    def __init__(self,
 
                 start_date: datetime.date,
 
                 stop_date: datetime.date,
 
                 accounts: Optional[Sequence[str]]=None,
 
                 rt_wrapper: Optional[rtutil.RT]=None,
 
                 sheet_size: Optional[int]=None,
 
                 totals_with_entries: Optional[Sequence[str]]=None,
 
                 totals_without_entries: Optional[Sequence[str]]=None,
 
    ) -> None:
 
        if sheet_size is None:
 
            sheet_size = self.SHEET_SIZE
 
        if totals_with_entries is None:
 
            totals_with_entries = [s for s in self.ACCOUNT_COLUMNS if ':' not in s]
 
        if totals_without_entries is None:
 
            totals_without_entries = totals_with_entries
 
        super().__init__(rt_wrapper)
 
        self.date_range = ranges.DateRange(start_date, stop_date)
 
        self.sheet_size = sheet_size
 
        self.totals_with_entries = totals_with_entries
 
        self.totals_without_entries = totals_without_entries
 

	
 
        if accounts is None:
 
            self.accounts = set(data.Account.iter_accounts())
 
            self.required_sheet_names = list(self.ACCOUNT_COLUMNS)
 
        else:
 
            self.accounts = set()
 
            self.required_sheet_names = []
 
            for acct_spec in accounts:
 
                subaccounts = frozenset(data.Account.iter_accounts_by_hierarchy(acct_spec))
 
                if subaccounts:
 
                    self.accounts.update(subaccounts)
 
                    self._require_sheet(acct_spec)
...
 
@@ -303,38 +311,39 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
 
                balance = related.period_bal
 
                description = "Period Total"
 
        self.add_row(
 
            self.date_cell(date, stylename=self.merge_styles(
 
                self.style_bold, self.style_date,
 
            )),
 
            odf.table.TableCell(),
 
            self.string_cell(description, stylename=self.style_bold),
 
            odf.table.TableCell(),
 
            self.balance_cell(self.norm_func(balance), stylename=self.style_bold),
 
        )
 

	
 
    def start_section(self, key: data.Account) -> None:
 
    def start_section(self, key: data.Account, *, force_total: bool=False) -> None:
 
        self.add_row()
 
        self.add_row(
 
            odf.table.TableCell(),
 
            self.string_cell(
 
                f"{key} Ledger"
 
                f" From {self.date_range.start.isoformat()}"
 
                f" To {self.date_range.stop.isoformat()}",
 
                stylename=self.style_bold,
 
                numbercolumnsspanned=len(self.sheet_columns) - 1,
 
            ),
 
        )
 
        self.norm_func = core.normalize_amount_func(key)
 
        self._report_section_balance(key, 'start')
 
        if force_total or key.is_under(*self.totals_with_entries):
 
            self._report_section_balance(key, 'start')
 

	
 
    def end_section(self, key: data.Account) -> None:
 
        self._report_section_balance(key, 'stop')
 

	
 
    def write_row(self, row: data.Posting) -> None:
 
        if row.cost is None:
 
            amount_cell = odf.table.TableCell()
 
        else:
 
            amount_cell = self.currency_cell(self.norm_func(row.units))
 
        self.add_row(
 
            self.date_cell(row.meta.date),
 
            self.string_cell(row.meta.get('entity') or ''),
...
 
@@ -400,26 +409,28 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
 
            tally_by_account, self.required_sheet_names, self.sheet_size,
 
        )
 
        using_sheet_index = -1
 
        for sheet_index, account in core.sort_and_filter_accounts(
 
                tally_by_account, sheet_names,
 
        ):
 
            while using_sheet_index < sheet_index:
 
                using_sheet_index += 1
 
                self.start_sheet(sheet_names[using_sheet_index])
 
            postings = self.account_groups[account]
 
            if postings:
 
                super().write(postings)
 
            elif account.is_open_on_date(self.date_range.start):
 
                self.start_section(account)
 
            elif not account.is_open_on_date(self.date_range.start):
 
                pass
 
            elif account.is_under(*self.totals_without_entries):
 
                self.start_section(account, force_total=True)
 
                self.end_section(account)
 
        for index in range(using_sheet_index + 1, len(sheet_names)):
 
            self.start_sheet(sheet_names[index])
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    LOAD_ERRORS = 1
 
    NOTHING_TO_REPORT = 8
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
...
 
@@ -441,24 +452,41 @@ The default is one year ago.
 
The default is a year after the start date, or 30 days from today if the start
 
date was also not specified.
 
""")
 
    parser.add_argument(
 
        '--account', '-a',
 
        dest='accounts',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""Show this account in the report. You can specify this option
 
multiple times. You can specify a part of the account hierarchy, or an account
 
classification from metadata. If not specified, the default set adapts to your
 
search criteria.
 
""")
 
    parser.add_argument(
 
        '--show-totals', '-S',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""When entries for this account appear in the report, include
 
account balance(s) as well. You can specify this option multiple times. Pass in
 
a part of the account hierarchy. The default is all accounts.
 
""")
 
    parser.add_argument(
 
        '--add-totals', '-T',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""When an account could be included in the report but does not
 
have any entries in the date range, include a header and account balance(s) for
 
it. You can specify this option multiple times. Pass in a part of the account
 
hierarchy. The default set adapts to your search criteria.
 
""")
 
    parser.add_argument(
 
        '--sheet-size', '--size',
 
        metavar='SIZE',
 
        type=int,
 
        default=LedgerODS.SHEET_SIZE,
 
        help="""Try to limit sheets to this many rows. The report will
 
automatically create new sheets to make this happen. When that's not possible,
 
it will issue a warning.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
...
 
@@ -470,24 +498,26 @@ The default is `LedgerReport_<StartDate>_<StopDate>.ods`.
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('project', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on postings that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other word is a shortcut for `project=TERM`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.add_totals is None and args.search_terms:
 
        args.add_totals = []
 
    if args.accounts is None:
 
        if any(term.meta_key == 'project' for term in args.search_terms):
 
            args.accounts = [
 
                'Income',
 
                'Expenses',
 
                'Assets:Receivable',
 
                'Assets:Prepaid',
 
                'Liabilities:UnearnedIncome',
 
                'Liabilities:Payable',
 
            ]
 
        else:
 
            args.accounts = list(LedgerODS.ACCOUNT_COLUMNS)
...
 
@@ -539,24 +569,26 @@ def main(arglist: Optional[Sequence[str]]=None,
 
        postings = search_term.filter_postings(postings)
 

	
 
    rt_wrapper = config.rt_wrapper()
 
    if rt_wrapper is None:
 
        logger.warning("could not initialize RT client; spreadsheet links will be broken")
 
    try:
 
        report = LedgerODS(
 
            args.start_date,
 
            args.stop_date,
 
            args.accounts,
 
            rt_wrapper,
 
            args.sheet_size,
 
            args.show_totals,
 
            args.add_totals,
 
        )
 
    except ValueError as error:
 
        logger.error("%s: %r", *error.args)
 
        return 2
 
    report.write(postings)
 
    if not any(report.account_groups.values()):
 
        logger.warning("no matching postings found to report")
 
        returncode |= ReturnFlag.NOTHING_TO_REPORT
 

	
 
    if args.output_file is None:
 
        out_dir_path = config.repository_path() or Path()
 
        args.output_file = out_dir_path / 'LedgerReport_{}_{}.ods'.format(
setup.py
Show inline comments
 
#!/usr/bin/env python3
 

	
 
from setuptools import setup
 

	
 
setup(
 
    name='conservancy_beancount',
 
    description="Plugin, library, and reports for reading Conservancy's books",
 
    version='1.5.11',
 
    version='1.5.12',
 
    author='Software Freedom Conservancy',
 
    author_email='info@sfconservancy.org',
 
    license='GNU AGPLv3+',
 

	
 
    install_requires=[
 
        'babel>=2.6',  # Debian:python3-babel
 
        'beancount>=2.2',  # Debian:beancount
 
        # 1.4.1 crashes when trying to save some documents.
 
        'odfpy>=1.4.0,!=1.4.1',  # Debian:python3-odf
 
        'PyYAML>=3.0',  # Debian:python3-yaml
 
        'regex',  # Debian:python3-regex
 
        'rt>=2.0',
tests/test_reports_ledger.py
Show inline comments
...
 
@@ -58,89 +58,112 @@ PROJECT_REPORT_SHEETS = DEFAULT_REPORT_SHEETS[:5] + [
 
del PROJECT_REPORT_SHEETS[3]
 
OVERSIZE_RE = re.compile(
 
    r'^([A-Za-z0-9:]+) has ([0-9,]+) rows, over size ([0-9,]+)$'
 
)
 
START_DATE = datetime.date(2018, 3, 1)
 
MID_DATE = datetime.date(2019, 3, 1)
 
STOP_DATE = datetime.date(2020, 3, 1)
 

	
 
@pytest.fixture
 
def ledger_entries():
 
    return copy.deepcopy(_ledger_load[0])
 

	
 
def iter_accounts(entries):
 
    for entry in entries:
 
        if isinstance(entry, bc_data.Open):
 
            yield entry.account
 

	
 
class NotFound(Exception): pass
 
class NoSheet(NotFound): pass
 
class NoHeader(NotFound): pass
 

	
 
class ExpectedPostings(core.RelatedPostings):
 
    @classmethod
 
    def find_section(cls, ods, account):
 
        for sheet in ods.getElementsByType(odf.table.Table):
 
            sheet_account = sheet.getAttribute('name').replace(' ', ':')
 
            if sheet_account and account.is_under(sheet_account):
 
                break
 
        else:
 
            raise NoSheet(account)
 
        rows = iter(sheet.getElementsByType(odf.table.TableRow))
 
        for row in rows:
 
            cells = row.childNodes
 
            if len(cells) == 2 and cells[-1].text.startswith(f'{account} '):
 
                break
 
        else:
 
            raise NoHeader(account)
 
        return rows
 

	
 
    @classmethod
 
    def check_not_in_report(cls, ods, *accounts):
 
        for account in accounts:
 
            with pytest.raises(NotFound):
 
                cls.find_section(ods, data.Account(account))
 

	
 
    @classmethod
 
    def check_in_report(cls, ods, account, start_date=START_DATE, end_date=STOP_DATE):
 
        date = end_date + datetime.timedelta(days=1)
 
        txn = testutil.Transaction(date=date, postings=[
 
            (account, 0),
 
        ])
 
        related = cls(data.Posting.from_txn(txn))
 
        related.check_report(ods, start_date, end_date)
 

	
 
    def slice_date_range(self, start_date, end_date):
 
        postings = enumerate(self)
 
        for start_index, post in postings:
 
            if start_date <= post.meta.date:
 
                break
 
        else:
 
            start_index += 1
 
        if end_date <= post.meta.date:
 
            end_index = start_index
 
        else:
 
            for end_index, post in postings:
 
                if end_date <= post.meta.date:
 
                    break
 
            else:
 
                end_index = None
 
        return (self[:start_index].balance_at_cost(),
 
                self[start_index:end_index])
 

	
 
    def check_report(self, ods, start_date, end_date):
 
    def check_report(self, ods, start_date, end_date, expect_totals=True):
 
        account = self[0].account
 
        norm_func = core.normalize_amount_func(account)
 
        open_bal, expect_posts = self.slice_date_range(start_date, end_date)
 
        open_bal = norm_func(open_bal)
 
        for sheet in ods.getElementsByType(odf.table.Table):
 
            sheet_account = sheet.getAttribute('name').replace(' ', ':')
 
            if sheet_account and account.is_under(sheet_account):
 
                break
 
        else:
 
            raise NoSheet(account)
 
        rows = iter(sheet.getElementsByType(odf.table.TableRow))
 
        for row in rows:
 
            cells = row.childNodes
 
            if len(cells) == 2 and cells[-1].text.startswith(f'{account} '):
 
                break
 
        else:
 
            if expect_posts:
 
                raise NoHeader(account)
 
            else:
 
                return
 
        closing_bal = norm_func(expect_posts.balance_at_cost())
 
        if account.is_under('Assets', 'Liabilities'):
 
        rows = self.find_section(ods, account)
 
        if expect_totals and account.is_under('Assets', 'Liabilities'):
 
            opening_row = testutil.ODSCell.from_row(next(rows))
 
            assert opening_row[0].value == start_date
 
            assert opening_row[4].text == open_bal.format(None, empty='0', sep='\0')
 
            closing_bal += open_bal
 
        for expected in expect_posts:
 
            cells = iter(testutil.ODSCell.from_row(next(rows)))
 
            assert next(cells).value == expected.meta.date
 
            assert next(cells).text == (expected.meta.get('entity') or '')
 
            assert next(cells).text == (expected.meta.txn.narration or '')
 
            if expected.cost is None:
 
                assert not next(cells).text
 
                assert next(cells).value == norm_func(expected.units.number)
 
            else:
 
                assert next(cells).value == norm_func(expected.units.number)
 
                assert next(cells).value == norm_func(expected.at_cost().number)
 
        closing_row = testutil.ODSCell.from_row(next(rows))
 
        assert closing_row[0].value == end_date
 
        empty = '$0.00' if expect_posts else '0'
 
        assert closing_row[4].text == closing_bal.format(None, empty=empty, sep='\0')
 
        if expect_totals:
 
            closing_row = testutil.ODSCell.from_row(next(rows))
 
            assert closing_row[0].value == end_date
 
            empty = '$0.00' if expect_posts else '0'
 
            assert closing_row[4].text == closing_bal.format(None, empty=empty, sep='\0')
 

	
 

	
 
def get_sheet_names(ods):
 
    return [sheet.getAttribute('name').replace(' ', ':')
 
            for sheet in ods.getElementsByType(odf.table.Table)]
 

	
 
def check_oversize_logs(caplog, accounts, sheet_size):
 
    actual = {}
 
    for log in caplog.records:
 
        match = OVERSIZE_RE.match(log.message)
 
        if match:
 
            assert int(match.group(3).replace(',', '')) == sheet_size
...
 
@@ -227,58 +250,83 @@ def test_plan_sheets_all_oversize(caplog):
 
    check_oversize_logs(caplog, have, 100)
 

	
 
def test_plan_sheets_full_split_required(caplog):
 
    have = {
 
        Acct('Assets:Bank:Savings'): 98,
 
        Acct('Assets:Bank:Checking'): 96,
 
        Acct('Assets:Bank:Investment'): 94,
 
    }
 
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
 
    assert actual == ['Assets:Bank:Checking', 'Assets:Bank:Savings', 'Assets']
 
    assert not caplog.records
 

	
 
def build_report(ledger_entries, start_date, stop_date, *args, **kwargs):
 
    postings = list(data.Posting.from_entries(iter(ledger_entries)))
 
    with clean_account_meta():
 
        data.Account.load_openings_and_closings(iter(ledger_entries))
 
        report = ledger.LedgerODS(start_date, stop_date, *args, **kwargs)
 
        report.write(iter(postings))
 
    return postings, report
 

	
 
@pytest.mark.parametrize('start_date,stop_date', [
 
    (START_DATE, STOP_DATE),
 
    (START_DATE, MID_DATE),
 
    (MID_DATE, STOP_DATE),
 
    (START_DATE.replace(month=6), START_DATE.replace(month=12)),
 
    (STOP_DATE, STOP_DATE.replace(month=12)),
 
])
 
def test_date_range_report(ledger_entries, start_date, stop_date):
 
    postings = list(data.Posting.from_entries(iter(ledger_entries)))
 
    with clean_account_meta():
 
        data.Account.load_openings_and_closings(iter(ledger_entries))
 
        report = ledger.LedgerODS(start_date, stop_date)
 
        report.write(iter(postings))
 
    for _, expected in ExpectedPostings.group_by_account(postings):
 
        expected.check_report(report.document, start_date, stop_date)
 
    postings, report = build_report(ledger_entries, start_date, stop_date)
 
    expected = dict(ExpectedPostings.group_by_account(postings))
 
    for account in iter_accounts(ledger_entries):
 
        try:
 
            related = expected[account]
 
        except KeyError:
 
            ExpectedPostings.check_in_report(report.document, account, start_date, stop_date)
 
        else:
 
            related.check_report(report.document, start_date, stop_date)
 

	
 
@pytest.mark.parametrize('tot_accts', [
 
    (),
 
    ('Assets', 'Liabilities'),
 
    ('Income', 'Expenses'),
 
    ('Assets', 'Liabilities', 'Income', 'Expenses'),
 
])
 
def test_report_filter_totals(ledger_entries, tot_accts):
 
    postings, report = build_report(ledger_entries, START_DATE, STOP_DATE,
 
                                    totals_with_entries=tot_accts,
 
                                    totals_without_entries=tot_accts)
 
    expected = dict(ExpectedPostings.group_by_account(postings))
 
    for account in iter_accounts(ledger_entries):
 
        expect_totals = account.startswith(tot_accts)
 
        if account in expected and expected[account][-1].meta.date >= START_DATE:
 
            expected[account].check_report(report.document, START_DATE, STOP_DATE,
 
                                           expect_totals=expect_totals)
 
        elif expect_totals:
 
            ExpectedPostings.check_in_report(report.document, account)
 
        else:
 
            ExpectedPostings.check_not_in_report(report.document, account)
 

	
 
@pytest.mark.parametrize('accounts', [
 
    ('Income', 'Expenses'),
 
    ('Assets:Receivable', 'Liabilities:Payable'),
 
])
 
def test_account_names_report(ledger_entries, accounts):
 
    postings = list(data.Posting.from_entries(iter(ledger_entries)))
 
    with clean_account_meta():
 
        data.Account.load_openings_and_closings(iter(ledger_entries))
 
        report = ledger.LedgerODS(START_DATE, STOP_DATE, accounts=accounts)
 
        report.write(iter(postings))
 
    for key, expected in ExpectedPostings.group_by_account(postings):
 
        should_find = key.startswith(accounts)
 
        try:
 
            expected.check_report(report.document, START_DATE, STOP_DATE)
 
        except NotFound:
 
            assert not should_find
 
    postings, report = build_report(ledger_entries, START_DATE, STOP_DATE, accounts)
 
    expected = dict(ExpectedPostings.group_by_account(postings))
 
    for account in iter_accounts(ledger_entries):
 
        if account.startswith(accounts):
 
            expected[account].check_report(report.document, START_DATE, STOP_DATE)
 
        else:
 
            assert should_find
 
            ExpectedPostings.check_not_in_report(report.document, account)
 

	
 
def run_main(arglist, config=None):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/ledger.beancount'),
 
            rt_client=testutil.RTClient(),
 
        )
 
    arglist.insert(0, '--output-file=-')
 
    output = io.BytesIO()
 
    errors = io.StringIO()
 
    with clean_account_meta():
 
        retcode = ledger.main(arglist, output, errors, config)
...
 
@@ -286,27 +334,31 @@ def run_main(arglist, config=None):
 
    return retcode, output, errors
 

	
 
def test_main(ledger_entries):
 
    retcode, output, errors = run_main([
 
        '-b', START_DATE.isoformat(),
 
        '-e', STOP_DATE.isoformat(),
 
    ])
 
    output.seek(0)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    assert get_sheet_names(ods) == DEFAULT_REPORT_SHEETS[:]
 
    postings = data.Posting.from_entries(ledger_entries)
 
    for _, expected in ExpectedPostings.group_by_account(postings):
 
        expected.check_report(ods, START_DATE, STOP_DATE)
 
    postings = data.Posting.from_entries(iter(ledger_entries))
 
    expected = dict(ExpectedPostings.group_by_account(postings))
 
    for account in iter_accounts(ledger_entries):
 
        try:
 
            expected[account].check_report(ods, START_DATE, STOP_DATE)
 
        except KeyError:
 
            ExpectedPostings.check_in_report(ods, account)
 

	
 
@pytest.mark.parametrize('acct_arg', [
 
    'Liabilities',
 
    'Accounts payable',
 
])
 
def test_main_account_limit(ledger_entries, acct_arg):
 
    retcode, output, errors = run_main([
 
        '-a', acct_arg,
 
        '-b', START_DATE.isoformat(),
 
        '-e', STOP_DATE.isoformat(),
 
    ])
 
    assert not errors.getvalue()
...
 
@@ -342,40 +394,44 @@ def test_main_account_classification_splits_hierarchy(ledger_entries):
 
        try:
 
            expected.check_report(ods, START_DATE, STOP_DATE)
 
        except NotFound:
 
            assert not should_find, f"{account} not found in report"
 
        else:
 
            assert should_find, f"{account} in report but should be excluded"
 

	
 
@pytest.mark.parametrize('project,start_date,stop_date', [
 
    ('eighteen', START_DATE, MID_DATE.replace(day=30)),
 
    ('nineteen', MID_DATE, STOP_DATE),
 
])
 
def test_main_project_report(ledger_entries, project, start_date, stop_date):
 
    postings = data.Posting.from_entries(ledger_entries)
 
    postings = data.Posting.from_entries(iter(ledger_entries))
 
    for key, related in ExpectedPostings.group_by_meta(postings, 'project'):
 
        if key == project:
 
            break
 
    assert key == project
 
    retcode, output, errors = run_main([
 
        f'--begin={start_date.isoformat()}',
 
        f'--end={stop_date.isoformat()}',
 
        project,
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    assert get_sheet_names(ods) == PROJECT_REPORT_SHEETS[:]
 
    for _, expected in ExpectedPostings.group_by_account(related):
 
        expected.check_report(ods, start_date, stop_date)
 
    expected = dict(ExpectedPostings.group_by_account(related))
 
    for account in iter_accounts(ledger_entries):
 
        try:
 
            expected[account].check_report(ods, start_date, stop_date)
 
        except KeyError:
 
            ExpectedPostings.check_not_in_report(ods, account)
 

	
 
@pytest.mark.parametrize('arg', [
 
    'Assets:NoneSuchBank',
 
    'Funny money',
 
])
 
def test_main_invalid_account(caplog, arg):
 
    retcode, output, errors = run_main(['-a', arg])
 
    assert retcode == 2
 
    assert any(log.message.endswith(f': {arg!r}') for log in caplog.records)
 

	
 
def test_main_no_postings(caplog):
 
    retcode, output, errors = run_main(['NonexistentProject'])
0 comments (0 inline, 0 general)