Changeset - f3c68ff46287
[Not reviewed]
0 1 3
Brett Smith - 4 years ago 2020-06-25 14:51:37
brettcsmith@brettcsmith.org
opening_balances: New tool.
4 files changed with 411 insertions and 1 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/tools/__init__.py
Show inline comments
 
new file 100644
conservancy_beancount/tools/opening_balances.py
Show inline comments
 
new file 100644
 
#!/usr/bin/env python3
 
"""opening_balances.py - Tool to generate opening balances transactions
 

	
 
This tool generates an opening balances transaction for a given date and writes
 
it to stdout. Use this when you close the books for a year to record the final
 
balances for that year.
 

	
 
Run it without arguments to generate opening balances for the current fiscal
 
year. You can also specify a fiscal year to generate opening balances for, or
 
even a specific date (which can be helpful for testing or debugging).
 
"""
 
# SPDX-FileCopyrightText: © 2020 Martin Michlmayr <tbm@cyrius.com>
 
# SPDX-FileCopyrightText: © 2020 Brett Smith
 
# SPDX-License-Identifier: AGPL-3.0-or-later
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import collections
 
import copy
 
import datetime
 
import enum
 
import locale
 
import logging
 
import sys
 

	
 
from typing import (
 
    Dict,
 
    Hashable,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    NamedTuple,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Tuple,
 
)
 
from ..beancount_types import (
 
    Error,
 
    MetaKey,
 
    MetaValue,
 
    Transaction,
 
)
 

	
 
from decimal import Decimal, ROUND_HALF_EVEN, ROUND_HALF_UP
 

	
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 
from ..reports.core import Balance
 

	
 
from beancount.core import data as bc_data
 
from beancount.core import display_context as bc_dcontext
 
from beancount.parser import printer as bc_printer
 

	
 
from beancount.core.convert import get_cost
 
from beancount.core.inventory import Inventory
 
from beancount.core.position import Position, get_position
 

	
 
EQUITY_ACCOUNTS = frozenset([
 
    'Equity',
 
    'Expenses',
 
    'Income',
 
])
 
FUND_ACCOUNTS = frozenset([
 
    'Assets:Prepaid',
 
    'Assets:Receivable',
 
    'Equity:Funds',
 
    'Equity:Realized',
 
    'Expenses',
 
    'Income',
 
    'Liabilities:Payable',
 
    'Liabilities:UnearnedIncome',
 
])
 
RESTRICTED_ACCOUNT = data.Account('Equity:Funds:Restricted')
 
UNRESTRICTED_ACCOUNT = data.Account('Equity:Funds:Unrestricted')
 
PROGNAME = 'opening-balances'
 
logger = logging.getLogger('conservancy_beancount.tools.opening_balances')
 

	
 
def quantize_amount(
 
        amount: data.Amount,
 
        exp: Decimal=Decimal('.01'),
 
        rounding: str=ROUND_HALF_EVEN,
 
) -> data.Amount:
 
    return amount._replace(number=amount.number.quantize(exp, rounding=rounding))
 

	
 
class AccountWithFund(NamedTuple):
 
    account: data.Account
 
    fund: Optional[MetaValue]
 

	
 
    def sortkey(self) -> Hashable:
 
        account, fund = self
 
        return (
 
            0 if fund is None else 1,
 
            locale.strxfrm(account),
 
            locale.strxfrm(str(fund).casefold()),
 
        )
 

	
 

	
 
class Posting(data.Posting):
 
    @staticmethod
 
    def _position_sortkey(position: Position) -> str:
 
        units, cost = position
 
        if cost is None:
 
            # Beancount type-declares that position.cost must be a Cost, but
 
            # in practice that's not true. Call get_position(post) on any
 
            # post without a cost and see what it returns. Hence the ignore.
 
            return units.currency  # type:ignore[unreachable]
 
        else:
 
            return f'{units.currency} {cost.currency} {cost.date.isoformat()}'
 

	
 
    @classmethod
 
    def build_opening(
 
            cls,
 
            key: AccountWithFund,
 
            meta_key: MetaKey,
 
            inventory: Inventory,
 
    ) -> Iterator[bc_data.Posting]:
 
        account, project = key
 
        if project is None:
 
            meta: Optional[Dict[MetaKey, MetaValue]] = None
 
        else:
 
            meta = {meta_key: project}
 
        for units, cost in sorted(inventory, key=cls._position_sortkey):
 
            if cost is None:
 
                units = quantize_amount(units)
 
            yield bc_data.Posting(
 
                account, units, cost, None, None, copy.copy(meta),
 
            )
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    LOAD_ERRORS = 1
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        '--fund-metadata-key', '-m',
 
        metavar='KEY',
 
        dest='meta_key',
 
        default='project',
 
        help="""Name of the fund metadata key. Default %(default)s.
 
""")
 
    parser.add_argument(
 
        '--unrestricted-fund', '-u',
 
        metavar='PROJECT',
 
        default='Conservancy',
 
        help="""Name of the unrestricted fund. Default %(default)s.
 
""")
 
    parser.add_argument(
 
        'as_of_date',
 
        metavar='YEAR_OR_DATE',
 
        type=cliutil.year_or_date_arg,
 
        nargs='?',
 
        help="""Date to generate opening balances for. You can provide just
 
a year to generate balances for the start of that fiscal year. Defaults to the
 
current fiscal year.
 
""")
 
    return parser.parse_args(arglist)
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    fy = config.fiscal_year_begin()
 
    if args.as_of_date is None:
 
        args.as_of_date = fy.for_date()
 
    if isinstance(args.as_of_date, int):
 
        args.as_of_date = fy.first_date(args.as_of_date)
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
    else:
 
        entries, load_errors, _ = books_loader.load_fy_range(0, args.as_of_date)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
        returncode |= ReturnFlag.LOAD_ERRORS
 

	
 
    inventories: Mapping[AccountWithFund, Inventory] = collections.defaultdict(Inventory)
 
    for post in Posting.from_entries(entries):
 
        if post.meta.date >= args.as_of_date:
 
            continue
 
        account = post.account
 
        fund_acct_match = post.account.is_under(*FUND_ACCOUNTS)
 
        is_equity = account.root_part() in EQUITY_ACCOUNTS
 
        if fund_acct_match is None:
 
            project: MetaValue = None
 
        else:
 
            project = post.meta.get(args.meta_key)
 
            if project is None:
 
                bc_printer.print_error(Error(
 
                    post.meta, "no fund specified", post.meta.txn,
 
                ), file=stderr)
 
                project = args.unrestricted_fund
 
            if is_equity:
 
                if project == args.unrestricted_fund:
 
                    account = UNRESTRICTED_ACCOUNT
 
                else:
 
                    account = RESTRICTED_ACCOUNT
 
        inventory = inventories[AccountWithFund(account, project)]
 
        if is_equity:
 
            inventory.add_amount(post.at_cost())
 
        else:
 
            inventory.add_position(get_position(post))
 

	
 
    opening_date = args.as_of_date - datetime.timedelta(1)
 
    opening = bc_data.Transaction(  # type:ignore[operator]
 
        None,  # meta
 
        opening_date,
 
        '*',
 
        None,  # payee
 
        f"Opening balances for FY{fy.for_date(args.as_of_date)}",
 
        frozenset(),  # tags
 
        frozenset(),  # links
 
        [post
 
         for key in sorted(inventories, key=AccountWithFund.sortkey)
 
         for post in Posting.build_opening(key, args.meta_key, inventories[key])
 
        ])
 
    balance = Balance(get_cost(get_position(post))
 
                      for post in opening.postings)
 
    for amount in balance.clean_copy().values():
 
        opening.postings.append(bc_data.Posting(
 
            UNRESTRICTED_ACCOUNT, quantize_amount(-amount), None, None, None,
 
            {args.meta_key: args.unrestricted_fund},
 
        ))
 
    dcontext = bc_dcontext.DisplayContext()
 
    dcontext.set_commas(True)
 
    bc_printer.print_entry(opening, dcontext, file=stdout)
 
    return 0 if returncode == 0 else 16 + returncode
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
setup.py
Show inline comments
...
 
@@ -5,7 +5,7 @@ from setuptools import setup
 
setup(
 
    name='conservancy_beancount',
 
    description="Plugin, library, and reports for reading Conservancy's books",
 
    version='1.4.1',
 
    version='1.5.0',
 
    author='Software Freedom Conservancy',
 
    author_email='info@sfconservancy.org',
 
    license='GNU AGPLv3+',
...
 
@@ -38,6 +38,7 @@ setup(
 
            'accrual-report = conservancy_beancount.reports.accrual:entry_point',
 
            'fund-report = conservancy_beancount.reports.fund:entry_point',
 
            'ledger-report = conservancy_beancount.reports.ledger:entry_point',
 
            'opening-balances = conservancy_beancount.tools.opening_balances:entry_point',
 
        ],
 
    },
 
)
tests/test_opening_balances.py
Show inline comments
 
new file 100644
 
"""test_tools_opening_balances.py - Unit tests for opening balance generation"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import collections
 
import copy
 
import datetime
 
import io
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from beancount import loader as bc_loader
 
from conservancy_beancount.tools import opening_balances as openbalmod
 

	
 
from decimal import Decimal
 
from typing import NamedTuple, Optional
 

	
 
A_CHECKING = 'Assets:Checking'
 
A_EUR = 'Assets:EUR'
 
A_PREPAID = 'Assets:Prepaid:Expenses'
 
A_RECEIVABLE = 'Assets:Receivable:Accounts'
 
A_RESTRICTED = 'Equity:Funds:Restricted'
 
A_UNRESTRICTED = 'Equity:Funds:Unrestricted'
 
A_CURRCONV = 'Equity:Realized:CurrencyConversion'
 
A_CREDITCARD = 'Liabilities:CreditCard'
 
A_PAYABLE = 'Liabilities:Payable:Accounts'
 
A_UNEARNED = 'Liabilities:UnearnedIncome'
 

	
 
class FlatPosting(NamedTuple):
 
    account: str
 
    units_number: Decimal
 
    units_currency: str
 
    cost_number: Optional[Decimal]
 
    cost_currency: Optional[str]
 
    cost_date: Optional[datetime.date]
 
    project: Optional[str]
 

	
 
    def __repr__(self):
 
        cost_s = f' {{{self.cost_number} {self.cost_currency}, {self.cost_date}}}'
 
        if cost_s == ' {None None, None}':
 
            cost_s = ''
 
        return f'<{self.account}  {self.units_number} {self.units_currency}{cost_s}>'
 

	
 
    @classmethod
 
    def make(cls,
 
             account,
 
             units_number,
 
             units_currency='USD',
 
             cost_number=None,
 
             cost_date=None,
 
             project=None,
 
             cost_currency=None,
 
    ):
 
        if cost_number is not None:
 
            cost_number = Decimal(cost_number)
 
            if cost_currency is None:
 
                cost_currency = 'USD'
 
            if isinstance(cost_date, str):
 
                cost_date = datetime.datetime.strptime(cost_date, '%Y-%m-%d').date()
 
        return cls(account, Decimal(units_number), units_currency,
 
                   cost_number, cost_currency, cost_date, project)
 

	
 
    @classmethod
 
    def from_beancount(cls, posting):
 
        units_number, units_currency = posting.units
 
        if posting.cost is None:
 
            cost_number = cost_currency = cost_date = None
 
        else:
 
            cost_number, cost_currency, cost_date, _ = posting.cost
 
        try:
 
            project = posting.meta['project']
 
        except (AttributeError, KeyError):
 
            project = None
 
        return cls(posting.account, units_number, units_currency,
 
                   cost_number, cost_currency, cost_date, project)
 

	
 
    @classmethod
 
    def from_output(cls, output):
 
        entries, _, _ = bc_loader.load_string(output.read())
 
        return (cls.from_beancount(post) for post in entries[-1].postings)
 

	
 

	
 
def run_main(arglist, config=None):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/fund.beancount'),
 
        )
 
    output = io.StringIO()
 
    errors = io.StringIO()
 
    retcode = openbalmod.main(arglist, output, errors, config)
 
    output.seek(0)
 
    return retcode, output, errors
 

	
 
@pytest.mark.parametrize('arg', ['2018', '2018-03-01'])
 
def test_2018_opening(arg):
 
    retcode, output, errors = run_main([arg])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    assert list(FlatPosting.from_output(output)) == [
 
        FlatPosting.make(A_CHECKING, 10000),
 
        FlatPosting.make(A_RESTRICTED, -3000, project='Alpha'),
 
        FlatPosting.make(A_RESTRICTED, -2000, project='Bravo'),
 
        FlatPosting.make(A_RESTRICTED, -1000, project='Charlie'),
 
        FlatPosting.make(A_UNRESTRICTED, -4000, project='Conservancy'),
 
    ]
 

	
 
@pytest.mark.parametrize('arg', ['2019', '2019-03-01'])
 
def test_2019_opening(arg):
 
    retcode, output, errors = run_main([arg])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    assert list(FlatPosting.from_output(output)) == [
 
        FlatPosting.make(A_CHECKING, 10050),
 
        FlatPosting.make(A_PREPAID, 20, project='Alpha'),
 
        FlatPosting.make(A_RECEIVABLE, 32, 'EUR', '1.25', '2018-03-03', 'Conservancy'),
 
        FlatPosting.make(A_RESTRICTED, -3060, project='Alpha'),
 
        FlatPosting.make(A_RESTRICTED, -1980, project='Bravo'),
 
        FlatPosting.make(A_RESTRICTED, -1000, project='Charlie'),
 
        FlatPosting.make(A_UNRESTRICTED, -4036, project='Conservancy'),
 
        FlatPosting.make(A_PAYABLE, -4, project='Conservancy'),
 
        FlatPosting.make(A_UNEARNED, -30, project='Alpha'),
 
    ]
 

	
 
@pytest.mark.parametrize('arg', ['2020', '2020-12-31'])
 
def test_2020_opening(arg):
 
    retcode, output, errors = run_main([arg])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    assert list(FlatPosting.from_output(output)) == [
 
        FlatPosting.make(A_CHECKING, 10276),
 
        FlatPosting.make(A_EUR, 32, 'EUR', '1.5', '2019-03-03'),
 
        FlatPosting.make(A_RESTRICTED, -3064, project='Alpha'),
 
        FlatPosting.make(A_RESTRICTED, -2180, project='Bravo'),
 
        FlatPosting.make(A_RESTRICTED, -1000, project='Charlie'),
 
        FlatPosting.make(A_UNRESTRICTED, -4080, project='Conservancy'),
 
    ]
0 comments (0 inline, 0 general)