Changeset - 404a88de1db0
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-10-16 20:29:05
brettcsmith@brettcsmith.org
fund: Use Balances instead of PeriodPostings.

A few motivations for this:

* This makes the fund report more maintainable, because the data structure
is better suited to the task at hand, making it easier to follow what's
going on.

* This helps put Balances through a little more testing with a high-level
report.

* This makes the fund report a little faster, since totals are usually
calculated from a smaller number of Balance objects rather than all
Postings over a period.
3 files changed with 138 insertions and 106 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -406,128 +406,134 @@ class Balances:
 
                      or key.classification.is_under(classification)):
 
                pass
 
            elif not period & key.period:
 
                pass
 
            elif not fund & key.fund:
 
                pass
 
            elif not (post_meta is None or post_meta == key.post_meta):
 
                pass
 
            else:
 
                retval += balance
 
        return retval
 

	
 
    def classifications(self,
 
                        account: str,
 
                        sort_period: Optional[int]=None,
 
    ) -> Sequence[data.Account]:
 
        """Return a sequence of seen account classifications
 

	
 
        Given an account name, returns a sequence of all the account
 
        classifications seen in the postings under that part of the account
 
        hierarchy. The classifications are sorted in descending order by the
 
        balance of postings under them for the ``sort_period`` time period.
 
        """
 
        if sort_period is None:
 
            if account in data.EQUITY_ACCOUNTS:
 
                sort_period = Period.PERIOD
 
            else:
 
                sort_period = Period.ANY
 
        class_bals: Mapping[data.Account, MutableBalance] \
 
            = collections.defaultdict(MutableBalance)
 
        for key, balance in self.balances.items():
 
            if not key.account.is_under(account):
 
                pass
 
            elif key.period & sort_period:
 
                class_bals[key.classification] += balance
 
            else:
 
                # Ensure the balance exists in the mapping
 
                class_bals[key.classification]
 
        norm_func = normalize_amount_func(f'{account}:RootsOK')
 
        def sortkey(acct: data.Account) -> Hashable:
 
            prefix, _, _ = acct.rpartition(':')
 
            balance = norm_func(class_bals[acct])
 
            try:
 
                max_bal = max(amount.number for amount in balance.values())
 
            except ValueError:
 
                max_bal = Decimal(0)
 
            return prefix, -max_bal
 
        return sorted(class_bals, key=sortkey)
 

	
 
    def iter_accounts(self, root: Optional[str]=None) -> Sequence[data.Account]:
 
        """Return a sequence of accounts open during the reporting period
 

	
 
        The sequence is sorted by account name.
 
        """
 
        start_date = self.period_range.start
 
        stop_date = self.period_range.stop
 
        return sorted(
 
            account
 
            for account in data.Account.iter_accounts(root)
 
            if account.meta.open_date < stop_date
 
            and (account.meta.close_date is None
 
                 or account.meta.close_date > start_date)
 
        )
 

	
 
    def meta_values(self) -> Set[str]:
 
        retval = {key.post_meta for key in self.balances}
 
        retval.discard(None)
 
        # discarding None ensures we return the desired type.
 
        return retval  # type:ignore[return-value]
 

	
 

	
 
class RelatedPostings(Sequence[data.Posting]):
 
    """Collect and query related postings
 

	
 
    This class provides common functionality for collecting related postings
 
    and running queries on them: iterating over them, tallying their balance,
 
    etc.
 

	
 
    This class doesn't know anything about how the postings are related. That's
 
    entirely up to the caller.
 

	
 
    A common pattern is to use this class with collections.defaultdict
 
    to organize postings based on some key. See the group_by_meta classmethod
 
    for an example.
 
    """
 
    __slots__ = ('_postings',)
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Posting]=(),
 
                 *,
 
                 _can_own: bool=False,
 
    ) -> None:
 
        self._postings: Sequence[data.Posting]
 
        if _can_own and isinstance(source, Sequence):
 
            self._postings = source
 
        else:
 
            self._postings = list(source)
 

	
 
    @classmethod
 
    def _group_by(cls: Type[RelatedType],
 
                  postings: Iterable[data.Posting],
 
                  key: Callable[[data.Posting], T],
 
    ) -> Iterator[Tuple[T, RelatedType]]:
 
        mapping: Dict[T, List[data.Posting]] = collections.defaultdict(list)
 
        for post in postings:
 
            mapping[key(post)].append(post)
 
        for value, posts in mapping.items():
 
            yield value, cls(posts, _can_own=True)
 

	
 
    @classmethod
 
    def group_by_account(cls: Type[RelatedType],
 
                         postings: Iterable[data.Posting],
 
    ) -> Iterator[Tuple[data.Account, RelatedType]]:
 
        return cls._group_by(postings, operator.attrgetter('account'))
 

	
 
    @classmethod
 
    def group_by_meta(cls: Type[RelatedType],
 
                      postings: Iterable[data.Posting],
 
                      key: MetaKey,
 
                      default: Optional[MetaValue]=None,
 
    ) -> Iterator[Tuple[Optional[MetaValue], RelatedType]]:
 
        """Relate postings by metadata value
 

	
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of post.meta.get(key, default).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same metadata value.
 
        """
 
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 
            return post.meta.get(key, default)
 
        return cls._group_by(postings, key_func)
 

	
 
    @classmethod
 
    def group_by_first_meta_link(
conservancy_beancount/reports/fund.py
Show inline comments
 
"""funds.py - Funds report from Beancount
 

	
 
This tool produces a report of account balances for restricted funds,
 
including income, expenses, and realized gain/loss over a given time period.
 
Restricted funds are designated with the "project" metadata.
 

	
 
Specify the date range you want to report with the ``--begin`` and ``--end``
 
options.
 

	
 
Select the accounts you want to report with the ``--account`` option. You can
 
specify this option multiple times. The report will include at least one sheet
 
for each account you specify. Subaccounts will be reported on that sheet as
 
well.
 

	
 
Select the postings you want to report by passing metadata search terms in
 
``name=value`` format.
 

	
 
Run ``ledger-report --help`` for abbreviations and other options.
 

	
 
Examples
 
--------
 

	
 
Generate a report of all restricted funds in ODS format over the last year::
 

	
 
      fund-report
 

	
 
Query a specific restricted fund and get a quick balance on the terminal::
 

	
 
      fund-report <PROJECT>
 
"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import collections
 
import datetime
 
import enum
 
import functools
 
import locale
 
import logging
 
import sys
 

	
 
from typing import (
 
    Dict,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Tuple,
 
    Union,
 
)
 
from ..beancount_types import (
 
    MetaValue,
 
)
 

	
 
from decimal import Decimal
 
from pathlib import Path
 

	
 
import odf.table  # type:ignore[import]
 

	
 
from beancount.parser import printer as bc_printer
 

	
 
from . import core
 
from . import rewrite
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 

	
 
AccountsMap = Mapping[data.Account, core.PeriodPostings]
 
FundPosts = Tuple[MetaValue, AccountsMap]
 
Period = core.Period
 

	
 
EQUITY_ACCOUNTS = ['Income', 'Expenses', 'Equity']
 
INFO_ACCOUNTS = [
 
    'Assets:Receivable',
 
    'Assets:Prepaid',
 
    'Liabilities:UnearnedIncome',
 
    'Liabilities:Payable',
 
]
 
PROGNAME = 'fund-report'
 
UNRESTRICTED_FUND = 'Conservancy'
 
logger = logging.getLogger('conservancy_beancount.reports.fund')
 

	
 
class ODSReport(core.BaseODS[FundPosts, None]):
 
    def __init__(self, start_date: datetime.date, stop_date: datetime.date) -> None:
 
class ODSReport(core.BaseODS[str, None]):
 
    def __init__(self,
 
                 balances: core.Balances,
 
                 start_date: datetime.date,
 
                 stop_date: datetime.date,
 
    ) -> None:
 
        super().__init__()
 
        self.balances = balances
 
        self.start_date = start_date
 
        self.stop_date = stop_date
 

	
 
    def section_key(self, row: FundPosts) -> None:
 
    def section_key(self, row: str) -> None:
 
        return None
 

	
 
    def start_spreadsheet(self, *, expanded: bool=True) -> None:
 
    def start_spreadsheet(self) -> None:
 
        headers = [["Fund"], ["Balance as of", self.start_date.isoformat()]]
 
        if expanded:
 
        # If self.sheet has children, that means we've already written the
 
        # first sheet and we're starting the second, expanded sheet.
 
        if self.sheet.childNodes:
 
            self.expanded = True
 
            sheet_name = "With Breakdowns"
 
            headers.extend([acct] for acct in EQUITY_ACCOUNTS)
 
        else:
 
            self.expanded = False
 
            sheet_name = "Fund Report"
 
            headers += [["Additions"], ["Releases from", "Restrictions"]]
 
        headers.append(["Balance as of", self.stop_date.isoformat()])
 
        if expanded:
 
        if self.expanded:
 
            headers += [
 
                ["Of which", "Receivable"],
 
                ["Of which", "Prepaid Expenses"],
 
                ["Of which", "Payable"],
 
                ["Of which", "Unearned Income"],
 
            ]
 

	
 
        self.use_sheet(sheet_name)
 
        for header in headers:
 
            first_line = header[0]
 
            if first_line == 'Fund':
 
                width = 2.0
 
            elif first_line == 'Balance as of':
 
                width = 1.5
 
            elif first_line == 'Of which':
 
                width = 1.3
 
            else:
 
                width = 1.2
 
            col_style = self.column_style(width)
 
            self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
 

	
 
        center_bold = self.merge_styles(self.style_centertext, self.style_bold)
 
        row = self.add_row(*(
 
            self.multiline_cell(header, stylename=center_bold)
 
            for header in headers
 
        ))
 
        row.firstChild.setAttribute(
 
            'stylename', self.merge_styles(self.style_endtext, self.style_bold),
 
        )
 
        self.lock_first_row()
 
        self.lock_first_column()
 
        self.add_row()
 
        self.add_row(self.string_cell(
 
            f"Fund Report From {self.start_date.isoformat()} To {self.stop_date.isoformat()}",
 
            stylename=center_bold,
 
            numbercolumnsspanned=6,
 
        ))
 
        self.add_row()
 
        self.sheet_totals = [core.MutableBalance() for _ in range(len(headers) - 1)]
 

	
 
    def end_spreadsheet(self) -> None:
 
        start_sheet = self.sheet
 
        self.set_open_sheet(self.sheet)
 
        self.start_spreadsheet(expanded=False)
 
        bal_indexes = [0, 1, 2, 4]
 
        totals = [core.MutableBalance() for _ in bal_indexes]
 
        threshold = Decimal('.5')
 
        for fund, source_bals in self.balances.items():
 
            balances = [source_bals[index] for index in bal_indexes]
 
            # Incorporate Equity changes to Release from Restrictions.
 
            # Note that using -= mutates the balance in a way we don't want.
 
            balances[2] = balances[2] - source_bals[3]
 
            if (not all(bal.clean_copy(threshold).le_zero() for bal in balances)
 
                and fund != UNRESTRICTED_FUND):
 
                self.write_balances(fund, balances)
 
                for total, bal in zip(totals, balances):
 
                    total += bal
 
        self.write_balances('', totals, self.style_bottomline)
 
        self.document.spreadsheet.childNodes.reverse()
 
        self.sheet = start_sheet
 

	
 
    def _row_balances(self, accounts_map: AccountsMap) -> Iterator[core.Balance]:
 
        key_order = [core.OPENING_BALANCE_NAME, *EQUITY_ACCOUNTS, core.ENDING_BALANCE_NAME]
 
        balances: Dict[str, core.Balance] = {key: core.MutableBalance() for key in key_order}
 
        for acct_s, balance in core.account_balances(accounts_map, EQUITY_ACCOUNTS):
 
            if acct_s in balances:
 
                balances[acct_s] = balance
 
            else:
 
                acct_root, _, _ = acct_s.partition(':')
 
                balances[acct_root] += balance
 
        for key in key_order:
 
            if key == 'Expenses':
 
                yield balances[key]
 
            else:
 
                yield -balances[key]
 
        for info_key in INFO_ACCOUNTS:
 
            for _, balance in core.account_balances(accounts_map, [info_key]):
 
                pass
 
            yield core.normalize_amount_func(info_key)(balance)
 

	
 
    def write_balances(self,
 
                       fund: str,
 
                       balances: Iterable[core.Balance],
 
                       style: Union[None, str, odf.style.Style]=None,
 
    def write_balance_row(self,
 
                          name: str,
 
                          balances: Iterable[core.Balance],
 
                          style: Union[None, str, odf.style.Style]=None,
 
    ) -> odf.table.TableRow:
 
        return self.add_row(
 
            self.string_cell(fund, stylename=self.style_endtext),
 
            self.string_cell(name, stylename=self.style_endtext),
 
            *(self.balance_cell(bal, stylename=style) for bal in balances),
 
        )
 

	
 
    def write_row(self, row: FundPosts) -> None:
 
        fund, accounts_map = row
 
        self.balances[fund] = list(self._row_balances(accounts_map))
 
        if fund != UNRESTRICTED_FUND:
 
            self.write_balances(fund, self.balances[fund])
 

	
 
    def write(self, rows: Iterable[FundPosts]) -> None:
 
        self.balances: Dict[str, Sequence[core.Balance]] = collections.OrderedDict()
 
        super().write(rows)
 
        try:
 
            unrestricted = self.balances[UNRESTRICTED_FUND]
 
        except KeyError:
 
            pass
 
    def write_balances(self,
 
                       name: str,
 
                       style: Union[None, str, odf.style.Style]=None,
 
                       fund: int=core.Fund.ANY,
 
    ) -> odf.table.TableRow:
 
        if (fund & core.Fund.ANY) == core.Fund.ANY:
 
            total = functools.partial(self.balances.total, post_meta=name)
 
        else:
 
            self.add_row()
 
            self.write_balances("Unrestricted", unrestricted)
 
            total = functools.partial(self.balances.total, fund=fund)
 
        balances = [
 
            -total(account=EQUITY_ACCOUNTS, period=Period.THRU_MIDDLE),
 
            -total(account='Income', period=Period.PERIOD),
 
            total(account='Expenses', period=Period.PERIOD),
 
            -total(account='Equity', period=Period.PERIOD),
 
        ]
 
        if not self.expanded:
 
            threshold = Decimal('.5')
 
            if all(bal.clean_copy(threshold).le_zero() for bal in balances):
 
                return
 
            equity = balances.pop()
 
            balances[-1] -= equity
 
        balances.append(-total(account=EQUITY_ACCOUNTS))
 
        if self.expanded:
 
            for account in INFO_ACCOUNTS:
 
                norm_func = core.normalize_amount_func(account)
 
                balances.append(norm_func(total(account=account)))
 
        for sheet_tot, row_tot in zip(self.sheet_totals, balances):
 
            sheet_tot += row_tot
 
        return self.write_balance_row(name, balances, style)
 

	
 
    def write_row(self, row: str) -> None:
 
        if row != UNRESTRICTED_FUND:
 
            self.write_balances(row)
 

	
 
    def write(self, rows: Iterable[str]) -> None:
 
        row_list = list(rows)
 
        # Write the basic, auditor-style fund report.
 
        super().write(iter(row_list))
 
        self.write_balance_row("", self.sheet_totals, self.style_bottomline)
 
        # Write the expanded fund report. start_spreadsheet() will see we've
 
        # written the first sheet and adapt.
 
        super().write(iter(row_list))
 
        self.write_balances("Unrestricted", fund=core.Fund.UNRESTRICTED)
 
        self.set_open_sheet(self.sheet)
 

	
 

	
 
class TextReport:
 
    def __init__(self,
 
                 balances: core.Balances,
 
                 start_date: datetime.date,
 
                 stop_date: datetime.date,
 
                 out_file: TextIO) -> None:
 
        self.balances = balances
 
        self.start_date = start_date
 
        self.stop_date = stop_date
 
        self.out_file = out_file
 
        accounts = self.balances.iter_accounts()
 
        self.equity_accounts = [
 
            account for _, account in
 
            core.sort_and_filter_accounts(accounts, EQUITY_ACCOUNTS)
 
        ]
 
        self.info_accounts = [
 
            account for _, account in
 
            core.sort_and_filter_accounts(accounts, INFO_ACCOUNTS)
 
        ]
 

	
 
    def _format_total(self,
 
                      fund_name: str,
 
                      period: int=Period.ANY,
 
                      account: Optional[str]=None,
 
    ) -> Sequence[str]:
 
        if account is None:
 
            account_arg: Union[str, Sequence[str]] = EQUITY_ACCOUNTS
 
            account_exact = False
 
        else:
 
            account_arg = account
 
            account_exact = True
 
        balance = self.balances.total(
 
            post_meta=fund_name,
 
            period=period,
 
            account=account_arg,
 
            account_exact=account_exact,
 
        )
 
        if balance.is_zero():
 
            return []
 
        elif account is None:
 
            balance = -balance
 
        else:
 
            norm_func = core.normalize_amount_func(f'{account}:RootsOK')
 
            balance = norm_func(balance)
 
        return balance.format(None, sep='\0').split('\0')
 

	
 
    def _account_balances(self,
 
                          fund: str,
 
                          account_map: AccountsMap,
 
    ) -> Iterator[Tuple[str, Sequence[str]]]:
 
    def _account_balances(self, fund: str) -> Iterator[Tuple[str, Sequence[str]]]:
 
        total_fmt = f'{fund} balance as of {{}}'
 
        for acct_s, balance in core.account_balances(account_map, EQUITY_ACCOUNTS):
 
            if acct_s is core.OPENING_BALANCE_NAME:
 
                acct_s = total_fmt.format(self.start_date.isoformat())
 
            elif acct_s is core.ENDING_BALANCE_NAME:
 
                acct_s = total_fmt.format(self.stop_date.isoformat())
 
            if not acct_s.startswith('Expenses:'):
 
                balance = -balance
 
            yield acct_s, balance.format(None, sep='\0').split('\0')
 
        for _, account in core.sort_and_filter_accounts(account_map, INFO_ACCOUNTS):
 
            balance = account_map[account].stop_bal
 
            if not balance.is_zero():
 
                balance = core.normalize_amount_func(account)(balance)
 
                yield account, balance.format(None, sep='\0').split('\0')
 

	
 
    def write(self, rows: Iterable[FundPosts]) -> None:
 
        yield (total_fmt.format(self.start_date.isoformat()),
 
               self._format_total(fund, Period.THRU_MIDDLE))
 
        for account in self.equity_accounts:
 
            total = self._format_total(fund, Period.PERIOD, account)
 
            if total:
 
                yield (account, total)
 
        yield (total_fmt.format(self.stop_date.isoformat()),
 
               self._format_total(fund))
 
        for account in self.info_accounts:
 
            total = self._format_total(fund, account=account)
 
            if total:
 
                yield (account, total)
 

	
 
    def write(self, rows: Iterable[str]) -> None:
 
        output = [
 
            line
 
            for fund, account_map in rows
 
            for line in self._account_balances(fund, account_map)
 
            for fund in rows
 
            for line in self._account_balances(fund)
 
        ]
 
        acct_width = max(len(acct_s) for acct_s, _ in output) + 2
 
        bal_width = max(len(s) for _, bal_s in output for s in bal_s)
 
        bal_width = max(bal_width, 8)
 
        line_fmt = f'{{:>{acct_width}}}  {{:>{bal_width}}}'
 
        print(line_fmt.replace('{:>', '{:^').format("ACCOUNT", "BALANCE"),
 
              file=self.out_file)
 
        fund_start = f' balance as of {self.start_date.isoformat()}'
 
        fund_end = f' balance as of {self.stop_date.isoformat()}'
 
        for acct_s, bal_seq in output:
 
            is_end_bal = acct_s.endswith(fund_end)
 
            if is_end_bal or acct_s.endswith(fund_start):
 
                print(line_fmt.format('─' * acct_width, '─' * bal_width),
 
                      file=self.out_file)
 
            bal_iter = iter(bal_seq)
 
            print(line_fmt.format(acct_s, next(bal_iter)), file=self.out_file)
 
            for bal_s in bal_iter:
 
                print(line_fmt.format('', bal_s), file=self.out_file)
 
            if is_end_bal:
 
                print(line_fmt.format('═' * acct_width, '═' * bal_width),
 
                      file=self.out_file)
 

	
 

	
 
class ReportType(enum.Enum):
 
    TEXT = TextReport
 
    ODS = ODSReport
 
    TXT = TEXT
 
    SPREADSHEET = ODS
 

	
 
    @classmethod
 
    def from_arg(cls, s: str) -> 'ReportType':
 
        try:
 
            return cls[s.upper()]
 
        except KeyError:
 
            raise ValueError(f"no report type matches {s!r}") from None
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
 
The default is one year ago.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
 
The default is a year after the start date.
 
""")
 
    cliutil.add_rewrite_rules_argument(parser)
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='TYPE',
 
        type=ReportType.from_arg,
 
        help="""Type of report to generate. `text` gives a plain two-column text
 
report listing accounts and balances over the period, and is the default when
 
you search for a specific project/fund. `ods` produces a higher-level
 
spreadsheet, meant to provide an overview of all funds, and is the default when
 
you don't specify a project/fund.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is stdout for text reports, and a generated filename for ODS
 
reports.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('project', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on postings that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other word is a shortcut for `project=TERM`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.report_type is None:
 
        if any(term.meta_key == 'project' for term in args.search_terms):
 
            args.report_type = ReportType.TEXT
 
        else:
 
            args.report_type = ReportType.ODS
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    if args.stop_date is None:
 
        if args.start_date is None:
 
            args.stop_date = datetime.date.today()
 
        else:
 
            args.stop_date = cliutil.diff_year(args.start_date, 1)
 
    if args.start_date is None:
 
        args.start_date = cliutil.diff_year(args.stop_date, -1)
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
        entries, load_errors, options = books.Loader.load_none(config.config_file_path())
 
        returncode = cliutil.ExitCode.NoConfiguration
 
    else:
 
        entries, load_errors, _ = books_loader.load_fy_range(args.start_date, args.stop_date)
 
        entries, load_errors, options = books_loader.load_fy_range(args.start_date, args.stop_date)
 
        if load_errors:
 
            returncode = cliutil.ExitCode.BeancountErrors
 
        elif not entries:
 
            returncode = cliutil.ExitCode.NoDataLoaded
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 

	
 
    data.Account.load_from_books(entries, options)
 
    postings = iter(
 
        post
 
        for post in data.Posting.from_entries(entries)
 
        if post.meta.date < args.stop_date
 
    )
 
    for rewrite_path in args.rewrite_rules:
 
        try:
 
            ruleset = rewrite.RewriteRuleset.from_yaml(rewrite_path)
 
        except ValueError as error:
 
            logger.critical("failed loading rewrite rules from %s: %s",
 
                            rewrite_path, error.args[0])
 
            return cliutil.ExitCode.RewriteRulesError
 
        postings = ruleset.rewrite(postings)
 
    for search_term in args.search_terms:
 
        postings = search_term.filter_postings(postings)
 
    fund_postings = {
 
        key: related
 
        for key, related in core.RelatedPostings.group_by_meta(postings, 'project')
 
        if isinstance(key, str)
 
    }
 
    period_cls = core.PeriodPostings.with_start_date(args.start_date)
 
    fund_map = collections.OrderedDict(
 
        (fund, dict(period_cls.group_by_account(fund_postings[fund])))
 
        for fund in sorted(fund_postings, key=lambda s: locale.strxfrm(s.casefold()))
 
    )
 
    if not fund_map:
 
    balances = core.Balances(postings, args.start_date, args.stop_date, 'project')
 
    funds = sorted(balances.meta_values(), key=lambda s: locale.strxfrm(s.casefold()))
 
    if not funds:
 
        logger.warning("no matching postings found to report")
 
        returncode = returncode or cliutil.ExitCode.NoDataFiltered
 
    elif args.report_type is ReportType.TEXT:
 
        out_file = cliutil.text_output(args.output_file, stdout)
 
        report = TextReport(args.start_date, args.stop_date, out_file)
 
        report.write(fund_map.items())
 
        report = TextReport(balances, args.start_date, args.stop_date, out_file)
 
        report.write(funds)
 
    else:
 
        ods_report = ODSReport(args.start_date, args.stop_date)
 
        ods_report = ODSReport(balances, args.start_date, args.stop_date)
 
        ods_report.set_common_properties(config.books_repo())
 
        ods_report.write(fund_map.items())
 
        ods_report.write(funds)
 
        if args.output_file is None:
 
            out_dir_path = config.repository_path() or Path()
 
            args.output_file = out_dir_path / 'FundReport_{}_{}.ods'.format(
 
                args.start_date.isoformat(), args.stop_date.isoformat(),
 
            )
 
            logger.info("Writing report to %s", args.output_file)
 
        ods_file = cliutil.bytes_output(args.output_file, stdout)
 
        ods_report.save_file(ods_file)
 
    return returncode
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
tests/test_reports_fund.py
Show inline comments
...
 
@@ -32,128 +32,130 @@ from beancount import loader as bc_loader
 
from conservancy_beancount import data
 
from conservancy_beancount.reports import core
 
from conservancy_beancount.reports import fund
 

	
 
from decimal import Decimal
 

	
 
_ledger_load = bc_loader.load_file(testutil.test_path('books/fund.beancount'))
 
START_DATE = datetime.date(2018, 3, 1)
 
MID_DATE = datetime.date(2019, 3, 1)
 
STOP_DATE = datetime.date(2020, 3, 1)
 

	
 
EQUITY_ROOT_ACCOUNTS = ('Expenses:', 'Equity:', 'Income:')
 

	
 
OPENING_BALANCES = {
 
    'Alpha': 3000,
 
    'Bravo': 2000,
 
    'Charlie': 1000,
 
    'Conservancy': 4000,
 
    'Delta': 0,
 
}
 

	
 
BALANCES_BY_YEAR = {
 
    ('Conservancy', 2018): [
 
        ('Income:Other', 40),
 
        ('Expenses:Other', -4),
 
        ('Assets:Receivable:Accounts', 40),
 
        ('Liabilities:Payable:Accounts', 4),
 
    ],
 
    ('Conservancy', 2019): [
 
        ('Income:Other', 42),
 
        ('Expenses:Other', Decimal('-4.20')),
 
        ('Equity:Funds:Unrestricted', 100),
 
        ('Equity:Realized:CurrencyConversion', Decimal('6.20')),
 
        ('Assets:Receivable:Accounts', -40),
 
        ('Liabilities:Payable:Accounts', -4),
 
    ],
 
    ('Alpha', 2018): [
 
        ('Income:Other', 60),
 
        ('Liabilities:UnearnedIncome', 30),
 
        ('Assets:Prepaid:Expenses', 20),
 
    ],
 
    ('Alpha', 2019): [
 
        ('Income:Other', 30),
 
        ('Expenses:Other', -26),
 
        ('Assets:Prepaid:Expenses', -20),
 
        ('Liabilities:UnearnedIncome', -30),
 
    ],
 
    ('Bravo', 2018): [
 
        ('Expenses:Other', -20),
 
    ],
 
    ('Bravo', 2019): [
 
        ('Income:Other', 200),
 
    ],
 
    ('Charlie', 2019): [
 
        ('Equity:Funds:Restricted', -100),
 
    ],
 
    ('Delta', 2018): [
 
        ('Income:Other', Decimal('.40')),
 
    ],
 
    ('Delta', 2019): [
 
        ('Income:Other', Decimal('4.60')),
 
    ],
 
}
 

	
 
clean_account_meta = pytest.fixture(autouse=True)(testutil.clean_account_meta)
 

	
 
@pytest.fixture
 
def fund_entries():
 
    return copy.deepcopy(_ledger_load[0])
 

	
 
def split_text_lines(output):
 
    for line in output:
 
        account, amount = line.rsplit(None, 1)
 
        yield account.strip(), amount
 

	
 
def format_amount(amount, currency='USD'):
 
    return babel.numbers.format_currency(
 
        amount, currency, format_type='accounting',
 
    )
 

	
 
def check_text_balances(actual, expected, *expect_accounts):
 
    balance = Decimal()
 
    for expect_account in expect_accounts:
 
        expect_amount = expected[expect_account]
 
        balance += expect_amount
 
        if expect_account.startswith('Expenses:'):
 
            expect_amount *= -1
 
        if expect_amount:
 
            actual_account, actual_amount = next(actual)
 
            assert actual_account == expect_account
 
            assert actual_amount == format_amount(expect_amount)
 
    return balance
 

	
 
def check_text_report(output, project, start_date, stop_date):
 
    _, _, project = project.rpartition('=')
 
    balance_amount = Decimal(OPENING_BALANCES[project])
 
    expected = collections.defaultdict(Decimal)
 
    for year in range(2018, stop_date.year):
 
        try:
 
            amounts = BALANCES_BY_YEAR[(project, year)]
 
        except KeyError:
 
            pass
 
        else:
 
            for account, amount in amounts:
 
                if year < start_date.year and account.startswith(EQUITY_ROOT_ACCOUNTS):
 
                    balance_amount += amount
 
                else:
 
                    expected[account] += amount
 
    actual = split_text_lines(output)
 
    next(actual); next(actual)  # Discard headers
 
    open_acct, open_amt = next(actual)
 
    assert open_acct == "{} balance as of {}".format(
 
        project, start_date.isoformat(),
 
    )
 
    assert open_amt == format_amount(balance_amount)
 
    balance_amount += check_text_balances(
 
        actual, expected,
 
        'Income:Other',
 
        'Expenses:Other',
 
        'Equity:Funds:Restricted',
 
        'Equity:Funds:Unrestricted',
 
        'Equity:Realized:CurrencyConversion',
 
    )
 
    next(actual)
 
    end_acct, end_amt = next(actual)
 
    assert end_acct == "{} balance as of {}".format(
 
        project, stop_date.isoformat(),
 
    )
 
    assert end_amt == format_amount(balance_amount)
 
    next(actual)
0 comments (0 inline, 0 general)