Changeset - 73bbc1e4ec8a
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-10-16 14:05:23
brettcsmith@brettcsmith.org
data: Define EQUITY_ACCOUNTS and FUND_ACCOUNTS.
3 files changed with 18 insertions and 23 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/data.py
Show inline comments
 
"""Enhanced Beancount data structures for Conservancy
 

	
 
The classes in this module are interface-compatible with Beancount's core data
 
structures, and provide additional business logic that we want to use
 
throughout Conservancy tools.
 
"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import collections
 
import datetime
 
import decimal
 
import functools
 
import re
 

	
 
from beancount.core import account as bc_account
 
from beancount.core import amount as bc_amount
 
from beancount.core import convert as bc_convert
 
from beancount.core import data as bc_data
 
from beancount.core import position as bc_position
 
from beancount.parser import options as bc_options
 

	
 
from typing import (
 
    cast,
 
    overload,
 
    Callable,
 
    Hashable,
 
    Iterable,
 
    Iterator,
 
    MutableMapping,
 
    Optional,
 
    Pattern,
 
    Sequence,
 
    TypeVar,
 
    Union,
 
)
 

	
 
from .beancount_types import (
 
    Close,
 
    Currency,
 
    Directive,
 
    Meta,
 
    MetaKey,
 
    MetaValue,
 
    Open,
 
    OptionsMap,
 
    Posting as BasePosting,
 
    Transaction,
 
)
 

	
 
DecimalCompat = Union[decimal.Decimal, int]
 

	
 
EQUITY_ACCOUNTS = frozenset([
 
    'Equity',
 
    'Expenses',
 
    'Income',
 
])
 
FUND_ACCOUNTS = EQUITY_ACCOUNTS | frozenset([
 
    'Assets:Prepaid',
 
    'Assets:Receivable',
 
    'Liabilities:Payable',
 
    'Liabilities:UnearnedIncome',
 
])
 
LINK_METADATA = frozenset([
 
    'approval',
 
    'bank-statement',
 
    'check',
 
    'contract',
 
    'invoice',
 
    'purchase-order',
 
    'receipt',
 
    'rt-id',
 
    'statement',
 
    'tax-statement',
 
])
 

	
 
class AccountMeta(MutableMapping[MetaKey, MetaValue]):
 
    """Access account metadata
 

	
 
    This class provides a consistent interface to all the metadata provided by
 
    Beancount's ``open`` and ``close`` directives: open and close dates,
 
    used currencies, booking method, and the metadata associated with each.
 

	
 
    For convenience, you can use this class as a Mapping to access the ``open``
 
    directive's metadata directly.
 
    """
 
    __slots__ = ('_opening', '_closing')
 

	
 
    def __init__(self, opening: Open, closing: Optional[Close]=None) -> None:
 
        self._opening = opening
 
        self._closing: Optional[Close] = None
 
        if closing is not None:
 
            self.add_closing(closing)
 

	
 
    def add_closing(self, closing: Close) -> None:
 
        if self._closing is not None and self._closing is not closing:
 
            raise ValueError(f"{self.account} already closed by {self._closing!r}")
 
        elif closing.account != self.account:
 
            raise ValueError(f"cannot close {self.account} with {closing.account}")
 
        elif closing.date < self.open_date:
 
            raise ValueError(f"close date {closing.date} predates open date {self.open_date}")
 
        else:
 
            self._closing = closing
 

	
 
    def __iter__(self) -> Iterator[MetaKey]:
 
        return iter(self._opening.meta)
 

	
 
    def __len__(self) -> int:
 
        return len(self._opening.meta)
 

	
 
    def __getitem__(self, key: MetaKey) -> MetaValue:
 
        return self._opening.meta[key]
 

	
 
    def __setitem__(self, key: MetaKey, value: MetaValue) -> None:
 
        self._opening.meta[key] = value
 

	
 
    def __delitem__(self, key: MetaKey) -> None:
 
        del self._opening.meta[key]
 

	
 
    @property
 
    def account(self) -> 'Account':
 
        return Account(self._opening.account)
 

	
 
    @property
 
    def booking(self) -> Optional[bc_data.Booking]:
 
        return self._opening.booking
 

	
 
    @property
 
    def close_date(self) -> Optional[datetime.date]:
 
        return None if self._closing is None else self._closing.date
 

	
 
    @property
 
    def close_meta(self) -> Optional[Meta]:
 
        return None if self._closing is None else self._closing.meta
 

	
 
    @property
 
    def currencies(self) -> Optional[Sequence[Currency]]:
 
        return self._opening.currencies
 

	
 
    @property
 
    def open_date(self) -> datetime.date:
 
        return self._opening.date
 

	
 
    @property
 
    def open_meta(self) -> Meta:
 
        return self._opening.meta
 

	
 

	
 
class Account(str):
 
    """Account name string
 

	
 
    This is a string that names an account, like Assets:Bank:Checking
 
    or Income:Donations. This class provides additional methods for common
 
    account name parsing and queries.
 
    """
 
    __slots__ = ()
 

	
 
    ACCOUNT_RE: Pattern
 
    SEP = bc_account.sep
 
    _meta_map: MutableMapping[str, AccountMeta] = {}
 
    _options_map: OptionsMap
 

	
 
    @classmethod
 
    def load_options_map(cls, options_map: OptionsMap) -> None:
 
        cls._options_map = options_map
 
        roots: Sequence[str] = bc_options.get_account_types(options_map)
 
        cls.ACCOUNT_RE = re.compile(
 
            r'^(?:{})(?:{}[A-Z0-9][-A-Za-z0-9]*)+$'.format(
 
                '|'.join(roots), cls.SEP,
 
            ))
 

	
 
    @classmethod
 
    def load_opening(cls, opening: Open) -> None:
 
        cls._meta_map[opening.account] = AccountMeta(opening)
 

	
 
    @classmethod
 
    def load_closing(cls, closing: Close) -> None:
 
        try:
 
            cls._meta_map[closing.account].add_closing(closing)
 
        except KeyError:
 
            raise ValueError(
 
                f"tried to load {closing.account} close directive before open",
 
            ) from None
 

	
 
    @classmethod
 
    def load_openings_and_closings(cls, entries: Iterable[Directive]) -> None:
 
        for entry in entries:
 
            # type ignores because Beancount's directives aren't type-checkable.
 
            if isinstance(entry, bc_data.Open):
 
                cls.load_opening(entry)  # type:ignore[arg-type]
 
            elif isinstance(entry, bc_data.Close):
conservancy_beancount/reports/balance_sheet.py
Show inline comments
 
"""balance_sheet.py - Balance sheet report"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import collections
 
import datetime
 
import enum
 
import logging
 
import operator
 
import sys
 

	
 
from decimal import Decimal
 
from pathlib import Path
 

	
 
from typing import (
 
    Any,
 
    Callable,
 
    Collection,
 
    Dict,
 
    Hashable,
 
    Iterable,
 
    Iterator,
 
    List,
 
    Mapping,
 
    NamedTuple,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Tuple,
 
    Union,
 
)
 

	
 
import odf.style  # type:ignore[import]
 
import odf.table  # type:ignore[import]
 

	
 
from beancount.parser import printer as bc_printer
 

	
 
from . import core
 
from . import rewrite
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 
from .. import ranges
 

	
 
EQUITY_ACCOUNTS = frozenset(['Equity', 'Income', 'Expenses'])
 
PROGNAME = 'balance-sheet-report'
 
logger = logging.getLogger('conservancy_beancount.reports.balance_sheet')
 

	
 
KWArgs = Mapping[str, Any]
 

	
 
class Fund(enum.IntFlag):
 
    RESTRICTED = enum.auto()
 
    UNRESTRICTED = enum.auto()
 
    ANY = RESTRICTED | UNRESTRICTED
 

	
 

	
 
class Period(enum.IntFlag):
 
    OPENING = enum.auto()
 
    PRIOR = enum.auto()
 
    MIDDLE = enum.auto()
 
    PERIOD = enum.auto()
 
    THRU_PRIOR = OPENING | PRIOR
 
    THRU_MIDDLE = THRU_PRIOR | MIDDLE
 
    ANY = THRU_MIDDLE | PERIOD
 

	
 

	
 
class BalanceKey(NamedTuple):
 
    account: data.Account
 
    classification: data.Account
 
    period: Period
 
    fund: Fund
 
    post_type: Optional[str]
 

	
 

	
 
class Balances:
 
    def __init__(self,
 
                 postings: Iterable[data.Posting],
 
                 start_date: datetime.date,
 
                 stop_date: datetime.date,
 
                 fund_key: str='project',
 
                 unrestricted_fund_value: str='Conservancy',
 
    ) -> None:
 
        year_diff = (stop_date - start_date).days // 365
 
        if year_diff == 0:
 
            self.prior_range = ranges.DateRange(
 
                cliutil.diff_year(start_date, -1),
 
                cliutil.diff_year(stop_date, -1),
 
            )
 
            self.period_desc = "Period"
 
        else:
 
            self.prior_range = ranges.DateRange(
 
                cliutil.diff_year(start_date, -year_diff),
 
                start_date,
 
            )
 
            self.period_desc = f"Year{'s' if year_diff > 1 else ''}"
 
        self.middle_range = ranges.DateRange(self.prior_range.stop, start_date)
 
        self.period_range = ranges.DateRange(start_date, stop_date)
 
        self.balances: Mapping[BalanceKey, core.MutableBalance] \
 
            = collections.defaultdict(core.MutableBalance)
 
        for post in postings:
 
            post_date = post.meta.date
 
            if post_date >= stop_date:
 
                continue
 
            elif post_date in self.period_range:
 
                period = Period.PERIOD
 
            elif post_date in self.middle_range:
 
                period = Period.MIDDLE
 
            elif post_date in self.prior_range:
 
                period = Period.PRIOR
 
            else:
 
                period = Period.OPENING
 
            if post.meta.get(fund_key) == unrestricted_fund_value:
 
                fund = Fund.UNRESTRICTED
 
            else:
 
                fund = Fund.RESTRICTED
 
            try:
 
                classification_s = post.account.meta['classification']
 
                if isinstance(classification_s, str):
 
                    classification = data.Account(classification_s)
 
                else:
 
                    raise TypeError()
 
            except (KeyError, TypeError):
 
                classification = post.account
 
            if post.account.root_part() == 'Expenses':
 
                post_type = post.meta.get('expense-type')
 
            else:
 
                post_type = None
 
            key = BalanceKey(post.account, classification, period, fund, post_type)
 
            self.balances[key] += post.at_cost()
 

	
 
    def total(self,
 
              account: Union[None, str, Collection[str]]=None,
 
              classification: Optional[str]=None,
 
              period: int=Period.ANY,
 
              fund: int=Fund.ANY,
 
              post_type: Optional[str]=None,
 
              *,
 
              account_exact: bool=False,
 
    ) -> core.Balance:
 
        if isinstance(account, str):
 
            account = (account,)
 
        acct_pred: Callable[[data.Account], bool]
 
        if account is None:
 
            acct_pred = lambda acct: True
 
        elif account_exact:
 
            # At this point, between this isinstance() above and the earlier
 
            # `account is None` check, we've collapsed the type of `account` to
 
            # `Collection[str]`. Unfortunately the logic is too involved for
 
            # mypy to follow, so ignore the type problem.
 
            acct_pred = lambda acct: acct in account  # type:ignore[operator]
 
        else:
 
            acct_pred = lambda acct: acct.is_under(*account) is not None  # type:ignore[misc]
 
        retval = core.MutableBalance()
 
        for key, balance in self.balances.items():
 
            if not acct_pred(key.account):
 
                pass
 
            elif not (classification is None
 
                      or key.classification.is_under(classification)):
 
                pass
 
            elif not period & key.period:
 
                pass
 
            elif not fund & key.fund:
 
                pass
 
            elif not (post_type is None or post_type == key.post_type):
 
                pass
 
            else:
 
                retval += balance
 
        return retval
 

	
 
    def classifications(self,
 
                        account: str,
 
                        sort_period: Optional[int]=None,
 
    ) -> Sequence[data.Account]:
 
        if sort_period is None:
 
            if account in EQUITY_ACCOUNTS:
 
            if account in data.EQUITY_ACCOUNTS:
 
                sort_period = Period.PERIOD
 
            else:
 
                sort_period = Period.ANY
 
        class_bals: Mapping[data.Account, core.MutableBalance] \
 
            = collections.defaultdict(core.MutableBalance)
 
        for key, balance in self.balances.items():
 
            if not key.account.is_under(account):
 
                pass
 
            elif key.period & sort_period:
 
                class_bals[key.classification] += balance
 
            else:
 
                # Ensure the balance exists in the mapping
 
                class_bals[key.classification]
 
        norm_func = core.normalize_amount_func(f'{account}:RootsOK')
 
        def sortkey(acct: data.Account) -> Hashable:
 
            prefix, _, _ = acct.rpartition(':')
 
            balance = norm_func(class_bals[acct])
 
            try:
 
                max_bal = max(amount.number for amount in balance.values())
 
            except ValueError:
 
                max_bal = Decimal(0)
 
            return prefix, -max_bal
 
        return sorted(class_bals, key=sortkey)
 

	
 
    def iter_accounts(self, root: Optional[str]=None) -> Sequence[data.Account]:
 
        start_date = self.period_range.start
 
        stop_date = self.period_range.stop
 
        return sorted(
 
            account
 
            for account in data.Account.iter_accounts(root)
 
            if account.meta.open_date < stop_date
 
            and (account.meta.close_date is None
 
                 or account.meta.close_date > start_date)
 
        )
 

	
 

	
 
class Report(core.BaseODS[Sequence[None], None]):
 
    C_CASH = 'Cash'
 
    NO_BALANCE = core.Balance()
 
    SPACE = ' ' * 4
 

	
 
    def __init__(self,
 
                 balances: Balances,
 
                 *,
 
                 date_fmt: str='%B %d, %Y',
 
    ) -> None:
 
        super().__init__()
 
        self.balances = balances
 
        self.period_desc = balances.period_desc
 
        self.date_fmt = date_fmt
 
        one_day = datetime.timedelta(days=1)
 
        date = balances.period_range.stop - one_day
 
        self.period_name = date.strftime(date_fmt)
 
        date = balances.prior_range.stop - one_day
 
        self.opening_name = date.strftime(date_fmt)
 
        self.last_totals_row = odf.table.TableRow()
 

	
 
    def section_key(self, row: Sequence[None]) -> None:
 
        raise NotImplementedError("balance_sheet.Report.section_key")
 

	
 
    def init_styles(self) -> None:
 
        super().init_styles()
 
        self.style_header = self.merge_styles(self.style_bold, self.style_centertext)
 
        self.style_huline = self.merge_styles(
 
            self.style_header,
 
            self.border_style(core.Border.BOTTOM, '1pt'),
 
        )
 

	
 
    def write_all(self) -> None:
 
        self.write_financial_position()
 
        self.write_activities()
 
        self.write_functional_expenses()
 
        self.write_cash_flows()
 
        self.write_trial_balances()
 

	
 
    def walk_classifications(self, cseq: Iterable[data.Account]) \
 
        -> Iterator[Tuple[str, Optional[data.Account]]]:
 
        last_prefix: Sequence[str] = []
 
        for classification in cseq:
 
            parts = classification.split(':')
 
            tail = parts.pop()
 
            space = self.SPACE * len(parts)
 
            if parts != last_prefix:
 
                yield f'{space[len(self.SPACE):]}{parts[-1]}', None
 
                last_prefix = parts
 
            yield f'{space}{tail}', classification
 

	
 
    def walk_classifications_by_account(
 
            self,
 
            account: str,
 
            sort_period: Optional[int]=None,
 
    ) -> Iterator[Tuple[str, Optional[data.Account]]]:
 
        return self.walk_classifications(self.balances.classifications(
 
            account, sort_period,
 
        ))
 

	
 
    def start_sheet(self,
 
                    sheet_name: str,
 
                    *headers: Iterable[str],
 
                    totals_prefix: Sequence[str]=(),
 
                    first_width: Union[float, str]=3,
 
                    width: Union[float, str]=1.5,
 
                    title_fmt: str="DRAFT Statement of {sheet_name}",
 
    ) -> None:
 
        header_cells: Sequence[odf.table.TableCell] = [
 
            odf.table.TableCell(),
 
            *(self.multiline_cell(header_lines, stylename=self.style_huline)
 
              for header_lines in headers),
 
            *(self.multiline_cell([*totals_prefix, date_s], stylename=self.style_huline)
 
              for date_s in [self.period_name, self.opening_name]),
 
        ]
 
        self.col_count = len(header_cells)
 
        self.use_sheet(sheet_name)
 
        for index in range(self.col_count):
 
            col_style = self.column_style(width if index else first_width)
 
            self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
 
        start_date = self.balances.period_range.start.strftime(self.date_fmt)
 
        self.add_row(
 
            self.multiline_cell([
 
                title_fmt.format(sheet_name=sheet_name),
 
                f"{start_date}—{self.period_name}",
 
            ], numbercolumnsspanned=self.col_count, stylename=self.style_header)
 
        )
 
        self.add_row()
 
        self.add_row(*header_cells)
 

	
 
    def write_classifications_by_account(
 
            self,
 
            account: str,
 
            balance_kwargs: Sequence[KWArgs],
 
            exclude_classifications: Collection[str]=frozenset(),
 
            text_prefix: str='',
 
            norm_func: Optional[Callable[[core.Balance], core.Balance]]=None,
 
    ) -> Sequence[core.Balance]:
 
        if norm_func is None:
 
            norm_func = core.normalize_amount_func(f'{account}:RootsOK')
 
        assert len(balance_kwargs) + 1 == self.col_count, \
 
            "called write_classifications with wrong number of balance_kwargs"
 
        retval = [core.MutableBalance() for _ in balance_kwargs]
 
        for text, classification in self.walk_classifications_by_account(account):
 
            text_cell = self.string_cell(text_prefix + text)
 
            if classification is None:
 
                if not text[0].isspace():
 
                    self.add_row()
 
                self.add_row(text_cell)
 
            elif classification in exclude_classifications:
 
                pass
 
            else:
 
                row = self.add_row(text_cell)
 
                for kwargs, total_bal in zip(balance_kwargs, retval):
 
                    balance = norm_func(self.balances.total(
 
                        classification=classification, **kwargs,
 
                    ))
 
                    row.addElement(self.balance_cell(balance))
 
                    total_bal += balance
 
        return retval
 

	
 
    def write_totals_row(
 
            self,
 
            text: str,
 
            *balances: Sequence[core.Balance],
 
            stylename: Union[None, str, odf.style.Style]=None,
 
            leading_rows: Optional[int]=None,
 
    ) -> odf.table.TableRow:
 
        if leading_rows is None:
 
            if (self.sheet.lastChild is self.last_totals_row
 
                or stylename is self.style_bottomline):
 
                leading_rows = 1
 
            else:
 
                leading_rows = 0
 
        expect_len = self.col_count - 1
 
        assert all(len(seq) == expect_len for seq in balances), \
 
            "called write_totals_row with the wrong length of balance columns"
 
        for _ in range(leading_rows):
 
            self.add_row()
 
        self.last_totals_row = self.add_row(
 
            self.string_cell(text),
 
            *(self.balance_cell(
 
                sum(sum_bals, core.MutableBalance()),
 
                stylename=stylename,
 
            ) for sum_bals in zip(*balances)),
 
        )
 
        return self.last_totals_row
 

	
 
    def write_financial_position(self) -> None:
 
        self.start_sheet("Financial Position")
 
        balance_kwargs: Sequence[KWArgs] = [
 
            {'period': Period.ANY},
 
            {'period': Period.THRU_PRIOR},
 
        ]
 

	
 
        asset_totals = self.write_classifications_by_account('Assets', balance_kwargs)
 
        self.write_totals_row(
 
            "Total Assets", asset_totals, stylename=self.style_bottomline,
 
        )
 
        self.add_row()
 
        self.add_row()
 

	
 
        liabilities = self.write_classifications_by_account('Liabilities', balance_kwargs)
 
        self.write_totals_row(
 
            "Total Liabilities", liabilities, stylename=self.style_endtotal,
 
        )
 
        self.add_row()
 
        self.add_row()
 

	
 
        equity_totals = [core.MutableBalance() for _ in balance_kwargs]
 
        self.add_row(self.string_cell("Net Assets", stylename=self.style_bold))
 
        self.add_row()
 
        for fund in [Fund.UNRESTRICTED, Fund.RESTRICTED]:
 
            preposition = "Without" if fund is Fund.UNRESTRICTED else "With"
 
            row = self.add_row(self.string_cell(f"{preposition} donor restrictions"))
 
            for kwargs, total_bal in zip(balance_kwargs, equity_totals):
 
                balance = -self.balances.total(account=EQUITY_ACCOUNTS, fund=fund, **kwargs)
 
                balance = -self.balances.total(account=data.EQUITY_ACCOUNTS, fund=fund, **kwargs)
 
                row.addElement(self.balance_cell(balance))
 
                total_bal += balance
 
        self.write_totals_row(
 
            "Total Net Assets", equity_totals, stylename=self.style_total,
 
        )
 
        self.write_totals_row(
 
            "Total Liabilities and Net Assets",
 
            liabilities, equity_totals,
 
            stylename=self.style_bottomline,
 
        )
 

	
 
    def write_activities(self) -> None:
 
        self.start_sheet(
 
            "Activities",
 
            ["Without Donor", "Restrictions"],
 
            ["With Donor", "Restrictions"],
 
            totals_prefix=[f"Total {self.period_desc} Ended"],
 
        )
 
        bal_kwargs: Sequence[Dict[str, Any]] = [
 
            {'period': Period.PERIOD, 'fund': Fund.UNRESTRICTED},
 
            {'period': Period.PERIOD, 'fund': Fund.RESTRICTED},
 
            {'period': Period.PERIOD},
 
            {'period': Period.PRIOR},
 
        ]
 

	
 
        self.add_row(self.string_cell("Support and Revenue", stylename=self.style_bold))
 
        self.add_row()
 
        income_totals = self.write_classifications_by_account('Income', bal_kwargs)
 
        self.write_totals_row("", income_totals, stylename=self.style_total)
 
        self.add_row()
 
        self.add_row(
 
            self.string_cell("Net Assets released from restrictions:"),
 
        )
 
        released = self.balances.total(
 
            account=('Expenses', 'Equity'),
 
            period=Period.PERIOD,
 
            fund=Fund.RESTRICTED,
 
        )
 
        other_totals = [core.MutableBalance() for _ in bal_kwargs]
 
        other_totals[0] += released
 
        other_totals[1] -= released
 
        self.write_totals_row("Satisfaction of program restrictions", other_totals)
 
        self.write_totals_row(
 
            "Total Support and Revenue",
 
            income_totals, other_totals,
 
            stylename=self.style_endtotal,
 
        )
 

	
 
        period_expenses = core.MutableBalance()
 
        prior_expenses = core.MutableBalance()
 
        self.add_row()
 
        self.add_row(self.string_cell("Expenses", stylename=self.style_bold))
 
        self.add_row()
 
        for text, type_value in [
 
                ("Program services", 'program'),
 
                ("Management and administrative services", 'management'),
 
                ("Fundraising", 'fundraising'),
 
        ]:
 
            period_bal = self.balances.total(
 
                account='Expenses', period=Period.PERIOD, post_type=type_value,
 
            )
 
            prior_bal = self.balances.total(
 
                account='Expenses', period=Period.PRIOR, post_type=type_value,
 
            )
 
            self.write_totals_row(text, [
 
                period_bal,
 
                self.NO_BALANCE,
 
                period_bal,
 
                prior_bal,
 
            ], leading_rows=0)
 
            period_expenses += period_bal
 
            prior_expenses += prior_bal
 
        period_bal = self.balances.total(account='Expenses', period=Period.PERIOD)
 
        if (period_expenses - period_bal).clean_copy(1).is_zero():
 
            period_bal = period_expenses
 
        else:
 
            logger.warning("Period functional expenses do not match total; math in columns B+D is wrong")
 
        prior_bal = self.balances.total(account='Expenses', period=Period.PRIOR)
 
        if (prior_expenses - prior_bal).clean_copy(1).is_zero():
 
            prior_bal = prior_expenses
 
        else:
 
            logger.warning("Prior functional expenses do not match total; math in column E is wrong")
 
        self.write_totals_row("Total Expenses", [
 
            period_bal,
 
            self.NO_BALANCE,
 
            period_bal,
 
            prior_bal,
 
        ], stylename=self.style_endtotal, leading_rows=0)
 

	
 
        other_totals[0] -= period_bal
 
        other_totals[2] -= period_bal
 
        other_totals[3] -= prior_bal
 
        self.write_totals_row("Change in Net Assets", income_totals, other_totals)
 

	
 
        for kwargs in bal_kwargs:
 
            if kwargs['period'] is Period.PERIOD:
 
                kwargs['period'] = Period.THRU_MIDDLE
 
            else:
 
                kwargs['period'] = Period.OPENING
 
        equity_totals = [
 
            -self.balances.total(account=EQUITY_ACCOUNTS, **kwargs)
 
            -self.balances.total(account=data.EQUITY_ACCOUNTS, **kwargs)
 
            for kwargs in bal_kwargs
 
        ]
 
        self.write_totals_row("Beginning Net Assets", equity_totals)
 
        self.write_totals_row(
 
            "Ending Net Assets",
 
            income_totals, other_totals, equity_totals,
 
            stylename=self.style_bottomline,
 
        )
 

	
 
    def write_functional_expenses(self) -> None:
 
        self.start_sheet(
 
            "Functional Expenses",
 
            ["Program", "Services"],
 
            ["Management and", "Administrative"],
 
            ["Fundraising"],
 
            totals_prefix=[f"Total {self.period_desc} Ended"],
 
        )
 
        totals = self.write_classifications_by_account('Expenses', [
 
            {'period': Period.PERIOD, 'post_type': 'program'},
 
            {'period': Period.PERIOD, 'post_type': 'management'},
 
            {'period': Period.PERIOD, 'post_type': 'fundraising'},
 
            {'period': Period.PERIOD},
 
            {'period': Period.PRIOR},
 
        ])
 
        self.write_totals_row(
 
            "Total Expenses",
 
            totals,
 
            stylename=self.style_bottomline,
 
        )
 

	
 
    def write_cash_flows(self) -> None:
 
        self.start_sheet("Cash Flows")
 
        bal_kwargs: Sequence[Dict[str, Any]] = [
 
            {'period': Period.PERIOD},
 
            {'period': Period.PRIOR},
 
        ]
 
        norm_func = operator.neg
 

	
 
        self.add_row(self.string_cell(
 
            "Cash Flows from Operating Activities",
 
            stylename=self.style_bold,
 
        ))
 
        equity_totals = [
 
            -self.balances.total(account=EQUITY_ACCOUNTS, **kwargs)
 
            -self.balances.total(account=data.EQUITY_ACCOUNTS, **kwargs)
 
            for kwargs in bal_kwargs
 
        ]
 
        self.write_totals_row("Change in Net Assets", equity_totals, leading_rows=1)
 
        self.add_row(self.string_cell(
 
            "(Increase) decrease in operating assets:",
 
        ))
 
        asset_totals = self.write_classifications_by_account(
 
            'Assets', bal_kwargs, (self.C_CASH,), self.SPACE, norm_func,
 
        )
 
        self.add_row(self.string_cell(
 
            "Increase (decrease) in operating liabilities:",
 
        ))
 
        liabilities = self.write_classifications_by_account(
 
            'Liabilities', bal_kwargs, (), self.SPACE, norm_func,
 
        )
 
        period_totals = [
 
            sum(bals, core.MutableBalance())
 
            for bals in zip(equity_totals, asset_totals, liabilities)
 
        ]
 
        self.write_totals_row(
 
            "Net cash provided by operating activities",
 
            period_totals,
 
            stylename=self.style_endtotal,
 
        )
 
        self.write_totals_row("Net Increase in Cash", period_totals)
 
        begin_totals = [
 
            self.balances.total(classification=self.C_CASH, period=period)
 
            for period in [Period.THRU_MIDDLE, Period.OPENING]
 
        ]
 
        self.write_totals_row("Beginning Cash", begin_totals)
 
        self.write_totals_row(
 
            "Ending Cash",
 
            period_totals, begin_totals,
 
            stylename=self.style_bottomline,
 
        )
 

	
 
    def write_trial_balances(self) -> None:
 
        self.start_sheet(
 
            "Trial Balances",
 
            ["Account Name"],
 
            ["Classification"],
 
            ["Account Code"],
 
            ["Balance Beginning", self.balances.period_range.start.strftime(self.date_fmt)],
 
            totals_prefix=["Change During", f"{self.period_desc} Ending"],
 
            title_fmt="Chart of Accounts with DRAFT {sheet_name}",
 
        )
 
        # Widen text columns
 
        col_style = self.column_style(3.5)
 
        for col in self.sheet.childNodes[:2]:
 
            col.setAttribute('stylename', col_style)
 
        # Patch up header row text
 
        header_row = self.sheet.lastChild
 
        header_row.removeChild(header_row.firstChild)
 
        header_row.insertBefore(self.multiline_cell(
 
            ["Balance Ending", self.period_name],
 
            stylename=header_row.lastChild.getAttribute('stylename'),
 
        ), header_row.lastChild)
 

	
 
        for acct_root in ['Assets', 'Liabilities', 'Income', 'Expenses', 'Equity']:
 
            norm_func = core.normalize_amount_func(f'{acct_root}:Dummy')
 
            want_balance = acct_root not in EQUITY_ACCOUNTS
 
            want_balance = acct_root not in data.EQUITY_ACCOUNTS
 
            self.add_row()
 
            for account in self.balances.iter_accounts(acct_root):
 
                period_bal = self.balances.total(
 
                    account=account, period=Period.PERIOD, account_exact=True,
 
                )
 
                prior_bal = self.balances.total(
 
                    account=account, period=Period.PRIOR, account_exact=True,
 
                )
 
                if want_balance:
 
                    close_bal = self.balances.total(account=account, account_exact=True)
 
                    close_cell = self.balance_cell(norm_func(close_bal))
 
                    open_cell = self.balance_cell(norm_func(close_bal - period_bal))
 
                else:
 
                    close_cell = odf.table.TableCell()
 
                    open_cell = odf.table.TableCell()
 
                self.add_row(
 
                    self.string_cell(account),
 
                    self.string_cell(account.meta.get('classification', '')),
 
                    self.string_cell(account.meta.get('account-code', '')),
 
                    open_cell,
 
                    self.balance_cell(norm_func(period_bal)),
 
                    close_cell,
 
                    self.balance_cell(norm_func(prior_bal)),
 
                )
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
 
The default is one year ago.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
 
The default is a year after the start date.
 
""")
 
    cliutil.add_rewrite_rules_argument(parser)
 
    parser.add_argument(
 
        '--fund-metadata-key', '-m',
 
        metavar='KEY',
 
        default='project',
 
        help="""Name of the fund metadata key. Default %(default)s.
 
""")
 
    parser.add_argument(
 
        '--unrestricted-fund', '-u',
 
        metavar='PROJECT',
 
        default='Conservancy',
 
        help="""Name of the unrestricted fund. Default %(default)s.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    return parser.parse_args(arglist)
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    if args.stop_date is None:
 
        if args.start_date is None:
 
            args.stop_date = datetime.date.today()
 
        else:
 
            args.stop_date = cliutil.diff_year(args.start_date, 1)
 
    if args.start_date is None:
 
        args.start_date = cliutil.diff_year(args.stop_date, -1)
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, options_map = books.Loader.load_none(config.config_file_path())
 
        returncode = cliutil.ExitCode.NoConfiguration
 
    else:
 
        start_fy = config.fiscal_year_begin().for_date(args.start_date) - 1
 
        entries, load_errors, options_map = books_loader.load_fy_range(start_fy, args.stop_date)
 
        if load_errors:
 
            returncode = cliutil.ExitCode.BeancountErrors
 
        elif not entries:
 
            returncode = cliutil.ExitCode.NoDataLoaded
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 

	
 
    data.Account.load_from_books(entries, options_map)
 
    postings = data.Posting.from_entries(entries)
 
    for rewrite_path in args.rewrite_rules:
 
        try:
 
            ruleset = rewrite.RewriteRuleset.from_yaml(rewrite_path)
 
        except ValueError as error:
 
            logger.critical("failed loading rewrite rules from %s: %s",
 
                            rewrite_path, error.args[0])
 
            return cliutil.ExitCode.RewriteRulesError
 
        postings = ruleset.rewrite(postings)
 

	
 
    balances = Balances(
 
        postings,
 
        args.start_date,
 
        args.stop_date,
 
        args.fund_metadata_key,
 
        args.unrestricted_fund,
 
    )
 
    report = Report(balances)
 
    report.set_common_properties(config.books_repo())
 
    report.write_all()
 
    if args.output_file is None:
 
        out_dir_path = config.repository_path() or Path()
 
        args.output_file = out_dir_path / 'BalanceSheet_{}_{}.ods'.format(
 
            args.start_date.isoformat(), args.stop_date.isoformat(),
 
        )
 
        logger.info("Writing report to %s", args.output_file)
conservancy_beancount/tools/opening_balances.py
Show inline comments
 
#!/usr/bin/env python3
 
"""opening_balances.py - Tool to generate opening balances transactions
 

	
 
This tool generates an opening balances transaction for a given date and writes
 
it to stdout. Use this when you close the books for a year to record the final
 
balances for that year.
 

	
 
Run it without arguments to generate opening balances for the current fiscal
 
year. You can also specify a fiscal year to generate opening balances for, or
 
even a specific date (which can be helpful for testing or debugging).
 
"""
 
# SPDX-FileCopyrightText: © 2020 Martin Michlmayr <tbm@cyrius.com>
 
# SPDX-FileCopyrightText: © 2020 Brett Smith
 
# SPDX-License-Identifier: AGPL-3.0-or-later
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import collections
 
import copy
 
import datetime
 
import enum
 
import locale
 
import logging
 
import sys
 

	
 
from typing import (
 
    Dict,
 
    Hashable,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    NamedTuple,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Tuple,
 
)
 
from ..beancount_types import (
 
    Error,
 
    MetaKey,
 
    MetaValue,
 
    Transaction,
 
)
 

	
 
from decimal import Decimal, ROUND_HALF_EVEN, ROUND_HALF_UP
 

	
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 
from ..reports.core import Balance
 

	
 
from beancount.core import data as bc_data
 
from beancount.core import display_context as bc_dcontext
 
from beancount.parser import printer as bc_printer
 

	
 
from beancount.core.convert import get_cost
 
from beancount.core.inventory import Inventory
 
from beancount.core.position import Position, get_position
 

	
 
EQUITY_ACCOUNTS = frozenset([
 
    'Equity',
 
    'Expenses',
 
    'Income',
 
])
 
FUND_ACCOUNTS = frozenset([
 
    'Assets:Prepaid',
 
    'Assets:Receivable',
 
    'Equity:Funds',
 
    'Equity:Realized',
 
    'Expenses',
 
    'Income',
 
    'Liabilities:Payable',
 
    'Liabilities:UnearnedIncome',
 
])
 
RESTRICTED_ACCOUNT = data.Account('Equity:Funds:Restricted')
 
UNRESTRICTED_ACCOUNT = data.Account('Equity:Funds:Unrestricted')
 
PROGNAME = 'opening-balances'
 
logger = logging.getLogger('conservancy_beancount.tools.opening_balances')
 

	
 
def quantize_amount(
 
        amount: data.Amount,
 
        exp: Decimal=Decimal('.01'),
 
        rounding: str=ROUND_HALF_EVEN,
 
) -> data.Amount:
 
    return amount._replace(number=amount.number.quantize(exp, rounding=rounding))
 

	
 
class AccountWithFund(NamedTuple):
 
    account: data.Account
 
    fund: Optional[MetaValue]
 

	
 
    def sortkey(self) -> Hashable:
 
        account, fund = self
 
        return (
 
            0 if fund is None else 1,
 
            locale.strxfrm(account),
 
            locale.strxfrm(str(fund).casefold()),
 
        )
 

	
 

	
 
class Posting(data.Posting):
 
    @staticmethod
 
    def _position_sortkey(position: Position) -> str:
 
        units, cost = position
 
        if cost is None:
 
            # Beancount type-declares that position.cost must be a Cost, but
 
            # in practice that's not true. Call get_position(post) on any
 
            # post without a cost and see what it returns. Hence the ignore.
 
            return units.currency  # type:ignore[unreachable]
 
        else:
 
            return f'{units.currency} {cost.currency} {cost.date.isoformat()}'
 

	
 
    @classmethod
 
    def build_opening(
 
            cls,
 
            key: AccountWithFund,
 
            meta_key: MetaKey,
 
            inventory: Inventory,
 
    ) -> Iterator[bc_data.Posting]:
 
        account, project = key
 
        if project is None:
 
            meta: Optional[Dict[MetaKey, MetaValue]] = None
 
        else:
 
            meta = {meta_key: project}
 
        for units, cost in sorted(inventory, key=cls._position_sortkey):
 
            if cost is None:
 
                units = quantize_amount(units)
 
            yield bc_data.Posting(
 
                account, units, cost, None, None, copy.copy(meta),
 
            )
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        '--fund-metadata-key', '-m',
 
        metavar='KEY',
 
        dest='meta_key',
 
        default='project',
 
        help="""Name of the fund metadata key. Default %(default)s.
 
""")
 
    parser.add_argument(
 
        '--unrestricted-fund', '-u',
 
        metavar='PROJECT',
 
        default='Conservancy',
 
        help="""Name of the unrestricted fund. Default %(default)s.
 
""")
 
    parser.add_argument(
 
        'as_of_date',
 
        metavar='YEAR_OR_DATE',
 
        type=cliutil.year_or_date_arg,
 
        nargs='?',
 
        help="""Date to generate opening balances for. You can provide just
 
a year to generate balances for the start of that fiscal year. Defaults to the
 
current fiscal year.
 
""")
 
    return parser.parse_args(arglist)
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    fy = config.fiscal_year_begin()
 
    if args.as_of_date is None:
 
        args.as_of_date = fy.for_date()
 
    if isinstance(args.as_of_date, int):
 
        args.as_of_date = fy.first_date(args.as_of_date)
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
        returncode = cliutil.ExitCode.NoConfiguration
 
    else:
 
        entries, load_errors, _ = books_loader.load_fy_range(0, args.as_of_date)
 
        if load_errors:
 
            returncode = cliutil.ExitCode.BeancountErrors
 
        elif not entries:
 
            returncode = cliutil.ExitCode.NoDataLoaded
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 

	
 
    inventories: Mapping[AccountWithFund, Inventory] = collections.defaultdict(Inventory)
 
    for post in Posting.from_entries(entries):
 
        if post.meta.date >= args.as_of_date:
 
            continue
 
        account = post.account
 
        fund_acct_match = post.account.is_under(*FUND_ACCOUNTS)
 
        is_equity = account.root_part() in EQUITY_ACCOUNTS
 
        fund_acct_match = post.account.is_under(*data.FUND_ACCOUNTS)
 
        is_equity = account.root_part() in data.EQUITY_ACCOUNTS
 
        if fund_acct_match is None:
 
            project: MetaValue = None
 
        else:
 
            project = post.meta.get(args.meta_key)
 
            if project is None:
 
                bc_printer.print_error(Error(
 
                    post.meta, "no fund specified", post.meta.txn,
 
                ), file=stderr)
 
                project = args.unrestricted_fund
 
            if is_equity:
 
                if project == args.unrestricted_fund:
 
                    account = UNRESTRICTED_ACCOUNT
 
                else:
 
                    account = RESTRICTED_ACCOUNT
 
        inventory = inventories[AccountWithFund(account, project)]
 
        if is_equity:
 
            inventory.add_amount(post.at_cost())
 
        else:
 
            inventory.add_position(get_position(post))
 

	
 
    opening_date = args.as_of_date - datetime.timedelta(1)
 
    opening = bc_data.Transaction(  # type:ignore[operator]
 
        None,  # meta
 
        opening_date,
 
        '*',
 
        None,  # payee
 
        f"Opening balances for FY{fy.for_date(args.as_of_date)}",
 
        frozenset(),  # tags
 
        frozenset(),  # links
 
        [post
 
         for key in sorted(inventories, key=AccountWithFund.sortkey)
 
         for post in Posting.build_opening(key, args.meta_key, inventories[key])
 
        ])
 
    balance = Balance(get_cost(get_position(post))
 
                      for post in opening.postings)
 
    for amount in balance.clean_copy().values():
 
        opening.postings.append(bc_data.Posting(
 
            UNRESTRICTED_ACCOUNT, quantize_amount(-amount), None, None, None,
 
            {args.meta_key: args.unrestricted_fund},
 
        ))
 
    dcontext = bc_dcontext.DisplayContext()
 
    dcontext.set_commas(True)
 
    bc_printer.print_entry(opening, dcontext, file=stdout)
 
    return returncode
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
0 comments (0 inline, 0 general)