@@ -22,10 +22,19 @@ import itertools
import logging
import re
import babel.numbers
import odf.opendocument
import odf.table
import odf.text
import pytest
from . import testutil
from decimal import Decimal
from typing import NamedTuple, Optional, Sequence
from beancount.core import data as bc_data
from beancount import loader as bc_loader
from conservancy_beancount import data
from conservancy_beancount import rtutil
@@ -58,6 +67,58 @@ CONSISTENT_METADATA = [
'purchase-order',
]
class AgingRow(NamedTuple):
date: datetime.date
entity: Sequence[str]
amount: Optional[Sequence[bc_data.Amount]]
at_cost: bc_data.Amount
rt_id: Sequence[str]
invoice: Sequence[str]
@classmethod
def make_simple(cls, date, entity, at_cost, invoice, rt_id=None, orig_amount=None):
if isinstance(date, str):
date = datetime.datetime.strptime(date, '%Y-%m-%d').date()
if not isinstance(at_cost, tuple):
at_cost = testutil.Amount(at_cost)
if rt_id is None:
rt_id, _, _ = invoice.partition('/')
return cls(date, [entity], orig_amount, at_cost, [rt_id], [invoice])
def check_row_match(self, sheet_row):
cells = testutil.ODSCell.from_row(sheet_row)
assert len(cells) == len(self)
cells = iter(cells)
assert next(cells).value == self.date
assert next(cells).text == '\0'.join(self.entity)
assert next(cells).text == '\0'.join(
babel.numbers.format_currency(number, currency, format_type='accounting')
for number, currency in self.amount or ()
)
usd_cell = next(cells)
assert usd_cell.value_type == 'currency'
assert usd_cell.value == self.at_cost.number
for index, cell in enumerate(cells):
links = cell.getElementsByType(odf.text.A)
assert len(links) == len(cell.childNodes)
assert index >= 1
AGING_AP = [
AgingRow.make_simple('2010-03-06', 'EarlyBird', -125, 'rt:44/440'),
AgingRow.make_simple('2010-03-30', 'EarlyBird', 75, 'rt:490/4900'),
AgingRow.make_simple('2010-04-30', 'Vendor', 200, 'FIXME'),
AgingRow.make_simple('2010-06-10', 'Lawyer', 280, 'rt:510/6100'),
AgingRow.make_simple('2010-06-18', 'EuroGov', 1100, 'rt:520/5200',
orig_amount=[testutil.Amount(1000, 'EUR')]),
AGING_AR = [
AgingRow.make_simple('2010-03-05', 'EarlyBird', -500, 'rt:40/400'),
AgingRow.make_simple('2010-05-15', 'MatchingProgram', 1500,
'rt://ticket/515/attachments/5150'),
class RTClient(testutil.RTClient):
TICKET_DATA = {
'40': [
@@ -102,6 +163,66 @@ def accruals_by_meta(postings, value, key='invoice', wrap_type=iter):
and post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
def find_row_by_text(row_source, want_text):
for row in row_source:
try:
found_row = row.childNodes[0].text == want_text
except IndexError:
found_row = False
if found_row:
return row
return None
def check_aging_sheet(sheet, aging_rows, date, accrue_date):
if not aging_rows:
return
if isinstance(accrue_date, int):
accrue_date = date + datetime.timedelta(days=accrue_date)
rows = iter(sheet.getElementsByType(odf.table.TableRow))
for row in rows:
if "Aging Report" in row.text:
break
else:
assert None, "Header row not found"
assert f"Accrued by {accrue_date.isoformat()}" in row.text
assert f"Unpaid by {date.isoformat()}" in row.text
expect_rows = iter(aging_rows)
row0 = find_row_by_text(rows, aging_rows[0].date.isoformat())
next(expect_rows).check_row_match(row0)
for actual, expected in zip(rows, expect_rows):
expected.check_row_match(actual)
if row.text.startswith("Total Aged Over "):
assert None, "Totals rows not found"
actual_sum = Decimal(row.childNodes[-1].value)
actual_sum += Decimal(row.childNodes[-1].value)
assert actual_sum == sum(
row.at_cost.number
for row in aging_rows
if row.date <= accrue_date
and row.at_cost.number > 0
def check_aging_ods(ods_file,
date=None,
recv_rows=AGING_AR,
pay_rows=AGING_AP,
):
if date is None:
date = datetime.date.today()
ods_file.seek(0)
ods = odf.opendocument.load(ods_file)
sheets = ods.spreadsheet.getElementsByType(odf.table.Table)
assert len(sheets) == 2
check_aging_sheet(sheets[0], recv_rows, date, -60)
check_aging_sheet(sheets[1], pay_rows, date, -30)
@pytest.mark.parametrize('link_fmt', [
'{}',
'rt:{}',
@@ -180,8 +301,10 @@ def test_filter_search(accrual_postings, search_terms, expect_count, check_func)
assert check_func(post)
@pytest.mark.parametrize('arg,expected', [
('aging', accrual.AgingReport),
('balance', accrual.BalanceReport),
('outgoing', accrual.OutgoingReport),
('age', accrual.AgingReport),
('bal', accrual.BalanceReport),
('out', accrual.OutgoingReport),
('outgoings', accrual.OutgoingReport),
@@ -399,10 +522,10 @@ def run_outgoing(invoice, postings, rt_client=None):
return output
@pytest.mark.parametrize('invoice,expected', [
('rt:505/5050', "Zero balance outstanding since 2020-05-05"),
('rt:510/5100', "Zero balance outstanding since 2020-05-10"),
('rt:510/6100', "-280.00 USD outstanding since 2020-06-10"),
('rt://ticket/515/attachments/5150', "1,500.00 USD outstanding since 2020-05-15",),
('rt:505/5050', "Zero balance outstanding since 2010-05-05"),
('rt:510/5100', "Zero balance outstanding since 2010-05-10"),
('rt:510/6100', "-280.00 USD outstanding since 2010-06-10"),
('rt://ticket/515/attachments/5150', "1,500.00 USD outstanding since 2010-05-15",),
])
def test_balance_report(accrual_postings, invoice, expected, caplog):
related = accruals_by_meta(accrual_postings, invoice, wrap_type=accrual.AccrualPostings)
@@ -429,10 +552,10 @@ def test_outgoing_report(accrual_postings, caplog):
r'^BEANCOUNT ENTRIES:$',
# For each transaction, check for the date line, a metadata, and the
# Expenses posting.
r'^\s*2020-06-10\s',
r'^\s*2010-06-10\s',
fr'^\s+rt-id: "{rt_id_url}"$',
r'^\s+Expenses:Services:Legal\s+220\.00 USD$',
r'^\s*2020-06-12\s',
r'^\s*2010-06-12\s',
fr'^\s+contract: "{contract_url}"$',
r'^\s+Expenses:FilingFees\s+60\.00 USD$',
@@ -469,6 +592,41 @@ def test_outgoing_report_without_rt_id(accrual_postings, caplog):
assert not output.getvalue()
def run_aging_report(postings, today=None):
if today is None:
today = datetime.date.today()
postings = (
post for post in postings
if post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
groups = {
key: group
for _, related in accrual.AccrualPostings.group_by_meta(postings, 'invoice')
for key, group in related.make_consistent()
}
output = io.BytesIO()
rt_client = RTClient()
report = accrual.AgingReport(rt_client, output, today)
report.run(groups)
def test_aging_report(accrual_postings):
output = run_aging_report(accrual_postings)
check_aging_ods(output)
@pytest.mark.parametrize('date,recv_end,pay_end', [
# Both these dates are chosen for their off-by-one potential:
# the first is exactly 30 days after the 2010-06-10 payable;
# the second is exactly 60 days after the 2010-05-15 receivable.
(datetime.date(2010, 7, 10), 1, 4),
(datetime.date(2010, 7, 14), 2, 4),
def test_aging_report_date_cutoffs(accrual_postings, date, recv_end, pay_end):
expect_recv = AGING_AR[:recv_end]
expect_pay = AGING_AP[:pay_end]
output = run_aging_report(accrual_postings, date)
check_aging_ods(output, date, expect_recv, expect_pay)
def run_main(arglist, config=None):
if config is None:
config = testutil.TestConfig(
@@ -527,7 +685,7 @@ def test_main_outgoing_report(arglist):
check_output(output, [
r'^REQUESTOR: Mx\. 510 <mx510@example\.org>$',
r'^TOTAL TO PAY: \$280\.00$',
@@ -542,9 +700,29 @@ def test_main_balance_report(arglist):
assert retcode == 0
r'\brt://ticket/515/attachments/5150:$',
r'^\s+1,500\.00 USD outstanding since 2020-05-15$',
r'^\s+1,500\.00 USD outstanding since 2010-05-15$',
@pytest.mark.parametrize('arglist', [
[],
['-t', 'aging', 'entity=Lawyer'],
def test_main_aging_report(tmp_path, arglist):
if arglist:
recv_rows = [row for row in AGING_AR if 'Lawyer' in row.entity]
pay_rows = [row for row in AGING_AP if 'Lawyer' in row.entity]
recv_rows = AGING_AR
pay_rows = AGING_AP
output_path = tmp_path / 'AgingReport.ods'
arglist.insert(0, f'--output-file={output_path}')
retcode, output, errors = run_main(arglist)
assert not errors.getvalue()
with output_path.open('rb') as ods_file:
check_aging_ods(ods_file, None, recv_rows, pay_rows)
def test_main_no_books():
check_main_fails([], testutil.TestConfig(), 1 | 8, [
r':1: +no books to load in configuration\b',