Changeset - d6821b13681b
[Not reviewed]
0 2 0
Brett Smith - 4 years ago 2020-06-28 13:43:44
brettcsmith@brettcsmith.org
fund: Fund report columns more closely match the audit report.
2 files changed with 23 insertions and 9 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/fund.py
Show inline comments
...
 
@@ -56,165 +56,178 @@ from typing import (
 
    Iterable,
 
    Iterator,
 
    List,
 
    Mapping,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Tuple,
 
)
 
from ..beancount_types import (
 
    MetaValue,
 
)
 

	
 
from decimal import Decimal
 
from pathlib import Path
 

	
 
import odf.table  # type:ignore[import]
 

	
 
from beancount.parser import printer as bc_printer
 

	
 
from . import core
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 

	
 
AccountsMap = Mapping[data.Account, core.PeriodPostings]
 
FundPosts = Tuple[MetaValue, AccountsMap]
 

	
 
EQUITY_ACCOUNTS = ['Equity', 'Income', 'Expenses']
 
INFO_ACCOUNTS = [
 
    'Assets:Receivable',
 
    'Assets:Prepaid',
 
    'Liabilities:Payable',
 
    'Liabilities:UnearnedIncome',
 
]
 
PROGNAME = 'fund-report'
 
UNRESTRICTED_FUND = 'Conservancy'
 
logger = logging.getLogger('conservancy_beancount.reports.fund')
 

	
 
class ODSReport(core.BaseODS[FundPosts, None]):
 
    def __init__(self, start_date: datetime.date, stop_date: datetime.date) -> None:
 
        super().__init__()
 
        self.start_date = start_date
 
        self.stop_date = stop_date
 
        self.unrestricted: AccountsMap = {}
 

	
 
    def section_key(self, row: FundPosts) -> None:
 
        return None
 

	
 
    def start_spreadsheet(self) -> None:
 
        self.use_sheet("With Breakdowns")
 
        for width in [2.5, 1.5, 1.2, 1.2, 1.2, 1.5, 1.2, 1.3, 1.2, 1.3]:
 
            col_style = self.column_style(width)
 
            self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
 
        center_bold = self.merge_styles(self.style_centertext, self.style_bold)
 
        self.add_row(
 
            self.string_cell(
 
                "Fund", stylename=self.merge_styles(self.style_endtext, self.style_bold),
 
            ),
 
            self.multiline_cell(["Balance as of", self.start_date.isoformat()],
 
                                stylename=center_bold),
 
            self.string_cell("Income", stylename=center_bold),
 
            self.string_cell("Expenses", stylename=center_bold),
 
            self.multiline_cell(["Realized", "Gain/Loss"], stylename=center_bold),
 
            self.string_cell("Equity", stylename=center_bold),
 
            self.multiline_cell(["Balance as of", self.stop_date.isoformat()],
 
                                stylename=center_bold),
 
            self.multiline_cell(["Of Which", "Receivable"], stylename=center_bold),
 
            self.multiline_cell(["Of Which", "Prepaid Expenses"], stylename=center_bold),
 
            self.multiline_cell(["Of Which", "Payable"], stylename=center_bold),
 
            self.multiline_cell(["Of Which", "Unearned Income"], stylename=center_bold),
 
        )
 
        self.lock_first_row()
 
        self.add_row()
 
        self.add_row(self.string_cell(
 
            f"Fund Report From {self.start_date.isoformat()} To {self.stop_date.isoformat()}",
 
            stylename=center_bold,
 
            numbercolumnsspanned=6,
 
        ))
 
        self.add_row()
 

	
 
    def end_spreadsheet(self) -> None:
 
        sheet = self.copy_element(self.sheet)
 
        sheet.setAttribute('name', 'Fund Report')
 
        row_qname = sheet.lastChild.qname
 
        row_qname = odf.table.TableRow().qname
 
        skip_rows: List[int] = []
 
        report_threshold = Decimal('.5')
 
        first_row = True
 
        for index, row in enumerate(sheet.childNodes):
 
            row.childNodes = row.childNodes[:6]
 
            if len(row.childNodes) < 6:
 
                continue
 
            row.childNodes = [*row.childNodes[:4], row.childNodes[5]]
 
            if row.qname != row_qname:
 
                pass
 
            elif first_row:
 
                ref_child = row.childNodes[2]
 
                stylename = ref_child.getAttribute('stylename')
 
                row.insertBefore(self.string_cell(
 
                    "Additions", stylename=stylename,
 
                ), ref_child)
 
                row.insertBefore(self.multiline_cell(
 
                    ["Releases from", "Restrictions"], stylename=stylename,
 
                ), ref_child)
 
                del row.childNodes[4:6]
 
                first_row = False
 
            # Filter out fund rows that don't have anything reportable.
 
            if (row.qname == row_qname
 
                # len(childNodes) makes sure this isn't a header/spacer row.
 
                and len(row.childNodes) == 6
 
                and not any(
 
            elif not any(
 
                    # Multiple childNodes means it's a multi-currency balance.
 
                    len(cell.childNodes) > 1
 
                    # Some column has to round up to 1 to be reportable.
 
                    or (cell.getAttribute('valuetype') == 'currency'
 
                        and Decimal(cell.getAttribute('value')) >= report_threshold)
 
                    for cell in row.childNodes
 
            )):
 
            ):
 
                skip_rows.append(index)
 
        for index in reversed(skip_rows):
 
            del sheet.childNodes[index]
 
        self.lock_first_row(sheet)
 
        self.document.spreadsheet.insertBefore(sheet, self.sheet)
 

	
 
    def _row_balances(self, accounts_map: AccountsMap) -> Iterable[core.Balance]:
 
        acct_order = ['Income', 'Expenses', 'Equity']
 
        key_order = [core.OPENING_BALANCE_NAME, *acct_order, core.ENDING_BALANCE_NAME]
 
        balances: Dict[str, core.Balance] = {key: core.MutableBalance() for key in key_order}
 
        for acct_s, balance in core.account_balances(accounts_map, acct_order):
 
            if acct_s in balances:
 
                balances[acct_s] = balance
 
            else:
 
                acct_root, _, _ = acct_s.partition(':')
 
                balances[acct_root] += balance
 
        for key in key_order:
 
            if key == 'Expenses':
 
                yield balances[key]
 
            else:
 
                yield -balances[key]
 
        for info_key in INFO_ACCOUNTS:
 
            for _, balance in core.account_balances(accounts_map, [info_key]):
 
                pass
 
            yield core.normalize_amount_func(info_key)(balance)
 

	
 
    def write_row(self, row: FundPosts) -> None:
 
        fund, accounts_map = row
 
        if fund == UNRESTRICTED_FUND:
 
            assert not self.unrestricted
 
            self.unrestricted = accounts_map
 
            return
 
        self.add_row(
 
            self.string_cell(fund, stylename=self.style_endtext),
 
            *(self.balance_cell(bal) for bal in self._row_balances(accounts_map)),
 
        )
 

	
 
    def write(self, rows: Iterable[FundPosts]) -> None:
 
        super().write(rows)
 
        if self.unrestricted:
 
            self.add_row()
 
            self.write_row(("Unrestricted", self.unrestricted))
 

	
 

	
 
class TextReport:
 
    def __init__(self,
 
                 start_date: datetime.date,
 
                 stop_date: datetime.date,
 
                 out_file: TextIO) -> None:
 
        self.start_date = start_date
 
        self.stop_date = stop_date
 
        self.out_file = out_file
 

	
 
    def _account_balances(self,
 
                          fund: str,
 
                          account_map: AccountsMap,
 
    ) -> Iterator[Tuple[str, Sequence[str]]]:
 
        total_fmt = f'{fund} balance as of {{}}'
 
        for acct_s, balance in core.account_balances(account_map, EQUITY_ACCOUNTS):
 
            if acct_s is core.OPENING_BALANCE_NAME:
 
                acct_s = total_fmt.format(self.start_date.isoformat())
 
            elif acct_s is core.ENDING_BALANCE_NAME:
 
                acct_s = total_fmt.format(self.stop_date.isoformat())
 
            yield acct_s, (-balance).format(None, sep='\0').split('\0')
tests/test_reports_fund.py
Show inline comments
...
 
@@ -124,129 +124,130 @@ def check_text_report(output, project, start_date, stop_date):
 
        except KeyError:
 
            pass
 
        else:
 
            for account, amount in amounts:
 
                if year < start_date.year and account.startswith(EQUITY_ROOT_ACCOUNTS):
 
                    balance_amount += amount
 
                else:
 
                    expected[account] += amount
 
    actual = split_text_lines(output)
 
    next(actual); next(actual)  # Discard headers
 
    open_acct, open_amt = next(actual)
 
    assert open_acct == "{} balance as of {}".format(
 
        project, start_date.isoformat(),
 
    )
 
    assert open_amt == format_amount(balance_amount)
 
    balance_amount += check_text_balances(
 
        actual, expected,
 
        'Equity:Realized:CurrencyConversion',
 
        'Income:Other',
 
        'Expenses:Other',
 
    )
 
    end_acct, end_amt = next(actual)
 
    assert end_acct == "{} balance as of {}".format(
 
        project, stop_date.isoformat(),
 
    )
 
    assert end_amt == format_amount(balance_amount)
 
    balance_amount += check_text_balances(
 
        actual, expected,
 
        'Assets:Receivable:Accounts',
 
        'Assets:Prepaid:Expenses',
 
        'Liabilities:Payable:Accounts',
 
        'Liabilities:UnearnedIncome',
 
    )
 
    assert next(actual, None) is None
 

	
 
def check_cell_balance(cell, balance):
 
    if balance:
 
        assert cell.value == balance
 
    else:
 
        assert not cell.value
 

	
 
def check_ods_sheet(sheet, account_balances, *, full):
 
    if full:
 
        account_bals = account_balances.copy()
 
        account_bals['Unrestricted'] = account_bals.pop('Conservancy')
 
    else:
 
        account_bals = {
 
            key: balances
 
            for key, balances in account_balances.items()
 
            if key != 'Conservancy' and any(v >= .5 for v in balances.values())
 
        }
 
    for row in itertools.islice(sheet.getElementsByType(odf.table.TableRow), 4, None):
 
        cells = iter(testutil.ODSCell.from_row(row))
 
        try:
 
            fund = next(cells).firstChild.text
 
        except (AttributeError, StopIteration):
 
            continue
 
        try:
 
            balances = account_bals.pop(fund)
 
        except KeyError:
 
            pytest.fail(f"report included unexpected fund {fund}")
 
        check_cell_balance(next(cells), balances['opening'])
 
        check_cell_balance(next(cells), balances['Income'])
 
        check_cell_balance(next(cells), -balances['Expenses'])
 
        check_cell_balance(next(cells), balances['Equity:Realized'])
 
        if full:
 
            check_cell_balance(next(cells), balances['Equity:Realized'])
 
        check_cell_balance(next(cells), sum(balances[key] for key in [
 
            'opening', 'Income', 'Expenses', 'Equity:Realized',
 
        ]))
 
        if full:
 
            check_cell_balance(next(cells), balances['Assets:Receivable'])
 
            check_cell_balance(next(cells), balances['Assets:Prepaid'])
 
            check_cell_balance(next(cells), balances['Liabilities:Payable'])
 
            check_cell_balance(next(cells), balances['Liabilities'])
 
        assert next(cells, None) is None
 
    assert not account_bals, "did not see all funds in report"
 

	
 
def check_ods_report(ods, start_date, stop_date):
 
    account_bals = collections.OrderedDict((key, {
 
        'opening': Decimal(amount),
 
        'Income': Decimal(0),
 
        'Expenses': Decimal(0),
 
        'Equity:Realized': Decimal(0),
 
        'Assets:Receivable': Decimal(0),
 
        'Assets:Prepaid': Decimal(0),
 
        'Liabilities:Payable': Decimal(0),
 
        'Liabilities': Decimal(0),  # UnearnedIncome
 
    }) for key, amount in sorted(OPENING_BALANCES.items()))
 
    for fund, year in itertools.product(account_bals, range(2018, stop_date.year)):
 
        try:
 
            amounts = BALANCES_BY_YEAR[(fund, year)]
 
        except KeyError:
 
            pass
 
        else:
 
            for account, amount in amounts:
 
                if year < start_date.year and account.startswith(EQUITY_ROOT_ACCOUNTS):
 
                    acct_key = 'opening'
 
                else:
 
                    acct_key, _, _ = account.rpartition(':')
 
                account_bals[fund][acct_key] += amount
 
    sheets = iter(ods.getElementsByType(odf.table.Table))
 
    check_ods_sheet(next(sheets), account_bals, full=False)
 
    check_ods_sheet(next(sheets), account_bals, full=True)
 
    assert next(sheets, None) is None, "found unexpected sheet"
 

	
 
def run_main(out_type, arglist, config=None):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/fund.beancount'),
 
        )
 
    arglist.insert(0, '--output-file=-')
 
    output = out_type()
 
    errors = io.StringIO()
 
    retcode = fund.main(arglist, output, errors, config)
 
    output.seek(0)
 
    return retcode, output, errors
 

	
 
@pytest.mark.parametrize('project,start_date,stop_date', [
 
    ('Conservancy', START_DATE, STOP_DATE),
 
    ('project=Conservancy', MID_DATE, STOP_DATE),
 
    ('Conservancy', START_DATE, MID_DATE),
 
    ('Alpha', START_DATE, STOP_DATE),
 
    ('project=Alpha', MID_DATE, STOP_DATE),
 
    ('Alpha', START_DATE, MID_DATE),
 
    ('Bravo', START_DATE, STOP_DATE),
 
    ('project=Bravo', MID_DATE, STOP_DATE),
 
    ('Bravo', START_DATE, MID_DATE),
 
    ('project=Charlie', START_DATE, STOP_DATE),
 
])
 
def test_text_report(project, start_date, stop_date):
0 comments (0 inline, 0 general)