Changeset - 42b3e6ca1742
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-07-01 16:00:17
brettcsmith@brettcsmith.org
accruals: Aging report shows all unpaid accruals color coded by age.

Some readers care about recent accruals, some don't. This presentation
accommmodates both audiences, providing the data while making it easy to
ignore or filter out recent accruals.
3 files changed with 97 insertions and 82 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -106,6 +106,7 @@ from ..beancount_types import (
 
    Transaction,
 
)
 

	
 
import odf.element  # type:ignore[import]
 
import odf.style  # type:ignore[import]
 
import odf.table  # type:ignore[import]
 
import rt
...
 
@@ -236,6 +237,13 @@ class BaseReport:
 

	
 

	
 
class AgingODS(core.BaseODS[AccrualPostings, data.Account]):
 
    AGE_COLORS = [
 
        '#ff00ff',
 
        '#ff0000',
 
        '#ff8800',
 
        '#ffff00',
 
        '#00ff00',
 
    ]
 
    DOC_COLUMNS = [
 
        'rt-id',
 
        'invoice',
...
 
@@ -283,16 +291,22 @@ class AgingODS(core.BaseODS[AccrualPostings, data.Account]):
 
            self.lock_first_row()
 

	
 
    def start_section(self, key: data.Account) -> None:
 
        self.norm_func = core.normalize_amount_func(key)
 
        self.age_thresholds = list(AccrualAccount.by_account(key).value.aging_thresholds)
 
        accrual_type = AccrualAccount.by_account(key)
 
        self.norm_func = accrual_type.normalize_amount
 
        self.age_thresholds = list(accrual_type.value.aging_thresholds)
 
        self.age_thresholds.append(-sys.maxsize)
 
        self.age_balances = [core.MutableBalance() for _ in self.age_thresholds]
 
        accrual_date = self.date - datetime.timedelta(days=self.age_thresholds[-1])
 
        self.age_styles = [
 
            self.merge_styles(self.style_date, self.border_style(
 
                core.Border.LEFT, '10pt', 'solid', color,
 
            )) for color in self.AGE_COLORS
 
        ]
 
        acct_parts = key.slice_parts()
 
        self.use_sheet(acct_parts[1])
 
        self.add_row()
 
        self.add_row(self.string_cell(
 
            f"{' '.join(acct_parts[2:])} {acct_parts[1]} Aging Report"
 
            f" Accrued by {accrual_date.isoformat()} Unpaid by {self.date.isoformat()}",
 
            f" for {self.date.isoformat()}",
 
            stylename=self.merge_styles(self.style_bold, self.style_centertext),
 
            numbercolumnsspanned=self.COL_COUNT,
 
        ))
...
 
@@ -300,11 +314,12 @@ class AgingODS(core.BaseODS[AccrualPostings, data.Account]):
 

	
 
    def end_section(self, key: data.Account) -> None:
 
        total_balance = core.MutableBalance()
 
        text_style = self.merge_styles(self.style_bold, self.style_endtext)
 
        text_span = 4
 
        last_age_text: Optional[str] = None
 
        self.add_row()
 
        for threshold, balance in zip(self.age_thresholds, self.age_balances):
 
        for threshold, balance, style in zip(
 
                self.age_thresholds, self.age_balances, self.age_styles,
 
        ):
 
            years, days = divmod(threshold, 365)
 
            years_text = f"{years} {'Year' if years == 1 else 'Years'}"
 
            days_text = f"{days} Days"
...
 
@@ -316,12 +331,23 @@ class AgingODS(core.BaseODS[AccrualPostings, data.Account]):
 
                age_text = days_text
 
            if last_age_text is None:
 
                age_range = f"Over {age_text}"
 
            elif threshold < 0:
 
                self.add_row(
 
                    self.string_cell(
 
                        f"Total Unpaid Over {last_age_text}: ",
 
                        stylename=self.merge_styles(self.style_bold, self.style_endtext),
 
                        numbercolumnsspanned=text_span,
 
                    ),
 
                    *(odf.table.TableCell() for _ in range(1, text_span)),
 
                    self.balance_cell(total_balance),
 
                )
 
                age_range = f"Under {last_age_text}"
 
            else:
 
                age_range = f"{age_text}–{last_age_text}"
 
            self.add_row(
 
                self.string_cell(
 
                    f"Total Aged {age_range}: ",
 
                    stylename=text_style,
 
                    stylename=self.merge_styles(self.style_bold, self.style_endtext, style),
 
                    numbercolumnsspanned=text_span,
 
                ),
 
                *(odf.table.TableCell() for _ in range(1, text_span)),
...
 
@@ -332,7 +358,7 @@ class AgingODS(core.BaseODS[AccrualPostings, data.Account]):
 
        self.add_row(
 
            self.string_cell(
 
                "Total Unpaid: ",
 
                stylename=text_style,
 
                stylename=self.merge_styles(self.style_bold, self.style_endtext),
 
                numbercolumnsspanned=text_span,
 
            ),
 
            *(odf.table.TableCell() for _ in range(1, text_span)),
...
 
@@ -343,13 +369,13 @@ class AgingODS(core.BaseODS[AccrualPostings, data.Account]):
 
        row_date = row[0].meta.date
 
        row_balance = self.norm_func(row.balance_at_cost())
 
        age = (self.date - row_date).days
 
        if row_balance.ge_zero():
 
            for index, threshold in enumerate(self.age_thresholds):
 
                if age >= threshold:
 
        for index, threshold in enumerate(self.age_thresholds):
 
            if age >= threshold:
 
                if row_balance.ge_zero():
 
                    self.age_balances[index] += row_balance
 
                    break
 
            else:
 
                return
 
                break
 
        else:
 
            return
 
        raw_balance = self.norm_func(row.balance())
 
        if raw_balance == row_balance:
 
            amount_cell = odf.table.TableCell()
...
 
@@ -360,7 +386,7 @@ class AgingODS(core.BaseODS[AccrualPostings, data.Account]):
 
        projects = row.meta_values('project')
 
        projects.discard(None)
 
        self.add_row(
 
            self.date_cell(row_date),
 
            self.date_cell(row_date, stylename=self.age_styles[index]),
 
            self.multiline_cell(sorted(entities)),
 
            amount_cell,
 
            self.balance_cell(row_balance),
setup.py
Show inline comments
...
 
@@ -5,7 +5,7 @@ from setuptools import setup
 
setup(
 
    name='conservancy_beancount',
 
    description="Plugin, library, and reports for reading Conservancy's books",
 
    version='1.5.4',
 
    version='1.5.5',
 
    author='Software Freedom Conservancy',
 
    author_email='info@sfconservancy.org',
 
    license='GNU AGPLv3+',
tests/test_reports_accrual.py
Show inline comments
...
 
@@ -61,6 +61,8 @@ ACCOUNTS = [
 
    'Liabilities:Payable:Vacation',
 
]
 

	
 
AGE_SUM_RE = re.compile(r'(?:\b(\d+) Years?)?(?: ?\b(\d+) Days?)?[–:]')
 

	
 
class AgingRow(NamedTuple):
 
    date: datetime.date
 
    entity: Sequence[str]
...
 
@@ -166,57 +168,59 @@ def find_row_by_text(row_source, want_text):
 
            found_row = False
 
        if found_row:
 
            return row
 
    return None
 
    pytest.fail(f"did not find row with text {want_text!r}")
 

	
 
def check_age_sum(aging_rows, row, date):
 
    text = row.firstChild.text
 
    ages = [int(match.group(1) or 0) * 365 + int(match.group(2) or 0)
 
            for match in AGE_SUM_RE.finditer(text)]
 
    if len(ages) == 1:
 
        # datetime only supports a 10K year range so this should cover all of it
 
        if text.startswith('Total Aged Over '):
 
            age_range = range(ages[0], 3650000)
 
        else:
 
            age_range = range(-3650000, ages[0])
 
    elif len(ages) == 2:
 
        age_range = range(*ages)
 
    else:
 
        pytest.fail(f"row has incorrect age matches: {ages!r}")
 
    assert row.lastChild.value == sum(
 
        row.at_cost.number
 
        for row in aging_rows
 
        if row.at_cost.number > 0
 
        and (date - row.date).days in age_range
 
    )
 
    return row.lastChild.value
 

	
 
def check_aging_sheet(sheet, aging_rows, date, accrue_date):
 
def check_aging_sheet(sheet, aging_rows, date):
 
    if not aging_rows:
 
        return
 
    if isinstance(accrue_date, int):
 
        accrue_date = date + datetime.timedelta(days=accrue_date)
 
    rows = iter(sheet.getElementsByType(odf.table.TableRow))
 
    for row in rows:
 
        if "Aging Report" in row.text:
 
            break
 
    else:
 
        assert None, "Header row not found"
 
    assert f"Accrued by {accrue_date.isoformat()}" in row.text
 
    assert f"Unpaid by {date.isoformat()}" in row.text
 
    expect_rows = iter(aging_rows)
 
    row0 = find_row_by_text(rows, aging_rows[0].date.isoformat())
 
    next(expect_rows).check_row_match(row0)
 
    for actual, expected in zip(rows, expect_rows):
 
        expected.check_row_match(actual)
 
    row0 = find_row_by_text(rows, "Total Aged Over 1 Year: ")
 
    aging_sum = check_age_sum(aging_rows, row0, date)
 
    sums = 0
 
    for row in rows:
 
        if row.text.startswith("Total Aged "):
 
            break
 
    else:
 
        assert None, "Totals rows not found"
 
    actual_sum = Decimal(row.childNodes[-1].value)
 
    for row in rows:
 
        if row.text.startswith("Total Aged "):
 
            actual_sum += Decimal(row.childNodes[-1].value)
 
        if not row.firstChild:
 
            pass
 
        elif row.firstChild.text.startswith("Total Unpaid"):
 
            assert row.lastChild.value == aging_sum
 
            sums += 1
 
        else:
 
            break
 
    assert actual_sum == sum(
 
        row.at_cost.number
 
        for row in aging_rows
 
        if row.date <= accrue_date
 
        and row.at_cost.number > 0
 
    )
 
            aging_sum += check_age_sum(aging_rows, row, date)
 
    assert sums > 1
 

	
 
def check_aging_ods(ods_file,
 
                    date=None,
 
                    recv_rows=AGING_AR,
 
                    pay_rows=AGING_AP,
 
):
 
    if date is None:
 
        date = datetime.date.today()
 
def check_aging_ods(ods_file, date, recv_rows=AGING_AR, pay_rows=AGING_AP):
 
    ods_file.seek(0)
 
    ods = odf.opendocument.load(ods_file)
 
    sheets = ods.spreadsheet.getElementsByType(odf.table.Table)
 
    assert len(sheets) == 2
 
    check_aging_sheet(sheets[0], recv_rows, date, -60)
 
    check_aging_sheet(sheets[1], pay_rows, date, -30)
 
    check_aging_sheet(sheets[0], recv_rows, date)
 
    check_aging_sheet(sheets[1], pay_rows, date)
 

	
 
@pytest.mark.parametrize('search_terms,expect_count,check_func', [
 
    ([], ACCRUALS_COUNT, lambda post: post.account.is_under(
...
 
@@ -576,9 +580,7 @@ def test_outgoing_report_without_rt_id(accrual_postings, caplog):
 
    )
 
    assert not output.getvalue()
 

	
 
def run_aging_report(postings, today=None):
 
    if today is None:
 
        today = datetime.date.today()
 
def run_aging_report(postings, today):
 
    postings = (
 
        post for post in postings
 
        if post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
...
 
@@ -590,48 +592,35 @@ def run_aging_report(postings, today=None):
 
    report.run(groups)
 
    return output
 

	
 
def test_aging_report(accrual_postings):
 
    output = run_aging_report(accrual_postings)
 
    check_aging_ods(output)
 

	
 
@pytest.mark.parametrize('date,recv_end,pay_end', [
 
@pytest.mark.parametrize('date', [
 
    datetime.date(2010, 3, 1),
 
    # Both these dates are chosen for their off-by-one potential:
 
    # the first is exactly 30 days after the 2010-06-10 payable;
 
    # the second is exactly 60 days after the 2010-05-15 receivable.
 
    (datetime.date(2010, 7, 10), 1, 5),
 
    (datetime.date(2010, 7, 14), 2, 5),
 
    datetime.date(2010, 7, 10),
 
    datetime.date(2010, 7, 14),
 
    # The remainder just shuffle the age buckets some.
 
    datetime.date(2010, 12, 1),
 
    datetime.date(2011, 6, 1),
 
    datetime.date(2011, 12, 1),
 
    datetime.date(2012, 3, 1),
 
])
 
def test_aging_report_date_cutoffs(accrual_postings, date, recv_end, pay_end):
 
    expect_recv = AGING_AR[:recv_end]
 
    expect_pay = AGING_AP[:pay_end]
 
def test_aging_report_date_cutoffs(accrual_postings, date):
 
    output = run_aging_report(accrual_postings, date)
 
    check_aging_ods(output, date, expect_recv, expect_pay)
 
    check_aging_ods(output, date)
 

	
 
def test_aging_report_entity_consistency(accrual_postings):
 
    date = datetime.date.today()
 
    output = run_aging_report((
 
        post for post in accrual_postings
 
        if post.meta.get('rt-id') == 'rt:480'
 
        and post.units.number < 0
 
    ))
 
    check_aging_ods(output, None, [], [
 
    ), date)
 
    check_aging_ods(output, date, [], [
 
        AgingRow.make_simple('2010-04-15', 'MultiPartyA', 125, 'rt:480/4800'),
 
        AgingRow.make_simple('2010-04-15', 'MultiPartyB', 125, 'rt:480/4800'),
 
    ])
 

	
 
def test_aging_report_does_not_include_too_recent_postings(accrual_postings):
 
    # This date is after the Q3 posting, but too soon after for that to be
 
    # included in the aging report.
 
    date = datetime.date(2010, 10, 1)
 
    output = run_aging_report((
 
        post for post in accrual_postings
 
        if post.meta.get('rt-id') == 'rt:470'
 
    ), date)
 
    # Date+amount are both from the Q2 posting only.
 
    check_aging_ods(output, date, [
 
        AgingRow.make_simple('2010-06-15', 'GrantCo', 5500, 'rt:470/4700',
 
                             project='Development Grant'),
 
    ], [])
 

	
 
def run_main(arglist, config=None, out_type=io.StringIO):
 
    if config is None:
 
        config = testutil.TestConfig(
...
 
@@ -735,7 +724,7 @@ def test_main_aging_report(arglist):
 
    retcode, output, errors = run_main(arglist, out_type=io.BytesIO)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_aging_ods(output, None, recv_rows, pay_rows)
 
    check_aging_ods(output, datetime.date.today(), recv_rows, pay_rows)
 

	
 
def test_main_no_books():
 
    errors = check_main_fails([], testutil.TestConfig(), 1 | 8)
0 comments (0 inline, 0 general)