Files @ 5e295f1024ae
Branch filter:

Location: NPO-Accounting/conservancy_beancount/tests/test_reports_ledger.py

Brett Smith
accrual: Change args.since default.

This default makes more since with the way we're going to stop having
opening balances in open books.
"""test_reports_ledger.py - Unit tests for general ledger report"""
# Copyright © 2020  Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import collections
import copy
import datetime
import io
import re

import pytest

from . import testutil

import odf.table
import odf.text

from beancount.core import data as bc_data
from beancount import loader as bc_loader
from conservancy_beancount import data
from conservancy_beancount.reports import core
from conservancy_beancount.reports import ledger

Acct = data.Account

_ledger_load = bc_loader.load_file(testutil.test_path('books/ledger.beancount'))
DEFAULT_REPORT_SHEETS = [
    'Balance',
    'Income',
    'Expenses',
    'Equity',
    'Assets:Receivable',
    'Liabilities:Payable',
    'Assets:PayPal',
    'Assets',
    'Liabilities',
]
PROJECT_REPORT_SHEETS = [
    'Balance',
    'Income',
    'Expenses',
    'Assets:Receivable',
    'Liabilities:Payable',
]
OVERSIZE_RE = re.compile(
    r'^([A-Za-z0-9:]+) has ([0-9,]+) rows, over size ([0-9,]+)$'
)
START_DATE = datetime.date(2018, 3, 1)
MID_DATE = datetime.date(2019, 3, 1)
STOP_DATE = datetime.date(2020, 3, 1)

@pytest.fixture
def ledger_entries():
    return copy.deepcopy(_ledger_load[0])

class NotFound(Exception): pass
class NoSheet(NotFound): pass
class NoHeader(NotFound): pass

class ExpectedPostings(core.RelatedPostings):
    def slice_date_range(self, start_date, end_date):
        postings = enumerate(self)
        for start_index, post in postings:
            if start_date <= post.meta.date:
                break
        else:
            start_index += 1
        if end_date <= post.meta.date:
            end_index = start_index
        else:
            for end_index, post in postings:
                if end_date <= post.meta.date:
                    break
            else:
                end_index = None
        return (self[:start_index].balance_at_cost(),
                self[start_index:end_index])

    def check_report(self, ods, start_date, end_date):
        account = self[0].account
        norm_func = core.normalize_amount_func(account)
        open_bal, expect_posts = self.slice_date_range(start_date, end_date)
        open_bal = norm_func(open_bal)
        for sheet in ods.getElementsByType(odf.table.Table):
            sheet_account = sheet.getAttribute('name').replace(' ', ':')
            if sheet_account and account.is_under(sheet_account):
                break
        else:
            raise NoSheet(account)
        rows = iter(sheet.getElementsByType(odf.table.TableRow))
        for row in rows:
            cells = row.childNodes
            if len(cells) == 2 and cells[-1].text.startswith(f'{account} '):
                break
        else:
            if expect_posts:
                raise NoHeader(account)
            else:
                return
        if account.is_under('Assets', 'Equity', 'Liabilities'):
            opening_row = testutil.ODSCell.from_row(next(rows))
            assert opening_row[0].value == start_date
            assert opening_row[4].text == open_bal.format(None, empty='0', sep='\0')
        for expected in expect_posts:
            cells = iter(testutil.ODSCell.from_row(next(rows)))
            assert next(cells).value == expected.meta.date
            assert next(cells).text == (expected.meta.get('entity') or '')
            assert next(cells).text == (expected.meta.txn.narration or '')
            if expected.cost is None:
                assert not next(cells).text
                assert next(cells).value == norm_func(expected.units.number)
            else:
                assert next(cells).value == norm_func(expected.units.number)
                assert next(cells).value == norm_func(expected.at_cost().number)
        closing_row = testutil.ODSCell.from_row(next(rows))
        closing_bal = open_bal + norm_func(expect_posts.balance_at_cost())
        assert closing_row[0].value == end_date
        assert closing_row[4].text == closing_bal.format(None, empty='0', sep='\0')


def get_sheet_names(ods):
    return [sheet.getAttribute('name').replace(' ', ':')
            for sheet in ods.getElementsByType(odf.table.Table)]

def check_oversize_logs(caplog, accounts, sheet_size):
    actual = {}
    for log in caplog.records:
        match = OVERSIZE_RE.match(log.message)
        if match:
            assert int(match.group(3).replace(',', '')) == sheet_size
            actual[match.group(1)] = int(match.group(2).replace(',', ''))
    expected = {name: size for name, size in accounts.items() if size > sheet_size}
    assert actual == expected

def test_plan_sheets_no_change():
    have = {
        Acct('Assets:Cash'): 10,
        Acct('Income:Donations'): 20,
    }
    want = ['Assets', 'Income']
    actual = ledger.LedgerODS.plan_sheets(have, want.copy(), 100)
    assert actual == want

@pytest.mark.parametrize('have', [
    {},
    {Acct('Income:Other'): 10},
    {Acct('Assets:Checking'): 20, Acct('Expenses:Other'): 15},
])
def test_plan_sheets_includes_accounts_without_transactions(have):
    want = ['Assets', 'Income', 'Expenses']
    actual = ledger.LedgerODS.plan_sheets(have, want.copy(), 100)
    assert actual == want

def test_plan_sheets_single_split():
    have = {
        Acct('Assets:Cash'): 60,
        Acct('Assets:Checking'): 80,
        Acct('Income:Donations'): 50,
        Acct('Expenses:Travel'): 90,
        Acct('Expenses:FilingFees'): 25,
    }
    want = ['Assets', 'Income', 'Expenses']
    actual = ledger.LedgerODS.plan_sheets(have, want, 100)
    assert actual == [
        'Assets:Checking',
        'Assets',
        'Income',
        'Expenses:Travel',
        'Expenses',
    ]

def test_plan_sheets_split_subtree():
    have = {
        Acct('Assets:Bank1:Checking'): 80,
        Acct('Assets:Bank1:Savings'): 10,
        Acct('Assets:Cash:USD'): 20,
        Acct('Assets:Cash:EUR'): 15,
    }
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
    assert actual == ['Assets:Bank1', 'Assets']

def test_plan_sheets_ambiguous_split():
    have = {
        Acct('Assets:Bank1:Checking'): 80,
        Acct('Assets:Bank1:Savings'): 40,
        Acct('Assets:Receivable:Accounts'): 40,
        Acct('Assets:Cash'): 10,
    }
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
    # :Savings cannot fit with :Checking, so it's important that the return
    # value disambiguate that.
    assert actual == ['Assets:Bank1:Checking', 'Assets']

def test_plan_sheets_oversize(caplog):
    have = {
        Acct('Assets:Checking'): 150,
        Acct('Assets:Cash'): 50,
    }
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
    assert actual == ['Assets:Checking', 'Assets']
    check_oversize_logs(caplog, have, 100)

def test_plan_sheets_all_oversize(caplog):
    have = {
        Acct('Assets:Checking'): 150,
        Acct('Assets:Cash'): 150,
    }
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
    # In this case, each account should appear in alphabetical order.
    assert actual == ['Assets:Cash', 'Assets:Checking']
    check_oversize_logs(caplog, have, 100)

def test_plan_sheets_full_split_required(caplog):
    have = {
        Acct('Assets:Bank:Savings'): 98,
        Acct('Assets:Bank:Checking'): 96,
        Acct('Assets:Bank:Investment'): 94,
    }
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
    assert actual == ['Assets:Bank:Checking', 'Assets:Bank:Savings', 'Assets']
    assert not caplog.records

@pytest.mark.parametrize('start_date,stop_date', [
    (START_DATE, STOP_DATE),
    (START_DATE, MID_DATE),
    (MID_DATE, STOP_DATE),
    (START_DATE.replace(month=6), START_DATE.replace(month=12)),
    (STOP_DATE, STOP_DATE.replace(month=12)),
])
def test_date_range_report(ledger_entries, start_date, stop_date):
    postings = list(data.Posting.from_entries(ledger_entries))
    report = ledger.LedgerODS(start_date, stop_date)
    report.write(iter(postings))
    for _, expected in ExpectedPostings.group_by_account(postings):
        expected.check_report(report.document, start_date, stop_date)

@pytest.mark.parametrize('sheet_names', [
    ('Income', 'Expenses'),
    ('Assets:Receivable', 'Liabilities:Payable'),
])
def test_account_names_report(ledger_entries, sheet_names):
    postings = list(data.Posting.from_entries(ledger_entries))
    report = ledger.LedgerODS(START_DATE, STOP_DATE, sheet_names=sheet_names)
    report.write(iter(postings))
    for key, expected in ExpectedPostings.group_by_account(postings):
        should_find = key.startswith(sheet_names)
        try:
            expected.check_report(report.document, START_DATE, STOP_DATE)
        except NotFound:
            assert not should_find
        else:
            assert should_find

def run_main(arglist, config=None):
    if config is None:
        config = testutil.TestConfig(
            books_path=testutil.test_path('books/ledger.beancount'),
            rt_client=testutil.RTClient(),
        )
    arglist.insert(0, '--output-file=-')
    output = io.BytesIO()
    errors = io.StringIO()
    retcode = ledger.main(arglist, output, errors, config)
    output.seek(0)
    return retcode, output, errors

def test_main(ledger_entries):
    retcode, output, errors = run_main([
        '-b', START_DATE.isoformat(),
        '-e', STOP_DATE.isoformat(),
    ])
    assert not errors.getvalue()
    assert retcode == 0
    ods = odf.opendocument.load(output)
    assert get_sheet_names(ods) == DEFAULT_REPORT_SHEETS[:]
    postings = data.Posting.from_entries(ledger_entries)
    for _, expected in ExpectedPostings.group_by_account(postings):
        expected.check_report(ods, START_DATE, STOP_DATE)

@pytest.mark.parametrize('project,start_date,stop_date', [
    ('eighteen', START_DATE, MID_DATE.replace(day=30)),
    ('nineteen', MID_DATE, STOP_DATE),
])
def test_main_project_report(ledger_entries, project, start_date, stop_date):
    postings = data.Posting.from_entries(ledger_entries)
    for key, related in ExpectedPostings.group_by_meta(postings, 'project'):
        if key == project:
            break
    assert key == project
    retcode, output, errors = run_main([
        f'--begin={start_date.isoformat()}',
        f'--end={stop_date.isoformat()}',
        project,
    ])
    assert not errors.getvalue()
    assert retcode == 0
    ods = odf.opendocument.load(output)
    assert get_sheet_names(ods) == PROJECT_REPORT_SHEETS[:]
    for _, expected in ExpectedPostings.group_by_account(related):
        expected.check_report(ods, start_date, stop_date)

def test_main_no_postings(caplog):
    retcode, output, errors = run_main(['NonexistentProject'])
    assert retcode == 24
    assert any(log.levelname == 'WARNING' for log in caplog.records)