statement_reconciler: Add initial Chase bank CSV statement matching

We currently don't have many examples to work with, so we haven't done any
significant testing of the matching accuracy between statements and the books.
"""Unit tests for the fund report"""
# Copyright © 2020  Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.

import collections
import copy
import datetime
import io
import itertools

import pytest

from . import testutil

import babel.numbers
import odf.opendocument
import odf.table

from beancount import loader as bc_loader
from conservancy_beancount import data
from conservancy_beancount.reports import core
from conservancy_beancount.reports import fund

from decimal import Decimal

# Load the fund test ledger once, at import time. fund_entries() hands out
# deep copies of the first element of this result.
_ledger_load = bc_loader.load_file(testutil.test_path('books/fund.beancount'))
# Reporting-period boundaries used to parametrize the report tests.
# NOTE(review): the year values were reconstructed from how the checks use
# them (range(2018, stop_date.year) and the 2018/2019 data below) -- confirm
# against version control.
START_DATE = datetime.date(2018, 3, 1)
MID_DATE = datetime.date(2019, 3, 1)
STOP_DATE = datetime.date(2020, 3, 1)

# Account-name prefixes passed to str.startswith() by the check helpers to
# decide whether a posting made before the reporting period is folded into a
# fund's opening balance.
EQUITY_ROOT_ACCOUNTS = ('Expenses:', 'Equity:', 'Income:')

# Starting balance of each fund, keyed by project name.
OPENING_BALANCES = {
    'Alpha': 3000,
    'Bravo': 2000,
    'Charlie': 1000,
    'Conservancy': 4000,
    'Delta': 0,
}

# Expected (account, amount) changes for each (project, year). A missing key
# means no activity is expected for that project in that year.
BALANCES_BY_YEAR = {
    ('Conservancy', 2018): [
        ('Income:Other', 40),
        ('Expenses:Other', -4),
        ('Assets:Receivable:Accounts', 40),
        ('Liabilities:Payable:Accounts', 4),
    ],
    ('Conservancy', 2019): [
        ('Income:Other', 42),
        ('Expenses:Other', Decimal('-4.20')),
        ('Equity:Funds:Unrestricted', 100),
        ('Equity:Realized:CurrencyConversion', Decimal('6.20')),
        ('Assets:Receivable:Accounts', -40),
        ('Liabilities:Payable:Accounts', -4),
    ],
    ('Alpha', 2018): [
        ('Income:Other', 60),
        ('Liabilities:UnearnedIncome', 30),
        ('Assets:Prepaid:Expenses', 20),
    ],
    ('Alpha', 2019): [
        ('Income:Other', 30),
        ('Expenses:Other', -26),
        ('Assets:Prepaid:Expenses', -20),
        ('Liabilities:UnearnedIncome', -30),
    ],
    ('Bravo', 2018): [
        ('Expenses:Other', -20),
    ],
    ('Bravo', 2019): [
        ('Income:Other', 200),
    ],
    ('Charlie', 2019): [
        ('Equity:Funds:Restricted', -100),
    ],
    ('Delta', 2018): [
        ('Income:Other', Decimal('.40')),
    ],
    ('Delta', 2019): [
        ('Income:Other', Decimal('4.60')),
    ],
}

# Register testutil.clean_account_meta as an autouse fixture, so pytest runs
# it for every test in this module without the tests having to request it.
clean_account_meta = pytest.fixture(autouse=True)(testutil.clean_account_meta)

def fund_entries():
    """Return a deep copy of the first element of the module-level ledger load.

    Copying keeps callers from mutating the shared, import-time result.
    """
    loaded_entries = _ledger_load[0]
    return copy.deepcopy(loaded_entries)

def split_text_lines(output):
    """Yield ``(label, amount)`` pairs from the lines of a text report.

    Each line is split once on its last run of whitespace: everything before
    it, stripped of surrounding whitespace, is the label, and the final token
    is the amount text. A line without two such parts raises ValueError.
    """
    for raw_line in output:
        pieces = raw_line.rsplit(maxsplit=1)
        label, value = pieces
        yield (label.strip(), value)

def format_amount(amount, currency='USD'):
    """Format ``amount`` as a currency string in Babel's accounting format.

    amount: the number to format
    currency: currency code handed to Babel (default ``'USD'``)

    No locale is passed, so Babel's default locale applies. The result is
    what the text-report checks compare printed amounts against.
    """
    return babel.numbers.format_currency(
        amount, currency, format_type='accounting',
    )

def check_text_balances(actual, expected, *expect_accounts):
    """Check the next report rows against the expected account balances.

    actual: iterator of ``(account, amount_text)`` pairs from the report
    expected: mapping of account name to expected amount
    expect_accounts: account names, in the order the report should list them

    Accounts whose expected amount is zero are assumed to be omitted from the
    report, so no row is consumed for them. Amounts for ``Expenses:`` accounts
    are compared with their sign flipped.

    Returns the sum of the expected amounts, with their original signs.
    """
    total = Decimal()
    for name in expect_accounts:
        amount = expected[name]
        total += amount
        displayed = amount * -1 if name.startswith('Expenses:') else amount
        if not displayed:
            continue
        row_account, row_amount = next(actual)
        assert row_account == name
        assert row_amount == format_amount(displayed)
    return total

# Check a text fund report for one project against the balances expected from
# OPENING_BALANCES and BALANCES_BY_YEAR.
#
# output: iterable of report lines; read through split_text_lines()
# project: project name, optionally given as ``key=name`` (only the text after
#   the last ``=`` is kept)
# start_date, stop_date: values providing ``.year`` and ``.isoformat()``
#
# NOTE(review): this block does not parse as shown. A ``try:`` before the
# BALANCES_BY_YEAR lookup, the body of ``except KeyError:``, the ``else``
# branches, the closing parentheses of several calls, and the account names
# passed to check_text_balances() all appear to be missing. Restore them from
# version control rather than guessing.
def check_text_report(output, project, start_date, stop_date):
    _, _, project = project.rpartition('=')
    balance_amount = Decimal(OPENING_BALANCES[project])
    # Per-account totals expected in the body of the report.
    expected = collections.defaultdict(Decimal)
    for year in range(2018, stop_date.year):
            # NOTE(review): presumably guarded by a lost ``try:``.
            amounts = BALANCES_BY_YEAR[(project, year)]
        except KeyError:
            for account, amount in amounts:
                if year < start_date.year and account.startswith(EQUITY_ROOT_ACCOUNTS):
                    # Activity dated before the reporting period in the
                    # equity-root accounts is added to the opening balance.
                    balance_amount += amount
                    # NOTE(review): presumably this line belonged to a lost
                    # ``else:`` branch -- confirm.
                    expected[account] += amount
    actual = split_text_lines(output)
    next(actual); next(actual)  # Discard headers
    open_acct, open_amt = next(actual)
    assert open_acct == "{} balance as of {}".format(
        project, start_date.isoformat(),
    assert open_amt == format_amount(balance_amount)
    # check_text_balances() returns the sum of the expected amounts it
    # checked; that sum is carried into the closing-balance comparison.
    balance_amount += check_text_balances(
        actual, expected,
    end_acct, end_amt = next(actual)
    assert end_acct == "{} balance as of {}".format(
        project, stop_date.isoformat(),
    assert end_amt == format_amount(balance_amount)
    balance_amount += check_text_balances(
        actual, expected,
    # The report must have no rows left over.
    assert next(actual, None) is None

def check_cell_balance(cell, balance):
    """Assert that a spreadsheet cell shows the expected balance.

    cell: object exposing the cell's content as ``.value``
    balance: expected amount

    A non-zero balance must equal the cell's value. A zero balance must be
    shown as an empty or zero cell (any falsy ``.value``).

    Raises AssertionError when the cell does not match.
    """
    if balance:
        assert cell.value == balance
    else:
        # Without this ``else`` both asserts ran for every non-zero balance,
        # and the second could never pass once the first had.
        assert not cell.value

# Check one table of the ODS fund report against the expected balances.
#
# sheet: table element; its odf.table.TableRow children from the fifth row
#   onward are checked (the first four are skipped)
# account_balances: mapping of fund name to a dict of balances keyed by
#   'opening', 'Income', 'Expenses', 'Equity' and the asset/liability keys
# full: keyword-only flag. When true, the 'Conservancy' entry is popped and
#   re-added under 'Unrestricted', and four more balance columns are checked.
#
# NOTE(review): this block does not parse as shown. The contents and closing
# bracket of the ``total_keys += [`` list, an ``else:`` before the dict
# comprehension and its closing brace, both ``try:`` lines in the row loop,
# the body of ``except StopIteration:``, the start of the call that reports
# an unexpected fund, and an ``else:`` / ``check_cell_balance(`` pair in the
# non-full branch all appear to be missing. Restore from version control.
def check_ods_sheet(sheet, account_balances, *, full):
    total_keys = ['opening', 'Income', 'Expenses', 'Equity']
    if full:
        # Work on a copy so that popping 'Conservancy' leaves the caller's
        # mapping intact.
        account_bals = account_balances.copy()
        unrestricted = account_bals.pop('Conservancy')
        # NOTE(review): the list items are missing. The totals row is later
        # read with the 'Assets:Receivable', 'Assets:Prepaid', 'Liabilities'
        # and 'Liabilities:Payable' keys, so presumably those were listed.
        total_keys += [
        account_bals = {
            key: balances
            for key, balances in account_balances.items()
            if key != 'Conservancy' and any(v >= .5 for v in balances.values())
    # Totals across the selected funds; stored under the empty fund name.
    totals = {key: Decimal() for key in total_keys}
    for fund, balances in account_bals.items():
        for key in totals:
            totals[key] += balances[key]
    account_bals[''] = totals
    if full:
        account_bals['Unrestricted'] = unrestricted
    for row in itertools.islice(sheet.getElementsByType(odf.table.TableRow), 4, None):
        cells = iter(testutil.ODSCell.from_row(row))
            fund = next(cells).firstChild.text
        except AttributeError:
            # A first cell without a text child is treated as the totals row,
            # which is keyed by the empty string.
            fund = ''
        except StopIteration:
            # Each fund may appear only once: it is removed when matched.
            balances = account_bals.pop(fund)
        except KeyError:
  "report included unexpected fund {fund}")
        check_cell_balance(next(cells), balances['opening'])
        check_cell_balance(next(cells), balances['Income'])
        if full:
            # Expenses are compared with their sign flipped.
            check_cell_balance(next(cells), -balances['Expenses'])
            check_cell_balance(next(cells), balances['Equity'])
                next(cells), -sum(balances[key] for key in ['Expenses', 'Equity']),
        check_cell_balance(next(cells), sum(balances[key] for key in [
            'opening', 'Income', 'Expenses', 'Equity',
        if full:
            check_cell_balance(next(cells), balances['Assets:Receivable'])
            check_cell_balance(next(cells), balances['Assets:Prepaid'])
            check_cell_balance(next(cells), balances['Liabilities'])
            check_cell_balance(next(cells), balances['Liabilities:Payable'])
        # The row must have no cells left over.
        assert next(cells, None) is None
        if full and fund == 'Unrestricted':
            assert '' not in account_bals, "Unrestricted funds reported before subtotals"
            # Add the unrestricted balances to the totals and register the
            # totals row again, so a second totals row is expected afterwards.
            for key, bal in balances.items():
                totals[key] += bal
            account_bals[''] = totals
    assert not account_bals, "did not see all funds in report"

def check_ods_report(ods, start_date, stop_date):
    """Check both sheets of an ODS fund report against the expected balances.

    ods: loaded document; its odf.table.Table elements are the sheets
    start_date, stop_date: reporting period boundaries (only ``.year`` is used)

    Expected balances start from OPENING_BALANCES, and each BALANCES_BY_YEAR
    entry from 2018 up to, but excluding, ``stop_date.year`` is added in:

    * Expenses/Equity/Income postings from before ``start_date.year`` go
      into the fund's ``'opening'`` balance.
    * Later postings to those accounts go under the root account name
      (``'Income'``, ``'Expenses'`` or ``'Equity'``).
    * All other postings go under the account name minus its last component
      (e.g. ``'Assets:Receivable:Accounts'`` -> ``'Assets:Receivable'``).

    The document must contain exactly two sheets: the first is checked with
    ``full=False``, the second with ``full=True``.
    """
    account_bals = collections.OrderedDict((key, {
        'opening': Decimal(amount),
        'Income': Decimal(0),
        'Expenses': Decimal(0),
        'Equity': Decimal(0),
        'Assets:Receivable': Decimal(0),
        'Assets:Prepaid': Decimal(0),
        'Liabilities:Payable': Decimal(0),
        'Liabilities': Decimal(0),  # UnearnedIncome
    }) for key, amount in sorted(OPENING_BALANCES.items()))
    for fund, year in itertools.product(account_bals, range(2018, stop_date.year)):
        try:
            amounts = BALANCES_BY_YEAR[(fund, year)]
        except KeyError:
            # No activity recorded for this fund in this year.
            continue
        for account, amount in amounts:
            if account.startswith(EQUITY_ROOT_ACCOUNTS):
                if year < start_date.year:
                    acct_key = 'opening'
                else:
                    acct_key, _, _ = account.partition(':')
            else:
                acct_key, _, _ = account.rpartition(':')
            account_bals[fund][acct_key] += amount
    sheets = iter(ods.getElementsByType(odf.table.Table))
    check_ods_sheet(next(sheets), account_bals, full=False)
    check_ods_sheet(next(sheets), account_bals, full=True)
    assert next(sheets, None) is None, "found unexpected sheet"

# Run fund.main() with in-memory output and error streams.
#
# out_type: zero-argument callable that creates the output stream (the tests
#   in this module pass io.StringIO or io.BytesIO)
# arglist: list of command-line arguments; it is modified in place, with
#   '--output-file=-' inserted at the front
# config: passed through to fund.main(); built with testutil.TestConfig when
#   None
# Returns the tuple (retcode, output, errors).
#
# NOTE(review): the ``testutil.TestConfig(`` call is never closed and its
# arguments are missing, so this block does not parse as shown. Restore it
# from version control.
# NOTE(review): ``output`` is returned without being rewound here. Callers
# read it back (split_text_lines, odf.opendocument.load), so unless
# fund.main() rewinds the stream, an ``output.seek(0)`` may be missing before
# the return -- confirm.
def run_main(out_type, arglist, config=None):
    if config is None:
        config = testutil.TestConfig(
    arglist.insert(0, '--output-file=-')
    output = out_type()
    errors = io.StringIO()
    retcode = fund.main(arglist, output, errors, config)
    return retcode, output, errors

@pytest.mark.parametrize('project,start_date,stop_date', [
    ('Conservancy', START_DATE, STOP_DATE),
    ('project=Conservancy', MID_DATE, STOP_DATE),
    ('Conservancy', START_DATE, MID_DATE),
    ('Alpha', START_DATE, STOP_DATE),
    ('project=Alpha', MID_DATE, STOP_DATE),
    ('Alpha', START_DATE, MID_DATE),
    ('Bravo', START_DATE, STOP_DATE),
    ('project=Bravo', MID_DATE, STOP_DATE),
    ('Bravo', START_DATE, MID_DATE),
    ('project=Charlie', START_DATE, STOP_DATE),
])
def test_text_report(project, start_date, stop_date):
    """Run the fund report for one project and period, then check its text.

    The project is given either bare or as ``project=NAME``; both forms are
    exercised. The run must produce no error output and exit with status 0
    before the report body is compared with the expected balances.
    """
    retcode, output, errors = run_main(io.StringIO, [
        '-b', start_date.isoformat(), '-e', stop_date.isoformat(), project,
    ])
    assert not errors.getvalue()
    assert retcode == 0
    check_text_report(output, project, start_date, stop_date)

def test_text_report_empty_balances():
    """Run a text report starting 2018-01-01 and check that it succeeds.

    Only the absence of error output and a zero exit status are checked; the
    report body is not inspected.
    """
    retcode, output, errors = run_main(io.StringIO, [
        '-t', 'text', '-b', '2018-01-01',
    ])
    assert not errors.getvalue()
    assert retcode == 0

# Run the fund report into a bytes buffer for one reporting period, load the
# result with odf.opendocument.load(), and check it with check_ods_report().
# The run must produce no error output and exit with status 0.
#
# NOTE(review): this block does not parse as shown. The parametrize list has
# no visible entries and is never closed, and the argument list passed to
# run_main() is not closed either. Restore the lost lines from version
# control.
@pytest.mark.parametrize('start_date,stop_date', [
def test_ods_report(start_date, stop_date):
    retcode, output, errors = run_main(io.BytesIO, [
        '--begin', start_date.isoformat(), '--end', stop_date.isoformat(),
    assert not errors.getvalue()
    assert retcode == 0
    ods = odf.opendocument.load(output)
    check_ods_report(ods, start_date, stop_date)

def test_main_no_postings(caplog):
    """A project with no postings must exit with status 65 and log a warning."""
    retcode, _output, _errors = run_main(io.StringIO, ['NonexistentProject'])
    assert retcode == 65
    logged_levels = {record.levelname for record in caplog.records}
    assert 'WARNING' in logged_levels