Files @ 6703d1af87ad
Branch filter:

Location: NPO-Accounting/conservancy_beancount/tests/test_reports_fund.py

Brett Smith
reports: BaseODS puts each line of strings in a P tag.

This seems to be the most straightforward way to get Calc to automatically
determine a nice row height for multi-line string cells. This has become a
lot more noticeable now that query-report supports putting postal addresses
in cells.
"""test_reports_fund.py - Unit tests for fund report"""
# Copyright © 2020  Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.

import collections
import copy
import datetime
import io
import itertools

import pytest

from . import testutil

import babel.numbers
import odf.opendocument
import odf.table

from beancount import loader as bc_loader
from conservancy_beancount import data
from conservancy_beancount.reports import core
from conservancy_beancount.reports import fund

from decimal import Decimal

_ledger_load = bc_loader.load_file(testutil.test_path('books/fund.beancount'))
START_DATE = datetime.date(2018, 3, 1)
MID_DATE = datetime.date(2019, 3, 1)
STOP_DATE = datetime.date(2020, 3, 1)

EQUITY_ROOT_ACCOUNTS = ('Expenses:', 'Equity:', 'Income:')

OPENING_BALANCES = {
    'Alpha': 3000,
    'Bravo': 2000,
    'Charlie': 1000,
    'Conservancy': 4000,
    'Delta': 0,
}

BALANCES_BY_YEAR = {
    ('Conservancy', 2018): [
        ('Income:Other', 40),
        ('Expenses:Other', -4),
        ('Assets:Receivable:Accounts', 40),
        ('Liabilities:Payable:Accounts', 4),
    ],
    ('Conservancy', 2019): [
        ('Income:Other', 42),
        ('Expenses:Other', Decimal('-4.20')),
        ('Equity:Funds:Unrestricted', 100),
        ('Equity:Realized:CurrencyConversion', Decimal('6.20')),
        ('Assets:Receivable:Accounts', -40),
        ('Liabilities:Payable:Accounts', -4),
    ],
    ('Alpha', 2018): [
        ('Income:Other', 60),
        ('Liabilities:UnearnedIncome', 30),
        ('Assets:Prepaid:Expenses', 20),
    ],
    ('Alpha', 2019): [
        ('Income:Other', 30),
        ('Expenses:Other', -26),
        ('Assets:Prepaid:Expenses', -20),
        ('Liabilities:UnearnedIncome', -30),
    ],
    ('Bravo', 2018): [
        ('Expenses:Other', -20),
    ],
    ('Bravo', 2019): [
        ('Income:Other', 200),
    ],
    ('Charlie', 2019): [
        ('Equity:Funds:Restricted', -100),
    ],
    ('Delta', 2018): [
        ('Income:Other', Decimal('.40')),
    ],
    ('Delta', 2019): [
        ('Income:Other', Decimal('4.60')),
    ],
}

clean_account_meta = pytest.fixture(autouse=True)(testutil.clean_account_meta)

@pytest.fixture
def fund_entries():
    return copy.deepcopy(_ledger_load[0])

def split_text_lines(output):
    for line in output:
        account, amount = line.rsplit(None, 1)
        yield account.strip(), amount

def format_amount(amount, currency='USD'):
    return babel.numbers.format_currency(
        amount, currency, format_type='accounting',
    )

def check_text_balances(actual, expected, *expect_accounts):
    balance = Decimal()
    for expect_account in expect_accounts:
        expect_amount = expected[expect_account]
        balance += expect_amount
        if expect_account.startswith('Expenses:'):
            expect_amount *= -1
        if expect_amount:
            actual_account, actual_amount = next(actual)
            assert actual_account == expect_account
            assert actual_amount == format_amount(expect_amount)
    return balance

def check_text_report(output, project, start_date, stop_date):
    _, _, project = project.rpartition('=')
    balance_amount = Decimal(OPENING_BALANCES[project])
    expected = collections.defaultdict(Decimal)
    for year in range(2018, stop_date.year):
        try:
            amounts = BALANCES_BY_YEAR[(project, year)]
        except KeyError:
            pass
        else:
            for account, amount in amounts:
                if year < start_date.year and account.startswith(EQUITY_ROOT_ACCOUNTS):
                    balance_amount += amount
                else:
                    expected[account] += amount
    actual = split_text_lines(output)
    next(actual); next(actual)  # Discard headers
    open_acct, open_amt = next(actual)
    assert open_acct == "{} balance as of {}".format(
        project, start_date.isoformat(),
    )
    assert open_amt == format_amount(balance_amount)
    balance_amount += check_text_balances(
        actual, expected,
        'Income:Other',
        'Expenses:Other',
        'Equity:Funds:Restricted',
        'Equity:Funds:Unrestricted',
        'Equity:Realized:CurrencyConversion',
    )
    next(actual)
    end_acct, end_amt = next(actual)
    assert end_acct == "{} balance as of {}".format(
        project, stop_date.isoformat(),
    )
    assert end_amt == format_amount(balance_amount)
    next(actual)
    balance_amount += check_text_balances(
        actual, expected,
        'Assets:Receivable:Accounts',
        'Assets:Prepaid:Expenses',
        'Liabilities:Payable:Accounts',
        'Liabilities:UnearnedIncome',
    )
    assert next(actual, None) is None

def check_cell_balance(cell, balance):
    if balance:
        assert cell.value == balance
    else:
        assert not cell.value

def check_ods_sheet(sheet, account_balances, *, full):
    total_keys = ['opening', 'Income', 'Expenses', 'Equity']
    if full:
        account_bals = account_balances.copy()
        unrestricted = account_bals.pop('Conservancy')
        total_keys += [
            'Assets:Receivable',
            'Assets:Prepaid',
            'Liabilities',
            'Liabilities:Payable',
        ]
    else:
        account_bals = {
            key: balances
            for key, balances in account_balances.items()
            if key != 'Conservancy' and any(v >= .5 for v in balances.values())
        }
    totals = {key: Decimal() for key in total_keys}
    for fund, balances in account_bals.items():
        for key in totals:
            totals[key] += balances[key]
    account_bals[''] = totals
    if full:
        account_bals['Unrestricted'] = unrestricted
    for row in itertools.islice(sheet.getElementsByType(odf.table.TableRow), 4, None):
        cells = iter(testutil.ODSCell.from_row(row))
        try:
            fund = next(cells).firstChild.text
        except AttributeError:
            fund = ''
        except StopIteration:
            continue
        try:
            balances = account_bals.pop(fund)
        except KeyError:
            pytest.fail(f"report included unexpected fund {fund}")
        check_cell_balance(next(cells), balances['opening'])
        check_cell_balance(next(cells), balances['Income'])
        if full:
            check_cell_balance(next(cells), -balances['Expenses'])
            check_cell_balance(next(cells), balances['Equity'])
        else:
            check_cell_balance(
                next(cells), -sum(balances[key] for key in ['Expenses', 'Equity']),
            )
        check_cell_balance(next(cells), sum(balances[key] for key in [
            'opening', 'Income', 'Expenses', 'Equity',
        ]))
        if full:
            check_cell_balance(next(cells), balances['Assets:Receivable'])
            check_cell_balance(next(cells), balances['Assets:Prepaid'])
            check_cell_balance(next(cells), balances['Liabilities'])
            check_cell_balance(next(cells), balances['Liabilities:Payable'])
        assert next(cells, None) is None
        if full and fund == 'Unrestricted':
            assert '' not in account_bals, "Unrestricted funds reported before subtotals"
            for key, bal in balances.items():
                totals[key] += bal
            account_bals[''] = totals
    assert not account_bals, "did not see all funds in report"

def check_ods_report(ods, start_date, stop_date):
    account_bals = collections.OrderedDict((key, {
        'opening': Decimal(amount),
        'Income': Decimal(0),
        'Expenses': Decimal(0),
        'Equity': Decimal(0),
        'Assets:Receivable': Decimal(0),
        'Assets:Prepaid': Decimal(0),
        'Liabilities:Payable': Decimal(0),
        'Liabilities': Decimal(0),  # UnearnedIncome
    }) for key, amount in sorted(OPENING_BALANCES.items()))
    for fund, year in itertools.product(account_bals, range(2018, stop_date.year)):
        try:
            amounts = BALANCES_BY_YEAR[(fund, year)]
        except KeyError:
            pass
        else:
            for account, amount in amounts:
                if account.startswith(EQUITY_ROOT_ACCOUNTS):
                    if year < start_date.year:
                        acct_key = 'opening'
                    else:
                        acct_key, _, _ = account.partition(':')
                else:
                    acct_key, _, _ = account.rpartition(':')
                account_bals[fund][acct_key] += amount
    sheets = iter(ods.getElementsByType(odf.table.Table))
    check_ods_sheet(next(sheets), account_bals, full=False)
    check_ods_sheet(next(sheets), account_bals, full=True)
    assert next(sheets, None) is None, "found unexpected sheet"

def run_main(out_type, arglist, config=None):
    if config is None:
        config = testutil.TestConfig(
            books_path=testutil.test_path('books/fund.beancount'),
        )
    arglist.insert(0, '--output-file=-')
    output = out_type()
    errors = io.StringIO()
    retcode = fund.main(arglist, output, errors, config)
    output.seek(0)
    return retcode, output, errors

@pytest.mark.parametrize('project,start_date,stop_date', [
    ('Conservancy', START_DATE, STOP_DATE),
    ('project=Conservancy', MID_DATE, STOP_DATE),
    ('Conservancy', START_DATE, MID_DATE),
    ('Alpha', START_DATE, STOP_DATE),
    ('project=Alpha', MID_DATE, STOP_DATE),
    ('Alpha', START_DATE, MID_DATE),
    ('Bravo', START_DATE, STOP_DATE),
    ('project=Bravo', MID_DATE, STOP_DATE),
    ('Bravo', START_DATE, MID_DATE),
    ('project=Charlie', START_DATE, STOP_DATE),
])
def test_text_report(project, start_date, stop_date):
    retcode, output, errors = run_main(io.StringIO, [
        '-b', start_date.isoformat(), '-e', stop_date.isoformat(), project,
    ])
    assert not errors.getvalue()
    assert retcode == 0
    check_text_report(output, project, start_date, stop_date)

def test_text_report_empty_balances():
    retcode, output, errors = run_main(io.StringIO, [
        '-t', 'text', '-b', '2018-01-01',
    ])
    assert not errors.getvalue()
    assert retcode == 0

@pytest.mark.parametrize('start_date,stop_date', [
    (START_DATE, STOP_DATE),
    (MID_DATE, STOP_DATE),
    (START_DATE, MID_DATE),
])
def test_ods_report(start_date, stop_date):
    retcode, output, errors = run_main(io.BytesIO, [
        '--begin', start_date.isoformat(), '--end', stop_date.isoformat(),
    ])
    assert not errors.getvalue()
    assert retcode == 0
    ods = odf.opendocument.load(output)
    check_ods_report(ods, start_date, stop_date)

def test_main_no_postings(caplog):
    retcode, output, errors = run_main(io.StringIO, ['NonexistentProject'])
    assert retcode == 65
    assert any(log.levelname == 'WARNING' for log in caplog.records)