Changeset - 40573cb6dc30
[Not reviewed]
0 2 0
Brett Smith - 4 years ago 2020-06-27 22:26:03
brettcsmith@brettcsmith.org
fund: Split ODS into two sheets.

The first only has equity numbers the auditors look at.
The second includes balances of additional accounts.
2 files changed with 41 insertions and 22 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/fund.py
Show inline comments
...
 
@@ -41,158 +41,166 @@ Query a specific restricted fund and get a quick balance on the terminal::
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import collections
 
import datetime
 
import enum
 
import locale
 
import logging
 
import sys
 

	
 
from typing import (
 
    Dict,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Tuple,
 
)
 
from ..beancount_types import (
 
    MetaValue,
 
)
 

	
 
from pathlib import Path
 

	
 
import odf.table  # type:ignore[import]
 

	
 
from beancount.parser import printer as bc_printer
 

	
 
from . import core
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 

	
 
AccountsMap = Mapping[data.Account, core.PeriodPostings]
 
FundPosts = Tuple[MetaValue, AccountsMap]
 

	
 
EQUITY_ACCOUNTS = ['Equity', 'Income', 'Expenses']
 
INFO_ACCOUNTS = [
 
    'Assets:Receivable',
 
    'Assets:Prepaid',
 
    'Liabilities:Payable',
 
    'Liabilities:UnearnedIncome',
 
]
 
PROGNAME = 'fund-report'
 
UNRESTRICTED_FUND = 'Conservancy'
 
logger = logging.getLogger('conservancy_beancount.reports.fund')
 

	
 
class ODSReport(core.BaseODS[FundPosts, None]):
 
    def __init__(self, start_date: datetime.date, stop_date: datetime.date) -> None:
 
        super().__init__()
 
        self.start_date = start_date
 
        self.stop_date = stop_date
 
        self.unrestricted: AccountsMap = {}
 

	
 
    def section_key(self, row: FundPosts) -> None:
 
        return None
 

	
 
    def start_spreadsheet(self) -> None:
 
        self.use_sheet("Fund Report")
 
        self.use_sheet("With Breakdowns")
 
        for width in [2.5, 1.5, 1.2, 1.2, 1.2, 1.5, 1.2, 1.3, 1.2, 1.3]:
 
            col_style = self.column_style(width)
 
            self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
 
        center_bold = self.merge_styles(self.style_centertext, self.style_bold)
 
        self.add_row(
 
            self.string_cell(
 
                "Fund", stylename=self.merge_styles(self.style_endtext, self.style_bold),
 
            ),
 
            self.multiline_cell(["Balance as of", self.start_date.isoformat()],
 
                                stylename=center_bold),
 
            self.string_cell("Income", stylename=center_bold),
 
            self.string_cell("Expenses", stylename=center_bold),
 
            self.multiline_cell(["Realized", "Gain/Loss"], stylename=center_bold),
 
            self.multiline_cell(["Balance as of", self.stop_date.isoformat()],
 
                                stylename=center_bold),
 
            self.multiline_cell(["Of Which", "Receivable"], stylename=center_bold),
 
            self.multiline_cell(["Of Which", "Prepaid Expenses"], stylename=center_bold),
 
            self.multiline_cell(["Of Which", "Payable"], stylename=center_bold),
 
            self.multiline_cell(["Of Which", "Unearned Income"], stylename=center_bold),
 
        )
 
        self.lock_first_row()
 
        self.add_row()
 
        self.add_row(self.string_cell(
 
            f"Fund Report From {self.start_date.isoformat()} To {self.stop_date.isoformat()}",
 
            stylename=center_bold,
 
            numbercolumnsspanned=6,
 
        ))
 
        self.add_row()
 

	
 
    def end_spreadsheet(self) -> None:
 
        sheet = self.copy_element(self.sheet)
 
        sheet.setAttribute('name', 'Fund Report')
 
        for row in sheet.childNodes:
 
            row.childNodes = row.childNodes[:6]
 
        self.lock_first_row(sheet)
 
        self.document.spreadsheet.insertBefore(sheet, self.sheet)
 

	
 
    def _row_balances(self, accounts_map: AccountsMap) -> Iterable[core.Balance]:
 
        acct_order = ['Income', 'Expenses', 'Equity']
 
        key_order = [core.OPENING_BALANCE_NAME, *acct_order, core.ENDING_BALANCE_NAME]
 
        balances: Dict[str, core.Balance] = {key: core.MutableBalance() for key in key_order}
 
        for acct_s, balance in core.account_balances(accounts_map, acct_order):
 
            if acct_s in balances:
 
                balances[acct_s] = balance
 
            else:
 
                acct_root, _, _ = acct_s.partition(':')
 
                balances[acct_root] += balance
 
        for key in key_order:
 
            if key == 'Expenses':
 
                yield balances[key]
 
            else:
 
                yield -balances[key]
 
        for info_key in INFO_ACCOUNTS:
 
            for _, balance in core.account_balances(accounts_map, [info_key]):
 
                pass
 
            yield core.normalize_amount_func(info_key)(balance)
 

	
 
    def write_row(self, row: FundPosts) -> None:
 
        fund, accounts_map = row
 
        if fund == UNRESTRICTED_FUND:
 
            assert not self.unrestricted
 
            self.unrestricted = accounts_map
 
            return
 
        self.add_row(
 
            self.string_cell(fund, stylename=self.style_endtext),
 
            *(self.balance_cell(bal) for bal in self._row_balances(accounts_map)),
 
        )
 

	
 
    def write(self, rows: Iterable[FundPosts]) -> None:
 
        super().write(rows)
 
        if self.unrestricted:
 
            self.add_row()
 
            self.write_row(("Unrestricted", self.unrestricted))
 

	
 

	
 
class TextReport:
 
    def __init__(self,
 
                 start_date: datetime.date,
 
                 stop_date: datetime.date,
 
                 out_file: TextIO) -> None:
 
        self.start_date = start_date
 
        self.stop_date = stop_date
 
        self.out_file = out_file
 

	
 
    def _account_balances(self,
 
                          fund: str,
 
                          account_map: AccountsMap,
 
    ) -> Iterator[Tuple[str, Sequence[str]]]:
 
        total_fmt = f'{fund} balance as of {{}}'
 
        for acct_s, balance in core.account_balances(account_map, EQUITY_ACCOUNTS):
 
            if acct_s is core.OPENING_BALANCE_NAME:
 
                acct_s = total_fmt.format(self.start_date.isoformat())
 
            elif acct_s is core.ENDING_BALANCE_NAME:
 
                acct_s = total_fmt.format(self.stop_date.isoformat())
 
            yield acct_s, (-balance).format(None, sep='\0').split('\0')
 
        for _, account in core.sort_and_filter_accounts(account_map, INFO_ACCOUNTS):
 
            balance = account_map[account].stop_bal
 
            if not balance.is_zero():
 
                balance = core.normalize_amount_func(account)(balance)
 
                yield account, balance.format(None, sep='\0').split('\0')
 

	
tests/test_reports_fund.py
Show inline comments
...
 
@@ -94,159 +94,170 @@ def split_text_lines(output):
 
def format_amount(amount, currency='USD'):
 
    return babel.numbers.format_currency(
 
        amount, currency, format_type='accounting',
 
    )
 

	
 
def check_text_balances(actual, expected, *expect_accounts):
 
    balance = Decimal()
 
    for expect_account in expect_accounts:
 
        expect_amount = expected[expect_account]
 
        if expect_amount:
 
            actual_account, actual_amount = next(actual)
 
            assert actual_account == expect_account
 
            assert actual_amount == format_amount(expect_amount)
 
            balance += expect_amount
 
    return balance
 

	
 
def check_text_report(output, project, start_date, stop_date):
 
    _, _, project = project.rpartition('=')
 
    balance_amount = Decimal(OPENING_BALANCES[project])
 
    expected = collections.defaultdict(Decimal)
 
    for year in range(2018, stop_date.year):
 
        try:
 
            amounts = BALANCES_BY_YEAR[(project, year)]
 
        except KeyError:
 
            pass
 
        else:
 
            for account, amount in amounts:
 
                if year < start_date.year and account.startswith(EQUITY_ROOT_ACCOUNTS):
 
                    balance_amount += amount
 
                else:
 
                    expected[account] += amount
 
    actual = split_text_lines(output)
 
    next(actual); next(actual)  # Discard headers
 
    open_acct, open_amt = next(actual)
 
    assert open_acct == "{} balance as of {}".format(
 
        project, start_date.isoformat(),
 
    )
 
    assert open_amt == format_amount(balance_amount)
 
    balance_amount += check_text_balances(
 
        actual, expected,
 
        'Equity:Realized:CurrencyConversion',
 
        'Income:Other',
 
        'Expenses:Other',
 
    )
 
    end_acct, end_amt = next(actual)
 
    assert end_acct == "{} balance as of {}".format(
 
        project, stop_date.isoformat(),
 
    )
 
    assert end_amt == format_amount(balance_amount)
 
    balance_amount += check_text_balances(
 
        actual, expected,
 
        'Assets:Receivable:Accounts',
 
        'Assets:Prepaid:Expenses',
 
        'Liabilities:Payable:Accounts',
 
        'Liabilities:UnearnedIncome',
 
    )
 
    assert next(actual, None) is None
 

	
 
def check_cell_balance(cell, balance):
 
    if balance:
 
        assert cell.value == balance
 
    else:
 
        assert not cell.value
 

	
 
def check_ods_sheet(sheet, account_balances, *, full):
 
    account_bals = account_balances.copy()
 
    unrestricted = account_bals.pop('Conservancy')
 
    if full:
 
        account_bals['Unrestricted'] = unrestricted
 
    for row in sheet.getElementsByType(odf.table.TableRow):
 
        cells = iter(testutil.ODSCell.from_row(row))
 
        try:
 
            fund = next(cells).firstChild.text
 
        except (AttributeError, StopIteration):
 
            fund = None
 
        if fund in account_bals:
 
            balances = account_bals.pop(fund)
 
            check_cell_balance(next(cells), balances['opening'])
 
            check_cell_balance(next(cells), balances['Income'])
 
            check_cell_balance(next(cells), -balances['Expenses'])
 
            check_cell_balance(next(cells), balances['Equity:Realized'])
 
            check_cell_balance(next(cells), sum(balances[key] for key in [
 
                'opening', 'Income', 'Expenses', 'Equity:Realized',
 
            ]))
 
            if full:
 
                check_cell_balance(next(cells), balances['Assets:Receivable'])
 
                check_cell_balance(next(cells), balances['Assets:Prepaid'])
 
                check_cell_balance(next(cells), balances['Liabilities:Payable'])
 
                check_cell_balance(next(cells), balances['Liabilities'])
 
            assert next(cells, None) is None
 
    assert not account_bals, "did not see all funds in report"
 

	
 
def check_ods_report(ods, start_date, stop_date):
 
    account_bals = collections.OrderedDict((key, {
 
        'opening': Decimal(amount),
 
        'Income': Decimal(0),
 
        'Expenses': Decimal(0),
 
        'Equity:Realized': Decimal(0),
 
        'Assets:Receivable': Decimal(0),
 
        'Assets:Prepaid': Decimal(0),
 
        'Liabilities:Payable': Decimal(0),
 
        'Liabilities': Decimal(0),  # UnearnedIncome
 
    }) for key, amount in sorted(OPENING_BALANCES.items()))
 
    for fund, year in itertools.product(account_bals, range(2018, stop_date.year)):
 
        try:
 
            amounts = BALANCES_BY_YEAR[(fund, year)]
 
        except KeyError:
 
            pass
 
        else:
 
            for account, amount in amounts:
 
                if year < start_date.year and account.startswith(EQUITY_ROOT_ACCOUNTS):
 
                    acct_key = 'opening'
 
                else:
 
                    acct_key, _, _ = account.rpartition(':')
 
                account_bals[fund][acct_key] += amount
 
    account_bals['Unrestricted'] = account_bals.pop('Conservancy')
 
    for row in ods.getElementsByType(odf.table.TableRow):
 
        cells = iter(testutil.ODSCell.from_row(row))
 
        try:
 
            fund = next(cells).firstChild.text
 
        except (AttributeError, StopIteration):
 
            fund = None
 
        if fund in account_bals:
 
            balances = account_bals.pop(fund)
 
            check_cell_balance(next(cells), balances['opening'])
 
            check_cell_balance(next(cells), balances['Income'])
 
            check_cell_balance(next(cells), -balances['Expenses'])
 
            check_cell_balance(next(cells), balances['Equity:Realized'])
 
            check_cell_balance(next(cells), sum(balances[key] for key in [
 
                'opening', 'Income', 'Expenses', 'Equity:Realized',
 
            ]))
 
            check_cell_balance(next(cells), balances['Assets:Receivable'])
 
            check_cell_balance(next(cells), balances['Assets:Prepaid'])
 
            check_cell_balance(next(cells), balances['Liabilities:Payable'])
 
            check_cell_balance(next(cells), balances['Liabilities'])
 
    assert not account_bals, "did not see all funds in report"
 
    sheets = iter(ods.getElementsByType(odf.table.Table))
 
    check_ods_sheet(next(sheets), account_bals, full=False)
 
    check_ods_sheet(next(sheets), account_bals, full=True)
 
    assert next(sheets, None) is None, "found unexpected sheet"
 

	
 
def run_main(out_type, arglist, config=None):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/fund.beancount'),
 
        )
 
    arglist.insert(0, '--output-file=-')
 
    output = out_type()
 
    errors = io.StringIO()
 
    retcode = fund.main(arglist, output, errors, config)
 
    output.seek(0)
 
    return retcode, output, errors
 

	
 
@pytest.mark.parametrize('project,start_date,stop_date', [
 
    ('Conservancy', START_DATE, STOP_DATE),
 
    ('project=Conservancy', MID_DATE, STOP_DATE),
 
    ('Conservancy', START_DATE, MID_DATE),
 
    ('Alpha', START_DATE, STOP_DATE),
 
    ('project=Alpha', MID_DATE, STOP_DATE),
 
    ('Alpha', START_DATE, MID_DATE),
 
    ('Bravo', START_DATE, STOP_DATE),
 
    ('project=Bravo', MID_DATE, STOP_DATE),
 
    ('Bravo', START_DATE, MID_DATE),
 
    ('project=Charlie', START_DATE, STOP_DATE),
 
])
 
def test_text_report(project, start_date, stop_date):
 
    retcode, output, errors = run_main(io.StringIO, [
 
        '-b', start_date.isoformat(), '-e', stop_date.isoformat(), project,
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_text_report(output, project, start_date, stop_date)
 

	
 
@pytest.mark.parametrize('start_date,stop_date', [
 
    (START_DATE, STOP_DATE),
 
    (MID_DATE, STOP_DATE),
 
    (START_DATE, MID_DATE),
 
])
 
def test_ods_report(start_date, stop_date):
 
    retcode, output, errors = run_main(io.BytesIO, [
 
        '--begin', start_date.isoformat(), '--end', stop_date.isoformat(),
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    check_ods_report(ods, start_date, stop_date)
 

	
 
def test_main_no_postings(caplog):
 
    retcode, output, errors = run_main(io.StringIO, ['NonexistentProject'])
 
    assert retcode == 24
 
    assert any(log.levelname == 'WARNING' for log in caplog.records)
0 comments (0 inline, 0 general)