Changeset - 7a9bc2da5040
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-06-20 13:11:01
brettcsmith@brettcsmith.org
reports: Add sort_and_filter_accounts() function.

Extracted from the ledger report.
3 files changed with 106 insertions and 19 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -1024,48 +1024,91 @@ class BaseODS(BaseSpreadsheet[RT, ST], metaclass=abc.ABCMeta):
 
        cell = odf.table.TableCell(valuetype='string', **attrs)
 
        cell.addElement(odf.text.P(text=text))
 
        return cell
 

	
 
    def write_row(self, row: RT) -> None:
 
        """Write a single row of input data to the spreadsheet
 

	
 
        This default implementation adds a single row to the spreadsheet,
 
        with one cell per element of the row. The type of each element
 
        determines what kind of cell is created.
 

	
 
        This implementation will help get you started, but you'll probably
 
        want to override it to specify styles.
 
        """
 
        out_row = odf.table.TableRow()
 
        for cell_source in row:
 
            if isinstance(cell_source, (int, float, Decimal)):
 
                cell = self.float_cell(cell_source)
 
            else:
 
                cell = self.string_cell(cell_source)
 
            out_row.addElement(cell)
 
        self.sheet.addElement(out_row)
 

	
 
    def save_file(self, out_file: BinaryIO) -> None:
 
        self.document.write(out_file)
 

	
 
    def save_path(self, path: Path, mode: str='w') -> None:
 
        with path.open(f'{mode}b') as out_file:
 
            out_file = cast(BinaryIO, out_file)
 
            self.save_file(out_file)
 

	
 

	
 
def normalize_amount_func(account_name: str) -> Callable[[T], T]:
 
    """Get a function to normalize amounts for reporting
 

	
 
    Given an account name, return a function that can be used on "amounts"
 
    under that account (including numbers, Amount objects, and Balance objects)
 
    to normalize them for reporting. Right now that means make flipping the
 
    sign for accounts where "normal" postings are negative.
 
    """
 
    if account_name.startswith(('Assets:', 'Expenses:')):
 
        # We can't just return operator.pos because Beancount's Amount class
 
        # doesn't implement __pos__.
 
        return lambda amt: amt
 
    elif account_name.startswith(('Equity:', 'Income:', 'Liabilities:')):
 
        return operator.neg
 
    else:
 
        raise ValueError(f"unrecognized account name {account_name!r}")
 

	
 
def sort_and_filter_accounts(
 
        accounts: Iterable[data.Account],
 
        order: Sequence[str],
 
) -> Iterator[Tuple[int, data.Account]]:
 
    """Reorganize accounts based on an ordered set of names
 

	
 
    This function takes a iterable of Account objects, and a sequence of
 
    account names. Usually the account names are higher parts of the account
 
    hierarchy like Income, Equity, or Assets:Receivable.
 

	
 
    It returns an iterator of 2-tuples, ``(index, account)`` where ``index`` is
 
    an index into the ordering sequence, and ``account`` is one of the input
 
    Account objects that's under the account name ``order[index]``. Tuples are
 
    sorted, so ``index`` increases monotonically, and Account objects using the
 
    same index are yielded sorted by name.
 

	
 
    For example, if your order is
 
    ``['Liabilities:Payable', 'Assets:Receivable']``, the return value will
 
    first yield zero or more results with index 0 and an account under
 
    Liabilities:Payable, then zero or more results with index 1 and an account
 
    under Accounts:Receivable.
 

	
 
    Input Accounts that are not under any of the account names in ``order`` do
 
    not appear in the output iterator. That's the filtering part.
 

	
 
    Note that if none of the input Accounts are under one of the ordering
 
    sequence accounts, its index will never appear in the results. This is why
 
    the 2-tuples include an index rather than the original account name string,
 
    to make it easier for callers to know when this happens and do something
 
    with unused ordering accounts.
 
    """
 
    index_map = {s: ii for ii, s in enumerate(order)}
 
    retval: Mapping[int, List[data.Account]] = collections.defaultdict(list)
 
    for account in accounts:
 
        acct_key = account.is_under(*order)
 
        if acct_key is not None:
 
            retval[index_map[acct_key]].append(account)
 
    return (
 
        (key, account)
 
        for key in sorted(retval)
 
        for account in sorted(retval[key])
 
    )
conservancy_beancount/reports/ledger.py
Show inline comments
...
 
@@ -195,113 +195,96 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
 
        maybe_split = cls._group_tally(tally_by_account[:index], group_func)
 
        must_split = cls._group_tally(tally_by_account[index:], group_func)
 
        for subkey, must_split_tally in sorted(must_split.items()):
 
            split_names = cls._split_sheet(
 
                maybe_split.get(subkey, []) + must_split_tally, sheet_size, subkey,
 
            )
 
            # We must be willing to split out at least as many sheets as there
 
            # are accounts that didn't fit. Do that first.
 
            yield from itertools.islice(split_names, len(must_split_tally))
 
            # After that, we can be in one of two cases:
 
            # 1. There is no next sheet. All the accounts, including the
 
            #    maybe_splits and must_splits, fit on planned subsheets.
 
            #    Update state to note we don't need a sheet for them anymore.
 
            # 2. The next sheet is named `subkey`, and is planned to include
 
            #    all of our maybe_split accounts. However, we don't need to
 
            #    yield that sheet name, because those accounts already fit in
 
            #    the sheet we're planning, and it would be a needless split.
 
            next_sheet_name = next(split_names, None)
 
            if next_sheet_name is None:
 
                maybe_split.pop(subkey, None)
 
            else:
 
                assert next_sheet_name == subkey
 
                assert not any(split_names)
 
        if maybe_split:
 
            yield sheet_name
 

	
 
    @classmethod
 
    def plan_sheets(
 
            cls,
 
            tally_by_account: Mapping[data.Account, int],
 
            base_sheets: Sequence[str],
 
            sheet_size: int,
 
    ) -> Sequence[str]:
 
        sorted_tally: PostTally = [
 
            (count, account)
 
            for account, count in tally_by_account.items()
 
        ]
 
        sorted_tally.sort()
 
        split_tally = cls._group_tally(
 
            sorted_tally,
 
            operator.methodcaller('is_under', *base_sheets),
 
        )
 
        return [
 
            sheet_name
 
            for key in base_sheets
 
            for sheet_name in cls._split_sheet(split_tally[key], sheet_size, key)
 
        ]
 

	
 
    @staticmethod
 
    def _sort_and_filter_accounts(
 
            accounts: Iterable[data.Account],
 
            order: Sequence[str],
 
    ) -> Iterator[Tuple[int, data.Account]]:
 
        index_map = {s: ii for ii, s in enumerate(order)}
 
        retval: Mapping[int, List[data.Account]] = collections.defaultdict(list)
 
        for account in accounts:
 
            acct_key = account.is_under(*order)
 
            if acct_key is not None:
 
                retval[index_map[acct_key]].append(account)
 
        for key in sorted(retval):
 
            acct_list = retval[key]
 
            acct_list.sort()
 
            for account in acct_list:
 
                yield key, account
 

	
 
    def section_key(self, row: data.Posting) -> data.Account:
 
        return row.account
 

	
 
    def start_sheet(self, sheet_name: str) -> None:
 
        self.use_sheet(sheet_name.replace(':', ' '))
 
        columns_key = data.Account(sheet_name).is_under(*self.ACCOUNT_COLUMNS)
 
        # columns_key must not be None because ACCOUNT_COLUMNS has an entry
 
        # for all five root accounts.
 
        assert columns_key is not None
 
        self.metadata_columns = self.ACCOUNT_COLUMNS[columns_key]
 
        self.sheet_columns: Sequence[str] = [
 
            *self.CORE_COLUMNS,
 
            *(data.Metadata.human_name(meta_key) for meta_key in self.metadata_columns),
 
        ]
 
        for col_name in self.sheet_columns:
 
            self.sheet.addElement(odf.table.TableColumn(
 
                stylename=self.column_styles.get(col_name, self.default_column),
 
            ))
 
        self.add_row(*(
 
            self.string_cell(col_name, stylename=self.style_bold)
 
            for col_name in self.sheet_columns
 
        ))
 
        self.lock_first_row()
 

	
 
    def _report_section_balance(self, key: data.Account, date_key: str) -> None:
 
        uses_opening = key.is_under('Assets', 'Equity', 'Liabilities')
 
        related = self.account_groups[key]
 
        if date_key == 'start':
 
            if not uses_opening:
 
                return
 
            date = self.date_range.start
 
            balance = related.start_bal
 
            description = "Opening Balance"
 
        else:
 
            date = self.date_range.stop
 
            if uses_opening:
 
                balance = related.stop_bal
 
                description = "Ending Balance"
 
            else:
 
                balance = related.period_bal
 
                description = "Period Total"
 
        self.add_row(
 
            self.date_cell(date, stylename=self.merge_styles(
 
                self.style_bold, self.style_date,
 
            )),
 
            odf.table.TableCell(),
 
            self.string_cell(description, stylename=self.style_bold),
 
            odf.table.TableCell(),
...
 
@@ -338,127 +321,127 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
 
            self.string_cell(row.meta.get('entity') or ''),
 
            self.string_cell(row.meta.txn.narration),
 
            amount_cell,
 
            self.currency_cell(self.norm_func(row.at_cost())),
 
            *(self.meta_links_cell(row.meta.report_links(key))
 
              if key in data.LINK_METADATA
 
              else self.string_cell(row.meta.get(key, ''))
 
              for key in self.metadata_columns),
 
        )
 

	
 
    def _combined_balance_row(self,
 
                              balance_accounts: Sequence[str],
 
                              attr_name: str,
 
    ) -> None:
 
        date = getattr(self.date_range, attr_name)
 
        balance_attrname = f'{attr_name}_bal'
 
        balance = -sum((
 
            getattr(related, balance_attrname)
 
            for account, related in self.account_groups.items()
 
            if account.is_under(*balance_accounts)
 
        ), core.MutableBalance())
 
        self.add_row(
 
            self.string_cell(
 
                f"Balance as of {date.isoformat()}",
 
                stylename=self.merge_styles(self.style_bold, self.style_endtext),
 
            ),
 
            self.balance_cell(balance, stylename=self.style_bold),
 
        )
 

	
 
    def write_balance_sheet(self) -> None:
 
        balance_accounts = ['Equity', 'Income', 'Expenses']
 
        self.use_sheet("Balance")
 
        self.sheet.addElement(odf.table.TableColumn(stylename=self.column_style(3)))
 
        self.sheet.addElement(odf.table.TableColumn(stylename=self.column_style(1.5)))
 
        self.add_row(
 
            self.string_cell("Account", stylename=self.style_bold),
 
            self.string_cell("Balance", stylename=self.style_bold),
 
        )
 
        self.lock_first_row()
 
        self.add_row()
 
        self.add_row(self.string_cell(
 
            f"Ledger From {self.date_range.start.isoformat()}"
 
            f" To {self.date_range.stop.isoformat()}",
 
            stylename=self.merge_styles(self.style_centertext, self.style_bold),
 
            numbercolumnsspanned=2,
 
        ))
 
        self.add_row()
 
        self._combined_balance_row(balance_accounts, 'start')
 
        for _, account in self._sort_and_filter_accounts(
 
        for _, account in core.sort_and_filter_accounts(
 
                self.account_groups, balance_accounts,
 
        ):
 
            balance = self.account_groups[account].period_bal
 
            if not balance.is_zero():
 
                self.add_row(
 
                    self.string_cell(account, stylename=self.style_endtext),
 
                    self.balance_cell(-balance),
 
                )
 
        self._combined_balance_row(balance_accounts, 'stop')
 

	
 
    def write(self, rows: Iterable[data.Posting]) -> None:
 
        AccountPostings.START_DATE = self.date_range.start
 
        self.account_groups = dict(AccountPostings.group_by_account(
 
            post for post in rows if post.meta.date < self.date_range.stop
 
        ))
 
        self.write_balance_sheet()
 
        tally_by_account_iter = (
 
            (account, sum(1 for post in related if post.meta.date in self.date_range))
 
            for account, related in self.account_groups.items()
 
        )
 
        tally_by_account = {
 
            account: count
 
            for account, count in tally_by_account_iter
 
            if count
 
        }
 
        sheet_names = self.plan_sheets(
 
            tally_by_account, self.required_sheet_names, self.sheet_size,
 
        )
 
        using_sheet_index = -1
 
        for sheet_index, account in self._sort_and_filter_accounts(
 
        for sheet_index, account in core.sort_and_filter_accounts(
 
                tally_by_account, sheet_names,
 
        ):
 
            while using_sheet_index < sheet_index:
 
                using_sheet_index += 1
 
                self.start_sheet(sheet_names[using_sheet_index])
 
            super().write(self.account_groups[account])
 
        for index in range(using_sheet_index + 1, len(sheet_names)):
 
            self.start_sheet(sheet_names[index])
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    LOAD_ERRORS = 1
 
    NOTHING_TO_REPORT = 8
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
 
The default is one year ago.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
 
The default is a year after the start date, or 30 days from today if the start
 
date was also not specified.
 
""")
 
    parser.add_argument(
 
        '--account', '-a',
 
        dest='sheet_names',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""Show this account in the report. You can specify this option
 
multiple times. If not specified, the default set adapts to your search
 
criteria.
 
""")
 
    parser.add_argument(
 
        '--sheet-size', '--size',
 
        metavar='SIZE',
 
        type=int,
tests/test_reports_core.py
Show inline comments
 
"""test_reports_core - Unit tests for basic reports functions"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import pytest
 

	
 
from decimal import Decimal
 

	
 
from . import testutil
 

	
 
from conservancy_beancount.reports import core
 

	
 
from conservancy_beancount.data import Account
 

	
 
AMOUNTS = [
 
    2,
 
    Decimal('4.40'),
 
    testutil.Amount('6.60', 'CHF'),
 
    core.Balance([testutil.Amount('8.80')]),
 
]
 

	
 
@pytest.mark.parametrize('acct_name', [
 
    'Assets:Checking',
 
    'Assets:Receivable:Accounts',
 
    'Expenses:Other',
 
    'Expenses:FilingFees',
 
])
 
def test_normalize_amount_func_pos(acct_name):
 
    actual = core.normalize_amount_func(acct_name)
 
    for amount in AMOUNTS:
 
        assert actual(amount) == amount
 

	
 
@pytest.mark.parametrize('acct_name', [
 
    'Equity:Funds:Restricted',
 
    'Equity:Realized:CurrencyConversion',
 
    'Income:Donations',
 
    'Income:Other',
 
    'Liabilities:CreditCard',
 
    'Liabilities:Payable:Accounts',
 
])
 
def test_normalize_amount_func_neg(acct_name):
 
    actual = core.normalize_amount_func(acct_name)
 
    for amount in AMOUNTS:
 
        assert actual(amount) == -amount
 

	
 
@pytest.mark.parametrize('acct_name', [
 
    '',
 
    'Assets',
 
    'Equity',
 
    'Expenses',
 
    'Income',
 
    'Liabilities',
 
])
 
def test_normalize_amount_func_bad_acct_name(acct_name):
 
    with pytest.raises(ValueError):
 
        core.normalize_amount_func(acct_name)
 

	
 
def test_sort_and_filter_accounts():
 
    accounts = (Account(s) for s in [
 
        'Expenses:Services',
 
        'Assets:Receivable',
 
        'Income:Other',
 
        'Liabilities:Payable',
 
        'Equity:Funds:Unrestricted',
 
        'Income:Donations',
 
        'Expenses:Other',
 
    ])
 
    actual = core.sort_and_filter_accounts(accounts, ['Equity', 'Income', 'Expenses'])
 
    assert list(actual) == [
 
        (0, 'Equity:Funds:Unrestricted'),
 
        (1, 'Income:Donations'),
 
        (1, 'Income:Other'),
 
        (2, 'Expenses:Other'),
 
        (2, 'Expenses:Services'),
 
    ]
 

	
 
def test_sort_and_filter_accounts_unused_name():
 
    accounts = (Account(s) for s in [
 
        'Liabilities:CreditCard',
 
        'Assets:Cash',
 
        'Assets:Receivable:Accounts',
 
    ])
 
    actual = core.sort_and_filter_accounts(
 
        accounts, ['Assets:Receivable', 'Liabilities:Payable', 'Assets', 'Liabilities'],
 
    )
 
    assert list(actual) == [
 
        (0, 'Assets:Receivable:Accounts'),
 
        (2, 'Assets:Cash'),
 
        (3, 'Liabilities:CreditCard'),
 
    ]
 

	
 
def test_sort_and_filter_accounts_with_subaccounts():
 
    accounts = (Account(s) for s in [
 
        'Assets:Checking',
 
        'Assets:Receivable:Fraud',
 
        'Assets:Cash',
 
        'Assets:Receivable:Accounts',
 
    ])
 
    actual = core.sort_and_filter_accounts(accounts, ['Assets:Receivable', 'Assets'])
 
    assert list(actual) == [
 
        (0, 'Assets:Receivable:Accounts'),
 
        (0, 'Assets:Receivable:Fraud'),
 
        (1, 'Assets:Cash'),
 
        (1, 'Assets:Checking'),
 
    ]
 

	
 
@pytest.mark.parametrize('empty_arg', ['accounts', 'order'])
 
def test_sort_and_filter_accounts_empty_accounts(empty_arg):
 
    accounts = [Account(s) for s in ['Expenses:Other', 'Income:Other']]
 
    if empty_arg == 'accounts':
 
        args = ([], accounts)
 
    else:
 
        args = (accounts, [])
 
    actual = core.sort_and_filter_accounts(*args)
 
    assert next(actual, None) is None
0 comments (0 inline, 0 general)