Changeset - 42b3e6ca1742
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-07-01 16:00:17
brettcsmith@brettcsmith.org
accruals: Aging report shows all unpaid accruals color coded by age.

Some readers care about recent accruals, some don't. This presentation
accommmodates both audiences, providing the data while making it easy to
ignore or filter out recent accruals.
3 files changed with 97 insertions and 82 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -103,12 +103,13 @@ from ..beancount_types import (
 
    Errors,
 
    MetaKey,
 
    MetaValue,
 
    Transaction,
 
)
 

	
 
import odf.element  # type:ignore[import]
 
import odf.style  # type:ignore[import]
 
import odf.table  # type:ignore[import]
 
import rt
 

	
 
from beancount.parser import printer as bc_printer
 

	
...
 
@@ -233,12 +234,19 @@ class BaseReport:
 
        for index, invoice in enumerate(groups):
 
            for line in self._report(groups[invoice], index):
 
                print(line, file=self.out_file)
 

	
 

	
 
class AgingODS(core.BaseODS[AccrualPostings, data.Account]):
 
    AGE_COLORS = [
 
        '#ff00ff',
 
        '#ff0000',
 
        '#ff8800',
 
        '#ffff00',
 
        '#00ff00',
 
    ]
 
    DOC_COLUMNS = [
 
        'rt-id',
 
        'invoice',
 
        'approval',
 
        'contract',
 
        'purchase-order',
...
 
@@ -280,90 +288,108 @@ class AgingODS(core.BaseODS[AccrualPostings, data.Account]):
 
                self.string_cell(name, stylename=self.style_bold)
 
                for name in self.COLUMNS
 
            ))
 
            self.lock_first_row()
 

	
 
    def start_section(self, key: data.Account) -> None:
 
        self.norm_func = core.normalize_amount_func(key)
 
        self.age_thresholds = list(AccrualAccount.by_account(key).value.aging_thresholds)
 
        accrual_type = AccrualAccount.by_account(key)
 
        self.norm_func = accrual_type.normalize_amount
 
        self.age_thresholds = list(accrual_type.value.aging_thresholds)
 
        self.age_thresholds.append(-sys.maxsize)
 
        self.age_balances = [core.MutableBalance() for _ in self.age_thresholds]
 
        accrual_date = self.date - datetime.timedelta(days=self.age_thresholds[-1])
 
        self.age_styles = [
 
            self.merge_styles(self.style_date, self.border_style(
 
                core.Border.LEFT, '10pt', 'solid', color,
 
            )) for color in self.AGE_COLORS
 
        ]
 
        acct_parts = key.slice_parts()
 
        self.use_sheet(acct_parts[1])
 
        self.add_row()
 
        self.add_row(self.string_cell(
 
            f"{' '.join(acct_parts[2:])} {acct_parts[1]} Aging Report"
 
            f" Accrued by {accrual_date.isoformat()} Unpaid by {self.date.isoformat()}",
 
            f" for {self.date.isoformat()}",
 
            stylename=self.merge_styles(self.style_bold, self.style_centertext),
 
            numbercolumnsspanned=self.COL_COUNT,
 
        ))
 
        self.add_row()
 

	
 
    def end_section(self, key: data.Account) -> None:
 
        total_balance = core.MutableBalance()
 
        text_style = self.merge_styles(self.style_bold, self.style_endtext)
 
        text_span = 4
 
        last_age_text: Optional[str] = None
 
        self.add_row()
 
        for threshold, balance in zip(self.age_thresholds, self.age_balances):
 
        for threshold, balance, style in zip(
 
                self.age_thresholds, self.age_balances, self.age_styles,
 
        ):
 
            years, days = divmod(threshold, 365)
 
            years_text = f"{years} {'Year' if years == 1 else 'Years'}"
 
            days_text = f"{days} Days"
 
            if years and days:
 
                age_text = f"{years_text} {days_text}"
 
            elif years:
 
                age_text = years_text
 
            else:
 
                age_text = days_text
 
            if last_age_text is None:
 
                age_range = f"Over {age_text}"
 
            elif threshold < 0:
 
                self.add_row(
 
                    self.string_cell(
 
                        f"Total Unpaid Over {last_age_text}: ",
 
                        stylename=self.merge_styles(self.style_bold, self.style_endtext),
 
                        numbercolumnsspanned=text_span,
 
                    ),
 
                    *(odf.table.TableCell() for _ in range(1, text_span)),
 
                    self.balance_cell(total_balance),
 
                )
 
                age_range = f"Under {last_age_text}"
 
            else:
 
                age_range = f"{age_text}–{last_age_text}"
 
            self.add_row(
 
                self.string_cell(
 
                    f"Total Aged {age_range}: ",
 
                    stylename=text_style,
 
                    stylename=self.merge_styles(self.style_bold, self.style_endtext, style),
 
                    numbercolumnsspanned=text_span,
 
                ),
 
                *(odf.table.TableCell() for _ in range(1, text_span)),
 
                self.balance_cell(balance),
 
            )
 
            last_age_text = age_text
 
            total_balance += balance
 
        self.add_row(
 
            self.string_cell(
 
                "Total Unpaid: ",
 
                stylename=text_style,
 
                stylename=self.merge_styles(self.style_bold, self.style_endtext),
 
                numbercolumnsspanned=text_span,
 
            ),
 
            *(odf.table.TableCell() for _ in range(1, text_span)),
 
            self.balance_cell(total_balance),
 
        )
 

	
 
    def write_row(self, row: AccrualPostings) -> None:
 
        row_date = row[0].meta.date
 
        row_balance = self.norm_func(row.balance_at_cost())
 
        age = (self.date - row_date).days
 
        if row_balance.ge_zero():
 
            for index, threshold in enumerate(self.age_thresholds):
 
                if age >= threshold:
 
        for index, threshold in enumerate(self.age_thresholds):
 
            if age >= threshold:
 
                if row_balance.ge_zero():
 
                    self.age_balances[index] += row_balance
 
                    break
 
            else:
 
                return
 
                break
 
        else:
 
            return
 
        raw_balance = self.norm_func(row.balance())
 
        if raw_balance == row_balance:
 
            amount_cell = odf.table.TableCell()
 
        else:
 
            amount_cell = self.balance_cell(raw_balance)
 
        entities = row.meta_values('entity')
 
        entities.discard(None)
 
        projects = row.meta_values('project')
 
        projects.discard(None)
 
        self.add_row(
 
            self.date_cell(row_date),
 
            self.date_cell(row_date, stylename=self.age_styles[index]),
 
            self.multiline_cell(sorted(entities)),
 
            amount_cell,
 
            self.balance_cell(row_balance),
 
            self.multiline_cell(sorted(projects)),
 
            *(self.meta_links_cell(row.all_meta_links(key))
 
              for key in self.DOC_COLUMNS),
setup.py
Show inline comments
...
 
@@ -2,13 +2,13 @@
 

	
 
from setuptools import setup
 

	
 
setup(
 
    name='conservancy_beancount',
 
    description="Plugin, library, and reports for reading Conservancy's books",
 
    version='1.5.4',
 
    version='1.5.5',
 
    author='Software Freedom Conservancy',
 
    author_email='info@sfconservancy.org',
 
    license='GNU AGPLv3+',
 

	
 
    install_requires=[
 
        'babel>=2.6',  # Debian:python3-babel
tests/test_reports_accrual.py
Show inline comments
...
 
@@ -58,12 +58,14 @@ ACCOUNTS = [
 
    'Assets:Receivable:Accounts',
 
    'Assets:Receivable:Loans',
 
    'Liabilities:Payable:Accounts',
 
    'Liabilities:Payable:Vacation',
 
]
 

	
 
AGE_SUM_RE = re.compile(r'(?:\b(\d+) Years?)?(?: ?\b(\d+) Days?)?[–:]')
 

	
 
class AgingRow(NamedTuple):
 
    date: datetime.date
 
    entity: Sequence[str]
 
    amount: Optional[Sequence[bc_data.Amount]]
 
    at_cost: bc_data.Amount
 
    rt_id: Sequence[str]
...
 
@@ -163,63 +165,65 @@ def find_row_by_text(row_source, want_text):
 
        try:
 
            found_row = row.childNodes[0].text == want_text
 
        except IndexError:
 
            found_row = False
 
        if found_row:
 
            return row
 
    return None
 
    pytest.fail(f"did not find row with text {want_text!r}")
 

	
 
def check_age_sum(aging_rows, row, date):
 
    text = row.firstChild.text
 
    ages = [int(match.group(1) or 0) * 365 + int(match.group(2) or 0)
 
            for match in AGE_SUM_RE.finditer(text)]
 
    if len(ages) == 1:
 
        # datetime only supports a 10K year range so this should cover all of it
 
        if text.startswith('Total Aged Over '):
 
            age_range = range(ages[0], 3650000)
 
        else:
 
            age_range = range(-3650000, ages[0])
 
    elif len(ages) == 2:
 
        age_range = range(*ages)
 
    else:
 
        pytest.fail(f"row has incorrect age matches: {ages!r}")
 
    assert row.lastChild.value == sum(
 
        row.at_cost.number
 
        for row in aging_rows
 
        if row.at_cost.number > 0
 
        and (date - row.date).days in age_range
 
    )
 
    return row.lastChild.value
 

	
 
def check_aging_sheet(sheet, aging_rows, date, accrue_date):
 
def check_aging_sheet(sheet, aging_rows, date):
 
    if not aging_rows:
 
        return
 
    if isinstance(accrue_date, int):
 
        accrue_date = date + datetime.timedelta(days=accrue_date)
 
    rows = iter(sheet.getElementsByType(odf.table.TableRow))
 
    for row in rows:
 
        if "Aging Report" in row.text:
 
            break
 
    else:
 
        assert None, "Header row not found"
 
    assert f"Accrued by {accrue_date.isoformat()}" in row.text
 
    assert f"Unpaid by {date.isoformat()}" in row.text
 
    expect_rows = iter(aging_rows)
 
    row0 = find_row_by_text(rows, aging_rows[0].date.isoformat())
 
    next(expect_rows).check_row_match(row0)
 
    for actual, expected in zip(rows, expect_rows):
 
        expected.check_row_match(actual)
 
    row0 = find_row_by_text(rows, "Total Aged Over 1 Year: ")
 
    aging_sum = check_age_sum(aging_rows, row0, date)
 
    sums = 0
 
    for row in rows:
 
        if row.text.startswith("Total Aged "):
 
            break
 
    else:
 
        assert None, "Totals rows not found"
 
    actual_sum = Decimal(row.childNodes[-1].value)
 
    for row in rows:
 
        if row.text.startswith("Total Aged "):
 
            actual_sum += Decimal(row.childNodes[-1].value)
 
        if not row.firstChild:
 
            pass
 
        elif row.firstChild.text.startswith("Total Unpaid"):
 
            assert row.lastChild.value == aging_sum
 
            sums += 1
 
        else:
 
            break
 
    assert actual_sum == sum(
 
        row.at_cost.number
 
        for row in aging_rows
 
        if row.date <= accrue_date
 
        and row.at_cost.number > 0
 
    )
 
            aging_sum += check_age_sum(aging_rows, row, date)
 
    assert sums > 1
 

	
 
def check_aging_ods(ods_file,
 
                    date=None,
 
                    recv_rows=AGING_AR,
 
                    pay_rows=AGING_AP,
 
):
 
    if date is None:
 
        date = datetime.date.today()
 
def check_aging_ods(ods_file, date, recv_rows=AGING_AR, pay_rows=AGING_AP):
 
    ods_file.seek(0)
 
    ods = odf.opendocument.load(ods_file)
 
    sheets = ods.spreadsheet.getElementsByType(odf.table.Table)
 
    assert len(sheets) == 2
 
    check_aging_sheet(sheets[0], recv_rows, date, -60)
 
    check_aging_sheet(sheets[1], pay_rows, date, -30)
 
    check_aging_sheet(sheets[0], recv_rows, date)
 
    check_aging_sheet(sheets[1], pay_rows, date)
 

	
 
@pytest.mark.parametrize('search_terms,expect_count,check_func', [
 
    ([], ACCRUALS_COUNT, lambda post: post.account.is_under(
 
        'Assets:Receivable:', 'Liabilities:Payable:',
 
    )),
 
    ([('rt-id', '^rt:505$')], 2, lambda post: post.meta['entity'] == 'DonorA'),
...
 
@@ -573,68 +577,53 @@ def test_outgoing_report_without_rt_id(accrual_postings, caplog):
 
    assert log.message.startswith(
 
        f"can't generate outgoings report for 2010-05-15 MatchingProgram {invoice}"
 
        " because no RT ticket available:",
 
    )
 
    assert not output.getvalue()
 

	
 
def run_aging_report(postings, today=None):
 
    if today is None:
 
        today = datetime.date.today()
 
def run_aging_report(postings, today):
 
    postings = (
 
        post for post in postings
 
        if post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
 
    )
 
    groups = dict(accrual.AccrualPostings.make_consistent(postings))
 
    output = io.BytesIO()
 
    rt_wrapper = rtutil.RT(RTClient())
 
    report = accrual.AgingReport(rt_wrapper, output, today)
 
    report.run(groups)
 
    return output
 

	
 
def test_aging_report(accrual_postings):
 
    output = run_aging_report(accrual_postings)
 
    check_aging_ods(output)
 

	
 
@pytest.mark.parametrize('date,recv_end,pay_end', [
 
@pytest.mark.parametrize('date', [
 
    datetime.date(2010, 3, 1),
 
    # Both these dates are chosen for their off-by-one potential:
 
    # the first is exactly 30 days after the 2010-06-10 payable;
 
    # the second is exactly 60 days after the 2010-05-15 receivable.
 
    (datetime.date(2010, 7, 10), 1, 5),
 
    (datetime.date(2010, 7, 14), 2, 5),
 
    datetime.date(2010, 7, 10),
 
    datetime.date(2010, 7, 14),
 
    # The remainder just shuffle the age buckets some.
 
    datetime.date(2010, 12, 1),
 
    datetime.date(2011, 6, 1),
 
    datetime.date(2011, 12, 1),
 
    datetime.date(2012, 3, 1),
 
])
 
def test_aging_report_date_cutoffs(accrual_postings, date, recv_end, pay_end):
 
    expect_recv = AGING_AR[:recv_end]
 
    expect_pay = AGING_AP[:pay_end]
 
def test_aging_report_date_cutoffs(accrual_postings, date):
 
    output = run_aging_report(accrual_postings, date)
 
    check_aging_ods(output, date, expect_recv, expect_pay)
 
    check_aging_ods(output, date)
 

	
 
def test_aging_report_entity_consistency(accrual_postings):
 
    date = datetime.date.today()
 
    output = run_aging_report((
 
        post for post in accrual_postings
 
        if post.meta.get('rt-id') == 'rt:480'
 
        and post.units.number < 0
 
    ))
 
    check_aging_ods(output, None, [], [
 
    ), date)
 
    check_aging_ods(output, date, [], [
 
        AgingRow.make_simple('2010-04-15', 'MultiPartyA', 125, 'rt:480/4800'),
 
        AgingRow.make_simple('2010-04-15', 'MultiPartyB', 125, 'rt:480/4800'),
 
    ])
 

	
 
def test_aging_report_does_not_include_too_recent_postings(accrual_postings):
 
    # This date is after the Q3 posting, but too soon after for that to be
 
    # included in the aging report.
 
    date = datetime.date(2010, 10, 1)
 
    output = run_aging_report((
 
        post for post in accrual_postings
 
        if post.meta.get('rt-id') == 'rt:470'
 
    ), date)
 
    # Date+amount are both from the Q2 posting only.
 
    check_aging_ods(output, date, [
 
        AgingRow.make_simple('2010-06-15', 'GrantCo', 5500, 'rt:470/4700',
 
                             project='Development Grant'),
 
    ], [])
 

	
 
def run_main(arglist, config=None, out_type=io.StringIO):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/accruals.beancount'),
 
            rt_client=RTClient(),
 
        )
...
 
@@ -732,13 +721,13 @@ def test_main_aging_report(arglist):
 
    else:
 
        recv_rows = AGING_AR
 
        pay_rows = AGING_AP
 
    retcode, output, errors = run_main(arglist, out_type=io.BytesIO)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_aging_ods(output, None, recv_rows, pay_rows)
 
    check_aging_ods(output, datetime.date.today(), recv_rows, pay_rows)
 

	
 
def test_main_no_books():
 
    errors = check_main_fails([], testutil.TestConfig(), 1 | 8)
 
    testutil.check_lines_match(iter(errors), [
 
        r':[01]: +no books to load in configuration\b',
 
    ])
0 comments (0 inline, 0 general)