Files @ 5784068904e8
Branch filter:

Location: NPO-Accounting/conservancy_beancount/tests/test_books_loader.py

bkuhn
payroll-type — US:403b:Employee:Roth — needed separate since taxable

Since Roth contributions are taxable, there are some reports that
need to include these amounts in total salary (i.e., when running a
report that seeks to show total taxable income for an employee). As
such, we need a `payroll-type` specifically for Roth 403(b)
contributions.
"""test_books_loader - Unit tests for books Loader class"""
# Copyright © 2020  Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.

import collections
import io
import itertools
import os
import re

from datetime import date
from pathlib import Path

import pytest

from . import testutil

from beancount.core import data as bc_data
from conservancy_beancount import books, data

# Fiscal-year start month handed to books.FiscalYear by the loader fixture;
# the date-based tests also compare months against it to work out which
# year they expect loading to start from.
FY_START_MONTH = 3

# Path passed to books.Loader by the loader fixture.
books_path = testutil.test_path('books')

class MockError(Exception):
    """Stand-in error object exposing ``message``, ``entry`` and ``source``.

    ``entry`` is always ``None``.  ``source`` is a dict that names this test
    file and records the supplied line number (``0`` when none is given).
    """

    def __init__(self, message, lineno=0):
        self.message = message
        self.entry = None
        self.source = dict(filename='test_books_loader.py', lineno=lineno)


class MockSearchTerm:
    """Minimal search term that selects postings with a caller-supplied predicate."""

    def __init__(self, pred):
        # pred is called with one posting and its truthiness decides inclusion.
        self.pred = pred

    def filter_postings(self, postings):
        """Return a lazy iterator over the postings for which ``pred`` is truthy."""
        return (candidate for candidate in postings if self.pred(candidate))

    # Expose the same behavior under the name ``rewrite`` as well.
    rewrite = filter_postings


# Posting predicates shared by the iter_postings tests: the first keeps
# postings whose account name starts with 'Expenses:', the second keeps
# postings whose units number is at least 10.
SEARCH_TERMS = [
    MockSearchTerm(lambda posting: posting.account.startswith('Expenses:')),
    MockSearchTerm(lambda posting: posting.units.number >= 10),
]

# Wrap testutil.clean_account_meta with pytest.fixture() under a module-level
# name, so tests in this file can request it as a fixture argument.
clean_account_meta = pytest.fixture()(testutil.clean_account_meta)

@pytest.fixture(scope='module')
def conservancy_loader():
    """Module-scoped books.Loader over ``books_path`` using the test fiscal year."""
    fiscal_year = books.FiscalYear(FY_START_MONTH)
    return books.Loader(books_path, fiscal_year)

def check_openings(entries):
    """Assert that no account has more than one Open directive in ``entries``.

    Entries that are not ``bc_data.Open`` instances are ignored.
    """
    open_counts = collections.Counter(
        entry.account
        for entry in entries
        if isinstance(entry, bc_data.Open)
    )
    for account, count in open_counts.items():
        assert count == 1, f"found {count} open directives for {account}"

def txn_dates(entries):
    """Yield the ``date`` of each ``bc_data.Transaction`` in ``entries``, in order."""
    yield from (
        item.date
        for item in entries
        if isinstance(item, bc_data.Transaction)
    )

def txn_years(entries):
    """Return a frozenset of the years of all transaction dates in ``entries``."""
    years = {txn_date.year for txn_date in txn_dates(entries)}
    return frozenset(years)

def test_load_result_returncode_ok():
    """A result with one entry and no errors reports return code 0."""
    entries = [testutil.Transaction()]
    result = books.LoadResult(
        entries,
        [],
        {'filename': 'test_load_result_returncode_ok'},
    )
    assert result.returncode() == 0

def test_load_result_beancount_errors():
    """A result carrying an error alongside entries returns a code in [10, 64)."""
    error = MockError("empty transaction", lineno=65)
    # Hand LoadResult a copy of the error's source dict as its options map.
    result = books.LoadResult(
        [testutil.Transaction()],
        [error],
        dict(error.source),
    )
    returncode = result.returncode()
    assert 10 <= returncode < 64

def test_load_result_config_error():
    """An empty result built from an error returns os.EX_CONFIG."""
    result = books.LoadResult.empty(MockError("no books available"))
    returncode = result.returncode()
    assert returncode == os.EX_CONFIG

def test_load_result_no_entries():
    """An empty result built without an error returns os.EX_NOINPUT."""
    returncode = books.LoadResult.empty().returncode()
    assert returncode == os.EX_NOINPUT

@pytest.mark.parametrize(
    'arg_index,end_index',
    itertools.product(range(2), range(len(SEARCH_TERMS))),
)
def test_load_result_iter_postings_one_filter_set(arg_index, end_index):
    """Check iter_postings when only one of its two arguments carries terms."""
    txn = testutil.Transaction(postings=[
        ('Expenses:Other', 20),
        ('Expenses:BankingFees', 2),
        ('Assets:Checking', -22),
    ])
    result = books.LoadResult.empty()
    result.entries.append(txn)
    terms = SEARCH_TERMS[:end_index]
    # arg_index chooses which positional slot receives the terms; the other
    # slot gets an empty tuple.
    args = ((), terms) if arg_index else (terms, ())
    actual = list(result.iter_postings(*args))
    # The expectation is the transaction's postings with the last
    # end_index of them dropped (none dropped when end_index is 0).
    stop = -end_index if end_index else None
    expected = txn.postings[:stop]
    assert len(actual) == len(expected)
    for found, wanted in zip(actual, expected):
        assert found.account == wanted.account
        assert found.units == wanted.units

def test_load_result_iter_postings_both_filter_sets():
    """With one term in each argument, only the first posting is returned."""
    postings_spec = [
        ('Expenses:Other', 20),
        ('Expenses:BankingFees', 2),
        ('Assets:Checking', -22),
    ]
    txn = testutil.Transaction(postings=postings_spec)
    result = books.LoadResult.empty()
    result.entries.append(txn)
    first_terms, second_terms = SEARCH_TERMS[:1], SEARCH_TERMS[1:]
    actual = list(result.iter_postings(first_terms, second_terms))
    assert len(actual) == 1
    found, wanted = actual[0], txn.postings[0]
    assert found.account == wanted.account
    assert found.units == wanted.units

def test_load_result_account_metadata(clean_account_meta):
    """load_account_metadata makes each Open directive's date the account's open_date."""
    open_dates = {
        'Assets:Checking': date(2017, 3, 1),
        'Assets:Savings': date(2017, 3, 2),
    }
    result = books.LoadResult.empty()
    # NOTE(review): presumably set to exercise a non-default root account
    # name option — confirm against load_account_metadata.
    result.options_map['name_liabilities'] = 'Problems'
    result.entries.extend([
        bc_data.Open({}, open_date, name, None, None)
        for name, open_date in open_dates.items()
    ])
    result.load_account_metadata()
    for name, open_date in open_dates.items():
        assert data.Account(name).meta.open_date == open_date

@pytest.mark.parametrize('count', range(3))
def test_print_errors(count):
    """print_errors writes one matching line per error and reports whether any existed."""
    error_lines = list(range(75, 75 + count))
    result = books.LoadResult.empty()
    result.errors.extend([
        MockError("printed error", lineno=lineno)
        for lineno in error_lines
    ])
    pattern = re.compile(
        r'^test_books_loader\.py:(\d+):\s+printed error',
        re.MULTILINE,
    )
    with io.StringIO() as out_file:
        actual = result.print_errors(out_file)
        # Read the buffer before the context manager closes it.
        matches = list(pattern.finditer(out_file.getvalue()))
    assert actual is bool(error_lines)
    assert len(error_lines) == len(matches)
    # Reported line numbers must come out in the order the errors were added.
    for lineno, match in zip(error_lines, matches):
        assert lineno == int(match.group(1))

# NOTE(review): the small from_fy values (0, 1, -1, 20, -20) appear to be
# offsets relative to to_fy rather than absolute years — confirm against
# Loader.load_fy_range.
@pytest.mark.parametrize('from_fy,to_fy,expect_years', [
    (2019, 2019, range(2019, 2020)),
    (0, 2019, range(2019, 2020)),
    (2018, 2019, range(2018, 2020)),
    (1, 2018, range(2018, 2020)),
    (-1, 2019, range(2018, 2020)),
    (2019, 2020, range(2019, 2021)),
    (1, 2019, range(2019, 2021)),
    (-1, 2020, range(2019, 2021)),
    (2010, 2030, range(2018, 2021)),
    (20, 2010, range(2018, 2021)),
    (-20, 2030, range(2018, 2021)),
])
def test_load_fy_range(conservancy_loader, from_fy, to_fy, expect_years):
    """load_fy_range returns error-free entries covering the expected years."""
    entries, errors, _options = conservancy_loader.load_fy_range(from_fy, to_fy)
    assert not errors
    loaded_years = txn_years(entries)
    # Every expected year must be present, and nothing earlier may be loaded.
    assert frozenset(expect_years) <= loaded_years
    assert min(loaded_years) == expect_years.start

def test_load_fy_range_does_not_duplicate_openings(conservancy_loader):
    """Loading the range 2010-2030 yields at most one Open directive per account."""
    loaded = conservancy_loader.load_fy_range(2010, 2030)
    entries, _errors, _options = loaded
    check_openings(entries)

def test_load_fy_range_empty(conservancy_loader):
    """A range whose start is after its end loads nothing and names no file."""
    loaded = conservancy_loader.load_fy_range(2020, 2019)
    entries, errors, options_map = loaded
    assert not errors
    assert not entries
    filename = options_map.get('filename')
    assert filename is None

@pytest.mark.parametrize('from_year', [None, *range(2018, 2021)])
def test_load_all(conservancy_loader, from_year):
    """load_all loads from the given year, or from 2018 when none is given, through 2020."""
    entries, errors, _options = conservancy_loader.load_all(from_year)
    # With no starting year, the test expects loading to begin at 2018.
    first_year = from_year if from_year else 2018
    assert not errors
    check_openings(entries)
    loaded_years = txn_years(entries)
    assert frozenset(range(first_year, 2021)) <= loaded_years
    assert min(loaded_years) == first_year

@pytest.mark.parametrize('from_date', [
    date(2019, 2, 1),
    date(2019, 9, 15),
    date(2020, 1, 20),
    date(2020, 5, 31),
])
def test_load_all_from_date(conservancy_loader, from_date):
    """load_all accepts a date and starts from the year the test derives from it."""
    # Dates in a month before FY_START_MONTH are expected to start loading
    # from the previous year.
    if from_date.month < FY_START_MONTH:
        first_year = from_date.year - 1
    else:
        first_year = from_date.year
    entries, errors, _options = conservancy_loader.load_all(from_date)
    assert not errors
    check_openings(entries)
    loaded_years = txn_years(entries)
    assert frozenset(range(first_year, 2021)) <= loaded_years
    assert min(loaded_years) == first_year

def test_load_none_full_args():
    """load_none with a filename and line number reports them on every error."""
    entries, errors, _options = books.Loader.load_none('test.cfg', 42)
    assert not entries
    assert errors
    sources = [err.source for err in errors]
    assert all(source['filename'] == 'test.cfg' for source in sources)
    assert all(source['lineno'] == 42 for source in sources)

def test_load_none_no_args():
    """load_none without arguments still gives errors a str filename and int lineno."""
    entries, errors, _options = books.Loader.load_none()
    assert not entries
    assert errors
    sources = [err.source for err in errors]
    assert all(isinstance(source['filename'], str) for source in sources)
    assert all(isinstance(source['lineno'], int) for source in sources)

def test_dispatch_empty():
    """Dispatching with None in place of a loader gives errors and no entries."""
    outcome = books.Loader.dispatch(None)
    assert not outcome.entries
    assert outcome.errors

@pytest.mark.parametrize('from_arg', [
    None,
    *range(2018, 2021),
    date(2019, 2, 1),
    date(2019, 9, 15),
    date(2020, 1, 20),
    date(2020, 5, 31),
])
def test_dispatch_load_all_from_year(conservancy_loader, from_arg):
    """dispatch with only a start argument (None, a year, or a date) loads through 2020."""
    if isinstance(from_arg, date):
        # Dates in a month before FY_START_MONTH are expected to start
        # loading from the previous year.
        first_year = from_arg.year
        if from_arg.month < FY_START_MONTH:
            first_year -= 1
    else:
        # A year is used as-is; None is expected to start at 2018.
        first_year = from_arg or 2018
    result = books.Loader.dispatch(conservancy_loader, from_arg)
    check_openings(result.entries)
    loaded_years = txn_years(result.entries)
    assert frozenset(range(first_year, 2021)) <= loaded_years
    assert min(loaded_years) == first_year
    assert not result.errors

# NOTE(review): the small from_arg values (0, 1, -1, 20, -20) appear to be
# offsets relative to to_arg rather than absolute years — confirm against
# Loader.dispatch.
@pytest.mark.parametrize('from_arg,to_arg,expected', [
    (2019, 2019, range(2019, 2020)),
    (0, 2019, range(2019, 2020)),
    (2018, 2019, range(2018, 2020)),
    (1, 2018, range(2018, 2020)),
    (-1, 2019, range(2018, 2020)),
    (2019, 2020, range(2019, 2021)),
    (1, 2019, range(2019, 2021)),
    (-1, 2020, range(2019, 2021)),
    (2010, 2030, range(2018, 2021)),
    (20, 2010, range(2018, 2021)),
    (-20, 2030, range(2018, 2021)),
])
def test_dispatch_load_all_fy_range(conservancy_loader, from_arg, to_arg, expected):
    """dispatch with start and end arguments loads the expected years without errors."""
    result = books.Loader.dispatch(conservancy_loader, from_arg, to_arg)
    check_openings(result.entries)
    loaded_years = txn_years(result.entries)
    # Every expected year must be present, and nothing earlier may be loaded.
    assert frozenset(expected) <= loaded_years
    assert min(loaded_years) == expected.start
    assert not result.errors