Changeset - 5e9e11923e8c
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-06-21 02:51:02
brettcsmith@brettcsmith.org
reports: Add account_balances() function.
3 files changed with 158 insertions and 37 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -68,6 +68,9 @@ from ..beancount_types import (
 
    MetaValue,
 
)
 

	
 
OPENING_BALANCE_NAME = "OPENING BALANCE"
 
ENDING_BALANCE_NAME = "ENDING BALANCE"
 

	
 
DecimalCompat = data.DecimalCompat
 
BalanceType = TypeVar('BalanceType', bound='Balance')
 
ElementType = Callable[..., odf.element.Element]
...
 
@@ -1104,6 +1107,50 @@ class BaseODS(BaseSpreadsheet[RT, ST], metaclass=abc.ABCMeta):
 
            self.save_file(out_file)
 

	
 

	
 
def account_balances(
 
        groups: Mapping[data.Account, PeriodPostings],
 
        order: Optional[Sequence[str]]=None,
 
) -> Iterator[Tuple[str, Balance]]:
 
    """Iterate account balances over a date range
 

	
 
    1. ``subclass = PeriodPostings.with_start_date(start_date)``
 
    2. ``groups = dict(subclass.group_by_account(postings))``
 
    3. ``for acct, bal in account_balances(groups, [optional ordering]): ...``
 

	
 
    This function returns an iterator of 2-tuples ``(account, balance)``
 
    that you can use to generate a report in the style of ``ledger balance``.
 
    The accounts are accounts in ``groups`` that appeared under one of the
 
    account name strings in ``order``. ``balance`` is the corresponding
 
    balance over the time period (``groups[key].period_bal``). Accounts are
 
    iterated in the order provided by ``sort_and_filter_accounts()``.
 

	
 
    The first 2-tuple is ``(OPENING_BALANCE_NAME, balance)`` with the balance of
 
    all these accounts as of ``start_date``.
 
    The final 2-tuple is ``(ENDING_BALANCE_NAME, balance)`` with the final
 
    balance of all these accounts as of ``start_date``.
 
    The iterator will always yield these special 2-tuples, even when there are
 
    no accounts in the input or to report.
 
    """
 
    if order is None:
 
        order = ['Equity', 'Income', 'Expenses']
 
    acct_seq = [account for _, account in sort_and_filter_accounts(groups, order)]
 
    yield (OPENING_BALANCE_NAME, sum(
 
        (groups[key].start_bal for key in acct_seq),
 
        MutableBalance(),
 
    ))
 
    for key in acct_seq:
 
        postings = groups[key]
 
        try:
 
            in_date_range = postings[-1].meta.date >= postings.START_DATE
 
        except IndexError:
 
            in_date_range = False
 
        if in_date_range:
 
            yield (key, groups[key].period_bal)
 
    yield (ENDING_BALANCE_NAME, sum(
 
        (groups[key].stop_bal for key in acct_seq),
 
        MutableBalance(),
 
    ))
 

	
 
def normalize_amount_func(account_name: str) -> Callable[[T], T]:
 
    """Get a function to normalize amounts for reporting
 

	
conservancy_beancount/reports/ledger.py
Show inline comments
...
 
@@ -312,27 +312,7 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
 
              for key in self.metadata_columns),
 
        )
 

	
 
    def _combined_balance_row(self,
 
                              balance_accounts: Sequence[str],
 
                              attr_name: str,
 
    ) -> None:
 
        date = getattr(self.date_range, attr_name)
 
        balance_attrname = f'{attr_name}_bal'
 
        balance = -sum((
 
            getattr(related, balance_attrname)
 
            for account, related in self.account_groups.items()
 
            if account.is_under(*balance_accounts)
 
        ), core.MutableBalance())
 
        self.add_row(
 
            self.string_cell(
 
                f"Balance as of {date.isoformat()}",
 
                stylename=self.merge_styles(self.style_bold, self.style_endtext),
 
            ),
 
            self.balance_cell(balance, stylename=self.style_bold),
 
        )
 

	
 
    def write_balance_sheet(self) -> None:
 
        balance_accounts = ['Equity', 'Income', 'Expenses']
 
        self.use_sheet("Balance")
 
        self.sheet.addElement(odf.table.TableColumn(stylename=self.column_style(3)))
 
        self.sheet.addElement(odf.table.TableColumn(stylename=self.column_style(1.5)))
...
 
@@ -349,17 +329,20 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
 
            numbercolumnsspanned=2,
 
        ))
 
        self.add_row()
 
        self._combined_balance_row(balance_accounts, 'start')
 
        for _, account in core.sort_and_filter_accounts(
 
                self.account_groups, balance_accounts,
 
        ):
 
            balance = self.account_groups[account].period_bal
 
            if not balance.is_zero():
 
                self.add_row(
 
                    self.string_cell(account, stylename=self.style_endtext),
 
                    self.balance_cell(-balance),
 
                )
 
        self._combined_balance_row(balance_accounts, 'stop')
 
        for account, balance in core.account_balances(self.account_groups):
 
            if account is core.OPENING_BALANCE_NAME:
 
                text = f"Balance as of {self.date_range.start.isoformat()}"
 
                style = self.merge_styles(self.style_bold, self.style_endtext)
 
            elif account is core.ENDING_BALANCE_NAME:
 
                text = f"Balance as of {self.date_range.stop.isoformat()}"
 
                style = self.merge_styles(self.style_bold, self.style_endtext)
 
            else:
 
                text = account
 
                style = self.style_endtext
 
            self.add_row(
 
                self.string_cell(text, stylename=style),
 
                self.balance_cell(-balance, stylename=style),
 
            )
 

	
 
    def write(self, rows: Iterable[data.Posting]) -> None:
 
        related_cls = core.PeriodPostings.with_start_date(self.date_range.start)
tests/test_reports_core.py
Show inline comments
...
 
@@ -14,16 +14,17 @@
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import datetime
 

	
 
import pytest
 

	
 
from decimal import Decimal
 

	
 
from . import testutil
 

	
 
from conservancy_beancount import data
 
from conservancy_beancount.reports import core
 

	
 
from conservancy_beancount.data import Account
 

	
 
AMOUNTS = [
 
    2,
 
    Decimal('4.40'),
...
 
@@ -31,6 +32,27 @@ AMOUNTS = [
 
    core.Balance([testutil.Amount('8.80')]),
 
]
 

	
 
@pytest.fixture
 
def balance_postings():
 
    dates = testutil.date_seq(testutil.FY_MID_DATE)
 
    return data.Posting.from_entries([
 
        testutil.Transaction(date=next(dates), postings=[
 
            ('Equity:OpeningBalance', -1000),
 
            ('Assets:Checking', 1000),
 
        ]),
 
        testutil.Transaction(date=next(dates), postings=[
 
            ('Income:Donations', -10),
 
            ('Expenses:BankingFees', 1),
 
            ('Assets:Checking', 9),
 
        ]),
 
        testutil.Transaction(date=next(dates), postings=[
 
            ('Income:Donations', -20),
 
            ('Expenses:Services:Fundraising', 1),
 
            ('Equity:Realized:CurrencyConversion', 1),
 
            ('Assets:Checking', 18),
 
        ]),
 
    ])
 

	
 
@pytest.mark.parametrize('acct_name', [
 
    'Assets:Checking',
 
    'Assets:Receivable:Accounts',
...
 
@@ -68,7 +90,7 @@ def test_normalize_amount_func_bad_acct_name(acct_name):
 
        core.normalize_amount_func(acct_name)
 

	
 
def test_sort_and_filter_accounts():
 
    accounts = (Account(s) for s in [
 
    accounts = (data.Account(s) for s in [
 
        'Expenses:Services',
 
        'Assets:Receivable',
 
        'Income:Other',
...
 
@@ -87,7 +109,7 @@ def test_sort_and_filter_accounts():
 
    ]
 

	
 
def test_sort_and_filter_accounts_unused_name():
 
    accounts = (Account(s) for s in [
 
    accounts = (data.Account(s) for s in [
 
        'Liabilities:CreditCard',
 
        'Assets:Cash',
 
        'Assets:Receivable:Accounts',
...
 
@@ -102,7 +124,7 @@ def test_sort_and_filter_accounts_unused_name():
 
    ]
 

	
 
def test_sort_and_filter_accounts_with_subaccounts():
 
    accounts = (Account(s) for s in [
 
    accounts = (data.Account(s) for s in [
 
        'Assets:Checking',
 
        'Assets:Receivable:Fraud',
 
        'Assets:Cash',
...
 
@@ -118,10 +140,79 @@ def test_sort_and_filter_accounts_with_subaccounts():
 

	
 
@pytest.mark.parametrize('empty_arg', ['accounts', 'order'])
 
def test_sort_and_filter_accounts_empty_accounts(empty_arg):
 
    accounts = [Account(s) for s in ['Expenses:Other', 'Income:Other']]
 
    accounts = [data.Account(s) for s in ['Expenses:Other', 'Income:Other']]
 
    if empty_arg == 'accounts':
 
        args = ([], accounts)
 
    else:
 
        args = (accounts, [])
 
    actual = core.sort_and_filter_accounts(*args)
 
    assert next(actual, None) is None
 

	
 
def check_account_balance(balance_seq, account, balance):
 
    assert next(balance_seq, None) == (account, {'USD': testutil.Amount(balance)})
 

	
 
@pytest.mark.parametrize('days_after', range(4))
 
def test_account_balances(balance_postings, days_after):
 
    start_date = testutil.FY_MID_DATE + datetime.timedelta(days=days_after)
 
    balance_cls = core.PeriodPostings.with_start_date(start_date)
 
    groups = dict(balance_cls.group_by_account(balance_postings))
 
    actual = core.account_balances(groups)
 
    expect_opening = -1027
 
    opening_acct, opening_bal = next(actual)
 
    if days_after < 1:
 
        check_account_balance(actual, 'Equity:OpeningBalance', -1000)
 
        expect_opening += 1000
 
    if days_after < 3:
 
        check_account_balance(actual, 'Equity:Realized:CurrencyConversion', 1)
 
        expect_opening -= 1
 
    if days_after < 2:
 
        check_account_balance(actual, 'Income:Donations', -30)
 
        expect_opening += 30
 
    elif days_after < 3:
 
        check_account_balance(actual, 'Income:Donations', -20)
 
        expect_opening += 20
 
    if days_after < 2:
 
        check_account_balance(actual, 'Expenses:BankingFees', 1)
 
        expect_opening -= 1
 
    if days_after < 3:
 
        check_account_balance(actual, 'Expenses:Services:Fundraising', 1)
 
        expect_opening -= 1
 
    if expect_opening:
 
        assert opening_bal == {'USD': testutil.Amount(expect_opening)}
 
    else:
 
        assert opening_bal.is_zero()
 
    assert opening_acct == core.OPENING_BALANCE_NAME
 
    check_account_balance(actual, core.ENDING_BALANCE_NAME, -1027)
 
    assert next(actual, None) is None
 

	
 
def test_account_balances_order_arg(balance_postings):
 
    start_date = testutil.FY_MID_DATE + datetime.timedelta(days=1)
 
    balance_cls = core.PeriodPostings.with_start_date(start_date)
 
    groups = dict(balance_cls.group_by_account(balance_postings))
 
    actual = core.account_balances(groups, ['Income', 'Assets'])
 
    check_account_balance(actual, core.OPENING_BALANCE_NAME, 1000)
 
    check_account_balance(actual, 'Income:Donations', -30)
 
    check_account_balance(actual, 'Assets:Checking', 27)
 
    check_account_balance(actual, core.ENDING_BALANCE_NAME, 997)
 
    assert next(actual, None) is None
 

	
 
def test_account_balances_order_filters_all(balance_postings):
 
    start_date = testutil.FY_MID_DATE + datetime.timedelta(days=1)
 
    balance_cls = core.PeriodPostings.with_start_date(start_date)
 
    groups = dict(balance_cls.group_by_account(balance_postings))
 
    actual = core.account_balances(groups, ['Liabilities'])
 
    account, balance = next(actual)
 
    assert account is core.OPENING_BALANCE_NAME
 
    assert balance.is_zero()
 
    account, balance = next(actual)
 
    assert account is core.ENDING_BALANCE_NAME
 
    assert balance.is_zero()
 

	
 
def test_account_balances_empty_postings():
 
    actual = core.account_balances({})
 
    account, balance = next(actual)
 
    assert account is core.OPENING_BALANCE_NAME
 
    assert balance.is_zero()
 
    account, balance = next(actual)
 
    assert account is core.ENDING_BALANCE_NAME
 
    assert balance.is_zero()
0 comments (0 inline, 0 general)