Changeset - cc1767a09d1d
[Not reviewed]
0 4 0
Brett Smith - 4 years ago 2020-08-22 13:25:53
brettcsmith@brettcsmith.org
fund: Incorporate Equity accounts into Release from Restrictions.

This matches what we do on our Statement of Activities in the
balance sheet report.
4 files changed with 33 insertions and 11 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/fund.py
Show inline comments
 
"""funds.py - Funds report from Beancount
 

	
 
This tool produces a report of account balances for restricted funds,
 
including income, expenses, and realized gain/loss over a given time period.
 
Restricted funds are designated with the "project" metadata.
 

	
 
Specify the date range you want to report with the ``--begin`` and ``--end``
 
options.
 

	
 
Select the accounts you want to report with the ``--account`` option. You can
 
specify this option multiple times. The report will include at least one sheet
 
for each account you specify. Subaccounts will be reported on that sheet as
 
well.
 

	
 
Select the postings you want to report by passing metadata search terms in
 
``name=value`` format.
 

	
 
Run ``ledger-report --help`` for abbreviations and other options.
 

	
 
Examples
 
--------
 

	
 
Generate a report of all restricted funds in ODS format over the last year::
 

	
 
      fund-report
 

	
 
Query a specific restricted fund and get a quick balance on the terminal::
 

	
 
      fund-report <PROJECT>
 
"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import collections
 
import datetime
 
import enum
 
import locale
 
import logging
 
import sys
 

	
 
from typing import (
 
    Dict,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Tuple,
 
    Union,
 
)
 
from ..beancount_types import (
 
    MetaValue,
 
)
 

	
 
from decimal import Decimal
 
from pathlib import Path
 

	
 
import odf.table  # type:ignore[import]
 

	
 
from beancount.parser import printer as bc_printer
 

	
 
from . import core
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 

	
 
AccountsMap = Mapping[data.Account, core.PeriodPostings]
 
FundPosts = Tuple[MetaValue, AccountsMap]
 

	
 
EQUITY_ACCOUNTS = ['Equity', 'Income', 'Expenses']
 
INFO_ACCOUNTS = [
 
    'Assets:Receivable',
 
    'Assets:Prepaid',
 
    'Liabilities:UnearnedIncome',
 
    'Liabilities:Payable',
 
]
 
PROGNAME = 'fund-report'
 
UNRESTRICTED_FUND = 'Conservancy'
 
logger = logging.getLogger('conservancy_beancount.reports.fund')
 

	
 
class ODSReport(core.BaseODS[FundPosts, None]):
 
    def __init__(self, start_date: datetime.date, stop_date: datetime.date) -> None:
 
        super().__init__()
 
        self.start_date = start_date
 
        self.stop_date = stop_date
 

	
 
    def section_key(self, row: FundPosts) -> None:
 
        return None
 

	
 
    def start_spreadsheet(self, *, expanded: bool=True) -> None:
 
        headers = [["Fund"], ["Balance as of", self.start_date.isoformat()]]
 
        if expanded:
 
            sheet_name = "With Breakdowns"
 
            headers += [["Income"], ["Expenses"], ["Equity"]]
 
        else:
 
            sheet_name = "Fund Report"
 
            headers += [["Additions"], ["Releases from", "Restrictions"]]
 
        headers.append(["Balance as of", self.stop_date.isoformat()])
 
        if expanded:
 
            headers += [
 
                ["Of which", "Receivable"],
 
                ["Of which", "Prepaid Expenses"],
 
                ["Of which", "Payable"],
 
                ["Of which", "Unearned Income"],
 
            ]
 

	
 
        self.use_sheet(sheet_name)
 
        for header in headers:
 
            first_line = header[0]
 
            if first_line == 'Fund':
 
                width = 2.0
 
            elif first_line == 'Balance as of':
 
                width = 1.5
 
            elif first_line == 'Of which':
 
                width = 1.3
 
            else:
 
                width = 1.2
 
            col_style = self.column_style(width)
 
            self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
 

	
 
        center_bold = self.merge_styles(self.style_centertext, self.style_bold)
 
        row = self.add_row(*(
 
            self.multiline_cell(header, stylename=center_bold)
 
            for header in headers
 
        ))
 
        row.firstChild.setAttribute(
 
            'stylename', self.merge_styles(self.style_endtext, self.style_bold),
 
        )
 
        self.lock_first_row()
 
        self.lock_first_column()
 
        self.add_row()
 
        self.add_row(self.string_cell(
 
            f"Fund Report From {self.start_date.isoformat()} To {self.stop_date.isoformat()}",
 
            stylename=center_bold,
 
            numbercolumnsspanned=6,
 
        ))
 
        self.add_row()
 

	
 
    def end_spreadsheet(self) -> None:
 
        start_sheet = self.sheet
 
        self.set_open_sheet(self.sheet)
 
        self.start_spreadsheet(expanded=False)
 
        bal_indexes = [0, 1, 2, 4]
 
        totals = [core.MutableBalance() for _ in bal_indexes]
 
        threshold = Decimal('.5')
 
        for fund, balances in self.balances.items():
 
            balances = [balances[index] for index in bal_indexes]
 
        for fund, source_bals in self.balances.items():
 
            balances = [source_bals[index] for index in bal_indexes]
 
            # Incorporate Equity changes to Release from Restrictions.
 
            # Note that using -= mutates the balance in a way we don't want.
 
            balances[2] = balances[2] - source_bals[3]
 
            if (not all(bal.clean_copy(threshold).le_zero() for bal in balances)
 
                and fund != UNRESTRICTED_FUND):
 
                self.write_balances(fund, balances)
 
                for total, bal in zip(totals, balances):
 
                    total += bal
 
        self.write_balances('', totals, self.merge_styles(
 
            self.border_style(core.Border.TOP, '.75pt'),
 
            self.border_style(core.Border.BOTTOM, '1.5pt', 'double'),
 
        ))
 
        self.document.spreadsheet.childNodes.reverse()
 
        self.sheet = start_sheet
 

	
 
    def _row_balances(self, accounts_map: AccountsMap) -> Iterator[core.Balance]:
 
        acct_order = ['Income', 'Expenses', 'Equity']
 
        key_order = [core.OPENING_BALANCE_NAME, *acct_order, core.ENDING_BALANCE_NAME]
 
        balances: Dict[str, core.Balance] = {key: core.MutableBalance() for key in key_order}
 
        for acct_s, balance in core.account_balances(accounts_map, acct_order):
 
            if acct_s in balances:
 
                balances[acct_s] = balance
 
            else:
 
                acct_root, _, _ = acct_s.partition(':')
 
                balances[acct_root] += balance
 
        for key in key_order:
 
            if key == 'Expenses':
 
                yield balances[key]
 
            else:
 
                yield -balances[key]
 
        for info_key in INFO_ACCOUNTS:
 
            for _, balance in core.account_balances(accounts_map, [info_key]):
 
                pass
 
            yield core.normalize_amount_func(info_key)(balance)
 

	
 
    def write_balances(self,
 
                       fund: str,
 
                       balances: Iterable[core.Balance],
 
                       style: Union[None, str, odf.style.Style]=None,
 
    ) -> odf.table.TableRow:
 
        return self.add_row(
 
            self.string_cell(fund, stylename=self.style_endtext),
 
            *(self.balance_cell(bal, stylename=style) for bal in balances),
 
        )
 

	
 
    def write_row(self, row: FundPosts) -> None:
 
        fund, accounts_map = row
 
        self.balances[fund] = list(self._row_balances(accounts_map))
 
        if fund != UNRESTRICTED_FUND:
 
            self.write_balances(fund, self.balances[fund])
 

	
 
    def write(self, rows: Iterable[FundPosts]) -> None:
 
        self.balances: Dict[str, Sequence[core.Balance]] = collections.OrderedDict()
 
        super().write(rows)
 
        try:
 
            unrestricted = self.balances[UNRESTRICTED_FUND]
 
        except KeyError:
 
            pass
 
        else:
 
            self.add_row()
 
            self.write_balances("Unrestricted", unrestricted)
 

	
 

	
 
class TextReport:
 
    def __init__(self,
 
                 start_date: datetime.date,
 
                 stop_date: datetime.date,
 
                 out_file: TextIO) -> None:
 
        self.start_date = start_date
 
        self.stop_date = stop_date
 
        self.out_file = out_file
 

	
 
    def _account_balances(self,
 
                          fund: str,
 
                          account_map: AccountsMap,
 
    ) -> Iterator[Tuple[str, Sequence[str]]]:
 
        total_fmt = f'{fund} balance as of {{}}'
 
        for acct_s, balance in core.account_balances(account_map, EQUITY_ACCOUNTS):
 
            if acct_s is core.OPENING_BALANCE_NAME:
 
                acct_s = total_fmt.format(self.start_date.isoformat())
 
            elif acct_s is core.ENDING_BALANCE_NAME:
 
                acct_s = total_fmt.format(self.stop_date.isoformat())
 
            yield acct_s, (-balance).format(None, sep='\0').split('\0')
 
        for _, account in core.sort_and_filter_accounts(account_map, INFO_ACCOUNTS):
 
            balance = account_map[account].stop_bal
 
            if not balance.is_zero():
 
                balance = core.normalize_amount_func(account)(balance)
 
                yield account, balance.format(None, sep='\0').split('\0')
 

	
 
    def write(self, rows: Iterable[FundPosts]) -> None:
 
        output = [
 
            line
 
            for fund, account_map in rows
 
            for line in self._account_balances(fund, account_map)
 
        ]
 
        acct_width = max(len(acct_s) for acct_s, _ in output) + 2
 
        bal_width = max(len(s) for _, bal_s in output for s in bal_s)
 
        bal_width = max(bal_width, 8)
 
        line_fmt = f'{{:>{acct_width}}}  {{:>{bal_width}}}'
 
        print(line_fmt.replace('{:>', '{:^').format("ACCOUNT", "BALANCE"),
 
              file=self.out_file)
 
        fund_start = f' balance as of {self.start_date.isoformat()}'
 
        for acct_s, bal_seq in output:
 
            if acct_s.endswith(fund_start):
 
                print(line_fmt.format('―' * acct_width, '―' * bal_width),
 
                      file=self.out_file)
 
            bal_iter = iter(bal_seq)
 
            print(line_fmt.format(acct_s, next(bal_iter)), file=self.out_file)
 
            for bal_s in bal_iter:
 
                print(line_fmt.format('', bal_s), file=self.out_file)
 

	
 

	
 
class ReportType(enum.Enum):
 
    TEXT = TextReport
 
    ODS = ODSReport
 
    TXT = TEXT
 
    SPREADSHEET = ODS
 

	
 
    @classmethod
 
    def from_arg(cls, s: str) -> 'ReportType':
 
        try:
 
            return cls[s.upper()]
 
        except KeyError:
 
            raise ValueError(f"no report type matches {s!r}") from None
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
 
The default is one year ago.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
 
The default is a year after the start date.
 
""")
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='TYPE',
 
        type=ReportType.from_arg,
 
        help="""Type of report to generate. `text` gives a plain two-column text
 
report listing accounts and balances over the period, and is the default when
 
you search for a specific project/fund. `ods` produces a higher-level
 
spreadsheet, meant to provide an overview of all funds, and is the default when
 
you don't specify a project/fund.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is stdout for text reports, and a generated filename for ODS
 
reports.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('project', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on postings that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other word is a shortcut for `project=TERM`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.report_type is None:
 
        if any(term.meta_key == 'project' for term in args.search_terms):
 
            args.report_type = ReportType.TEXT
 
        else:
 
            args.report_type = ReportType.ODS
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    if args.stop_date is None:
 
        if args.start_date is None:
tests/books/fund.beancount
Show inline comments
 
; This input is used to test both the fund report
 
; and the opening balances tool.
 

	
 
option "inferred_tolerance_default" "USD:0.01"
 

	
 
2018-01-01 open Assets:Checking
 
2018-01-01 open Assets:EUR
 
2018-01-01 open Assets:Prepaid:Expenses
 
2018-01-01 open Assets:Receivable:Accounts
 
2018-01-01 open Equity:Funds:Restricted
 
2018-01-01 open Equity:Funds:Unrestricted
 
2018-01-01 open Equity:Realized:CurrencyConversion
 
2018-01-01 open Expenses:Other
 
2018-01-01 open Income:Other
 
2018-01-01 open Liabilities:Payable:Accounts
 
2018-01-01 open Liabilities:UnearnedIncome
 

	
 
2018-02-28 * "Opening balances"
 
  Equity:Funds:Unrestricted  -4,000 USD
 
  project: "Conservancy"
 
  Equity:Funds:Restricted    -3,000 USD
 
  project: "Alpha"
 
  Equity:Funds:Restricted    -2,000 USD
 
  project: "Bravo"
 
  Equity:Funds:Restricted    -1,000 USD
 
  project: "Charlie"
 
  Assets:Checking            10,000 USD
 

	
 
2018-03-03 * "Conservancy receivable 2018"
 
  project: "Conservancy"
 
  Income:Other               -32 EUR {1.25 USD}
 
  Assets:Receivable:Accounts  32 EUR {1.25 USD}
 

	
 
2018-03-06 * "Conservancy payable 2018"
 
  project: "Conservancy"
 
  Expenses:Other                 4 USD
 
  Liabilities:Payable:Accounts  -4 USD
 

	
 
2018-06-03 * "Alpha income 2018"
 
  project: "Alpha"
 
  Income:Other    -60 USD
 
  Assets:Checking  60 USD
 

	
 
2018-06-06 * "Alpha unearned income"
 
  project: "Alpha"
 
  Liabilities:UnearnedIncome  -30 USD
 
  Assets:Checking              30 USD
 

	
 
2018-06-09 * "Alpha prepaid expense"
 
  project: "Alpha"
 
  Assets:Prepaid:Expenses  20 USD
 
  Assets:Checking         -20 USD
 

	
 
2018-09-03 * "Bravo expense"
 
  project: "Bravo"
 
  Expenses:Other    20 USD
 
  Assets:Checking  -20 USD
 

	
 
2018-12-03 * "Delta income"
 
  project: "Delta"
 
  Income:Other    -0.40 USD
 
  Assets:Checking  0.40 USD
 

	
 
2019-03-03 * "Conservancy receivable paid"
 
  project: "Conservancy"
 
  Assets:Receivable:Accounts  -32 EUR {1.25 USD} @ 1.5 USD
 
  Assets:EUR                   32 EUR {1.5 USD}
 
  Equity:Realized:CurrencyConversion
 

	
 
2019-03-06 * "Conservancy payable paid"
 
  project: "Conservancy"
 
  Liabilities:Payable:Accounts  4 USD
 
  Assets:Checking              -4 USD
 

	
 
2019-03-12 * "Conservancy income 2019"
 
  project: "Conservancy"
 
  Income:Other    -28 EUR {1.5 USD}
 
  Assets:Checking  40 USD
 
  Equity:Realized:CurrencyConversion
 

	
 
2019-03-15 * "Conservancy expense 2019"
 
  project: "Conservancy"
 
  Expenses:Other    2.80 EUR {1.5 USD}
 
  Assets:Checking  -4.00 USD
 
  Equity:Realized:CurrencyConversion
 

	
 
2019-06-06 * "Alpha unearned income converted"
 
  project: "Alpha"
 
  Liabilities:UnearnedIncome  30 USD
 
  Income:Other               -30 USD
 

	
 
2019-06-09 * "Alpha prepaid expense converted"
 
  project: "Alpha"
 
  Assets:Prepaid:Expenses  -20 USD
 
  Expenses:Other            20 USD
 

	
 
2019-06-12 * "Alpha expense 2019A"
 
  project: "Alpha"
 
  Expenses:Other    3 USD
 
  Assets:Checking  -3 USD
 

	
 
2019-06-15 * "Alpha expense 2019B"
 
  project: "Alpha"
 
  Expenses:Other    3 USD
 
  Assets:Checking  -3 USD
 

	
 
2019-09-03 * "Bravo income"
 
  project: "Bravo"
 
  Income:Other    -200 USD
 
  Assets:Checking  200 USD
 

	
 
2019-12-03 * "Delta income"
 
2019-09-06 * "Delta income"
 
  project: "Delta"
 
  Income:Other    -4.60 USD
 
  Assets:Checking  4.60 USD
 

	
 
2019-12-03 * "Charlie release from restriction"
 
  Equity:Funds:Restricted     100 USD
 
  project: "Charlie"
 
  Equity:Funds:Unrestricted  -100 USD
 
  project: "Conservancy"
tests/test_opening_balances.py
Show inline comments
 
"""test_tools_opening_balances.py - Unit tests for opening balance generation"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import collections
 
import copy
 
import datetime
 
import io
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from beancount import loader as bc_loader
 
from conservancy_beancount.tools import opening_balances as openbalmod
 

	
 
from decimal import Decimal
 
from typing import NamedTuple, Optional
 

	
 
A_CHECKING = 'Assets:Checking'
 
A_EUR = 'Assets:EUR'
 
A_PREPAID = 'Assets:Prepaid:Expenses'
 
A_RECEIVABLE = 'Assets:Receivable:Accounts'
 
A_RESTRICTED = 'Equity:Funds:Restricted'
 
A_UNRESTRICTED = 'Equity:Funds:Unrestricted'
 
A_CURRCONV = 'Equity:Realized:CurrencyConversion'
 
A_CREDITCARD = 'Liabilities:CreditCard'
 
A_PAYABLE = 'Liabilities:Payable:Accounts'
 
A_UNEARNED = 'Liabilities:UnearnedIncome'
 

	
 
class FlatPosting(NamedTuple):
 
    account: str
 
    units_number: Decimal
 
    units_currency: str
 
    cost_number: Optional[Decimal]
 
    cost_currency: Optional[str]
 
    cost_date: Optional[datetime.date]
 
    project: Optional[str]
 

	
 
    def __repr__(self):
 
        cost_s = f' {{{self.cost_number} {self.cost_currency}, {self.cost_date}}}'
 
        if cost_s == ' {None None, None}':
 
            cost_s = ''
 
        return f'<{self.account}  {self.units_number} {self.units_currency}{cost_s}>'
 

	
 
    @classmethod
 
    def make(cls,
 
             account,
 
             units_number,
 
             units_currency='USD',
 
             cost_number=None,
 
             cost_date=None,
 
             project=None,
 
             cost_currency=None,
 
    ):
 
        if cost_number is not None:
 
            cost_number = Decimal(cost_number)
 
            if cost_currency is None:
 
                cost_currency = 'USD'
 
            if isinstance(cost_date, str):
 
                cost_date = datetime.datetime.strptime(cost_date, '%Y-%m-%d').date()
 
        return cls(account, Decimal(units_number), units_currency,
 
                   cost_number, cost_currency, cost_date, project)
 

	
 
    @classmethod
 
    def from_beancount(cls, posting):
 
        units_number, units_currency = posting.units
 
        if posting.cost is None:
 
            cost_number = cost_currency = cost_date = None
 
        else:
 
            cost_number, cost_currency, cost_date, _ = posting.cost
 
        try:
 
            project = posting.meta['project']
 
        except (AttributeError, KeyError):
 
            project = None
 
        return cls(posting.account, units_number, units_currency,
 
                   cost_number, cost_currency, cost_date, project)
 

	
 
    @classmethod
 
    def from_output(cls, output):
 
        entries, _, _ = bc_loader.load_string(output.read())
 
        return (cls.from_beancount(post) for post in entries[-1].postings)
 

	
 

	
 
def run_main(arglist, config=None):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/fund.beancount'),
 
        )
 
    output = io.StringIO()
 
    errors = io.StringIO()
 
    retcode = openbalmod.main(arglist, output, errors, config)
 
    output.seek(0)
 
    return retcode, output, errors
 

	
 
@pytest.mark.parametrize('arg', ['2018', '2018-03-01'])
 
def test_2018_opening(arg):
 
    retcode, output, errors = run_main([arg])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    assert list(FlatPosting.from_output(output)) == [
 
        FlatPosting.make(A_CHECKING, 10000),
 
        FlatPosting.make(A_RESTRICTED, -3000, project='Alpha'),
 
        FlatPosting.make(A_RESTRICTED, -2000, project='Bravo'),
 
        FlatPosting.make(A_RESTRICTED, -1000, project='Charlie'),
 
        FlatPosting.make(A_UNRESTRICTED, -4000, project='Conservancy'),
 
    ]
 

	
 
@pytest.mark.parametrize('arg', ['2019', '2019-03-01'])
 
def test_2019_opening(arg):
 
    retcode, output, errors = run_main([arg])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    assert list(FlatPosting.from_output(output)) == [
 
        FlatPosting.make(A_CHECKING, '10050.40'),
 
        FlatPosting.make(A_PREPAID, 20, project='Alpha'),
 
        FlatPosting.make(A_RECEIVABLE, 32, 'EUR', '1.25', '2018-03-03', 'Conservancy'),
 
        FlatPosting.make(A_RESTRICTED, -3060, project='Alpha'),
 
        FlatPosting.make(A_RESTRICTED, -1980, project='Bravo'),
 
        FlatPosting.make(A_RESTRICTED, -1000, project='Charlie'),
 
        FlatPosting.make(A_RESTRICTED, '-.40', project='Delta'),
 
        FlatPosting.make(A_UNRESTRICTED, -4036, project='Conservancy'),
 
        FlatPosting.make(A_PAYABLE, -4, project='Conservancy'),
 
        FlatPosting.make(A_UNEARNED, -30, project='Alpha'),
 
    ]
 

	
 
@pytest.mark.parametrize('arg', ['2020', '2020-12-31'])
 
def test_2020_opening(arg):
 
    retcode, output, errors = run_main([arg])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    assert list(FlatPosting.from_output(output)) == [
 
        FlatPosting.make(A_CHECKING, 10281),
 
        FlatPosting.make(A_EUR, 32, 'EUR', '1.5', '2019-03-03'),
 
        FlatPosting.make(A_RESTRICTED, -3064, project='Alpha'),
 
        FlatPosting.make(A_RESTRICTED, -2180, project='Bravo'),
 
        FlatPosting.make(A_RESTRICTED, -1000, project='Charlie'),
 
        FlatPosting.make(A_RESTRICTED, -900, project='Charlie'),
 
        FlatPosting.make(A_RESTRICTED, -5, project='Delta'),
 
        FlatPosting.make(A_UNRESTRICTED, -4080, project='Conservancy'),
 
        FlatPosting.make(A_UNRESTRICTED, -4180, project='Conservancy'),
 
    ]
tests/test_reports_fund.py
Show inline comments
 
"""test_reports_fund.py - Unit tests for fund report"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import collections
 
import copy
 
import datetime
 
import io
 
import itertools
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
import babel.numbers
 
import odf.opendocument
 
import odf.table
 

	
 
from beancount import loader as bc_loader
 
from conservancy_beancount import data
 
from conservancy_beancount.reports import core
 
from conservancy_beancount.reports import fund
 

	
 
from decimal import Decimal
 

	
 
_ledger_load = bc_loader.load_file(testutil.test_path('books/fund.beancount'))
 
START_DATE = datetime.date(2018, 3, 1)
 
MID_DATE = datetime.date(2019, 3, 1)
 
STOP_DATE = datetime.date(2020, 3, 1)
 

	
 
EQUITY_ROOT_ACCOUNTS = ('Expenses:', 'Equity:', 'Income:')
 

	
 
OPENING_BALANCES = {
 
    'Alpha': 3000,
 
    'Bravo': 2000,
 
    'Charlie': 1000,
 
    'Conservancy': 4000,
 
    'Delta': 0,
 
}
 

	
 
BALANCES_BY_YEAR = {
 
    ('Conservancy', 2018): [
 
        ('Income:Other', 40),
 
        ('Expenses:Other', -4),
 
        ('Assets:Receivable:Accounts', 40),
 
        ('Liabilities:Payable:Accounts', 4),
 
    ],
 
    ('Conservancy', 2019): [
 
        ('Income:Other', 42),
 
        ('Expenses:Other', Decimal('-4.20')),
 
        ('Equity:Funds:Unrestricted', 100),
 
        ('Equity:Realized:CurrencyConversion', Decimal('6.20')),
 
        ('Assets:Receivable:Accounts', -40),
 
        ('Liabilities:Payable:Accounts', -4),
 
    ],
 
    ('Alpha', 2018): [
 
        ('Income:Other', 60),
 
        ('Liabilities:UnearnedIncome', 30),
 
        ('Assets:Prepaid:Expenses', 20),
 
    ],
 
    ('Alpha', 2019): [
 
        ('Income:Other', 30),
 
        ('Expenses:Other', -26),
 
        ('Assets:Prepaid:Expenses', -20),
 
        ('Liabilities:UnearnedIncome', -30),
 
    ],
 
    ('Bravo', 2018): [
 
        ('Expenses:Other', -20),
 
    ],
 
    ('Bravo', 2019): [
 
        ('Income:Other', 200),
 
    ],
 
    ('Charlie', 2019): [
 
        ('Equity:Funds:Restricted', -100),
 
    ],
 
    ('Delta', 2018): [
 
        ('Income:Other', Decimal('.40')),
 
    ],
 
    ('Delta', 2019): [
 
        ('Income:Other', Decimal('4.60')),
 
    ],
 
}
 

	
 
@pytest.fixture
 
def fund_entries():
 
    return copy.deepcopy(_ledger_load[0])
 

	
 
def split_text_lines(output):
 
    for line in output:
 
        account, amount = line.rsplit(None, 1)
 
        yield account.strip(), amount
 

	
 
def format_amount(amount, currency='USD'):
 
    return babel.numbers.format_currency(
 
        amount, currency, format_type='accounting',
 
    )
 

	
 
def check_text_balances(actual, expected, *expect_accounts):
 
    balance = Decimal()
 
    for expect_account in expect_accounts:
 
        expect_amount = expected[expect_account]
 
        if expect_amount:
 
            actual_account, actual_amount = next(actual)
 
            assert actual_account == expect_account
 
            assert actual_amount == format_amount(expect_amount)
 
            balance += expect_amount
 
    return balance
 

	
 
def check_text_report(output, project, start_date, stop_date):
 
    _, _, project = project.rpartition('=')
 
    balance_amount = Decimal(OPENING_BALANCES[project])
 
    expected = collections.defaultdict(Decimal)
 
    for year in range(2018, stop_date.year):
 
        try:
 
            amounts = BALANCES_BY_YEAR[(project, year)]
 
        except KeyError:
 
            pass
 
        else:
 
            for account, amount in amounts:
 
                if year < start_date.year and account.startswith(EQUITY_ROOT_ACCOUNTS):
 
                    balance_amount += amount
 
                else:
 
                    expected[account] += amount
 
    actual = split_text_lines(output)
 
    next(actual); next(actual)  # Discard headers
 
    open_acct, open_amt = next(actual)
 
    assert open_acct == "{} balance as of {}".format(
 
        project, start_date.isoformat(),
 
    )
 
    assert open_amt == format_amount(balance_amount)
 
    balance_amount += check_text_balances(
 
        actual, expected,
 
        'Equity:Funds:Restricted',
 
        'Equity:Funds:Unrestricted',
 
        'Equity:Realized:CurrencyConversion',
 
        'Income:Other',
 
        'Expenses:Other',
 
    )
 
    end_acct, end_amt = next(actual)
 
    assert end_acct == "{} balance as of {}".format(
 
        project, stop_date.isoformat(),
 
    )
 
    assert end_amt == format_amount(balance_amount)
 
    balance_amount += check_text_balances(
 
        actual, expected,
 
        'Assets:Receivable:Accounts',
 
        'Assets:Prepaid:Expenses',
 
        'Liabilities:Payable:Accounts',
 
        'Liabilities:UnearnedIncome',
 
    )
 
    assert next(actual, None) is None
 

	
 
def check_cell_balance(cell, balance):
 
    if balance:
 
        assert cell.value == balance
 
    else:
 
        assert not cell.value
 

	
 
def check_ods_sheet(sheet, account_balances, *, full):
 
    if full:
 
        account_bals = account_balances.copy()
 
        account_bals['Unrestricted'] = account_bals.pop('Conservancy')
 
    else:
 
        account_bals = {
 
            key: balances
 
            for key, balances in account_balances.items()
 
            if key != 'Conservancy' and any(v >= .5 for v in balances.values())
 
        }
 
        totals = {key: Decimal() for key in
 
                  ['opening', 'Income', 'Expenses', 'Equity:Realized']}
 
                  ['opening', 'Income', 'Expenses', 'Equity']}
 
        for fund, balances in account_bals.items():
 
            for key in totals:
 
                totals[key] += balances[key]
 
        account_bals[''] = totals
 
    for row in itertools.islice(sheet.getElementsByType(odf.table.TableRow), 4, None):
 
        cells = iter(testutil.ODSCell.from_row(row))
 
        try:
 
            fund = next(cells).firstChild.text
 
        except (AttributeError, StopIteration):
 
            continue
 
        try:
 
            balances = account_bals.pop(fund)
 
        except KeyError:
 
            pytest.fail(f"report included unexpected fund {fund}")
 
        check_cell_balance(next(cells), balances['opening'])
 
        check_cell_balance(next(cells), balances['Income'])
 
        check_cell_balance(next(cells), -balances['Expenses'])
 
        if full:
 
            check_cell_balance(next(cells), balances['Equity:Realized'])
 
            check_cell_balance(next(cells), -balances['Expenses'])
 
            check_cell_balance(next(cells), balances['Equity'])
 
        else:
 
            check_cell_balance(
 
                next(cells), -sum(balances[key] for key in ['Expenses', 'Equity']),
 
            )
 
        check_cell_balance(next(cells), sum(balances[key] for key in [
 
            'opening', 'Income', 'Expenses', 'Equity:Realized',
 
            'opening', 'Income', 'Expenses', 'Equity',
 
        ]))
 
        if full:
 
            check_cell_balance(next(cells), balances['Assets:Receivable'])
 
            check_cell_balance(next(cells), balances['Assets:Prepaid'])
 
            check_cell_balance(next(cells), balances['Liabilities'])
 
            check_cell_balance(next(cells), balances['Liabilities:Payable'])
 
        assert next(cells, None) is None
 
    assert not account_bals, "did not see all funds in report"
 

	
 
def check_ods_report(ods, start_date, stop_date):
 
    account_bals = collections.OrderedDict((key, {
 
        'opening': Decimal(amount),
 
        'Income': Decimal(0),
 
        'Expenses': Decimal(0),
 
        'Equity:Realized': Decimal(0),
 
        'Equity': Decimal(0),
 
        'Assets:Receivable': Decimal(0),
 
        'Assets:Prepaid': Decimal(0),
 
        'Liabilities:Payable': Decimal(0),
 
        'Liabilities': Decimal(0),  # UnearnedIncome
 
    }) for key, amount in sorted(OPENING_BALANCES.items()))
 
    for fund, year in itertools.product(account_bals, range(2018, stop_date.year)):
 
        try:
 
            amounts = BALANCES_BY_YEAR[(fund, year)]
 
        except KeyError:
 
            pass
 
        else:
 
            for account, amount in amounts:
 
                if year < start_date.year and account.startswith(EQUITY_ROOT_ACCOUNTS):
 
                if account.startswith(EQUITY_ROOT_ACCOUNTS):
 
                    if year < start_date.year:
 
                        acct_key = 'opening'
 
                    else:
 
                        acct_key, _, _ = account.partition(':')
 
                else:
 
                    acct_key, _, _ = account.rpartition(':')
 
                account_bals[fund][acct_key] += amount
 
    sheets = iter(ods.getElementsByType(odf.table.Table))
 
    check_ods_sheet(next(sheets), account_bals, full=False)
 
    check_ods_sheet(next(sheets), account_bals, full=True)
 
    assert next(sheets, None) is None, "found unexpected sheet"
 

	
 
def run_main(out_type, arglist, config=None):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/fund.beancount'),
 
        )
 
    arglist.insert(0, '--output-file=-')
 
    output = out_type()
 
    errors = io.StringIO()
 
    retcode = fund.main(arglist, output, errors, config)
 
    output.seek(0)
 
    return retcode, output, errors
 

	
 
@pytest.mark.parametrize('project,start_date,stop_date', [
 
    ('Conservancy', START_DATE, STOP_DATE),
 
    ('project=Conservancy', MID_DATE, STOP_DATE),
 
    ('Conservancy', START_DATE, MID_DATE),
 
    ('Alpha', START_DATE, STOP_DATE),
 
    ('project=Alpha', MID_DATE, STOP_DATE),
 
    ('Alpha', START_DATE, MID_DATE),
 
    ('Bravo', START_DATE, STOP_DATE),
 
    ('project=Bravo', MID_DATE, STOP_DATE),
 
    ('Bravo', START_DATE, MID_DATE),
 
    ('project=Charlie', START_DATE, STOP_DATE),
 
])
 
def test_text_report(project, start_date, stop_date):
 
    retcode, output, errors = run_main(io.StringIO, [
 
        '-b', start_date.isoformat(), '-e', stop_date.isoformat(), project,
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_text_report(output, project, start_date, stop_date)
 

	
 
@pytest.mark.parametrize('start_date,stop_date', [
 
    (START_DATE, STOP_DATE),
 
    (MID_DATE, STOP_DATE),
 
    (START_DATE, MID_DATE),
 
])
 
def test_ods_report(start_date, stop_date):
 
    retcode, output, errors = run_main(io.BytesIO, [
 
        '--begin', start_date.isoformat(), '--end', stop_date.isoformat(),
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    check_ods_report(ods, start_date, stop_date)
 

	
 
def test_main_no_postings(caplog):
 
    retcode, output, errors = run_main(io.StringIO, ['NonexistentProject'])
 
    assert retcode == 65
 
    assert any(log.levelname == 'WARNING' for log in caplog.records)
0 comments (0 inline, 0 general)