Changeset - f23e0431e277
[Not reviewed]
0 1 1
Brett Smith - 3 years ago 2021-01-31 22:06:40
brettcsmith@brettcsmith.org
reconcile: New statement reconciler.

For now, this is basically just a specialized ledger report. It highlights
rows that already appear reconciled, and reports different balances, with
appropriate formulas to assist interactive reconciliation.

In the future I hope we can extend this to read various CSV statements and
then highlight rows different based on discrepancies between the statement
and the books, sort of like the PayPal reconciler does now.
2 files changed with 455 insertions and 1 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reconcile/statement.py
Show inline comments
 
new file 100644
 
"""statement.py - Reconciliation report based on statement metadata"""
 
# Copyright © 2021  Brett Smith
 
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
 
#
 
# Full copyright and licensing details can be found at toplevel file
 
# LICENSE.txt in the repository.
 

	
 
import argparse
 
import collections
 
import datetime
 
import enum
 
import logging
 
import os
 
import sys
 

	
 
from typing import (
 
    Any,
 
    Collection,
 
    Iterable,
 
    Iterator,
 
    List,
 
    Mapping,
 
    Optional,
 
    Sequence,
 
    Set,
 
    TextIO,
 
    Tuple,
 
    Union,
 
)
 
from ..beancount_types import (
 
    MetaKey,
 
)
 

	
 
from pathlib import Path
 

	
 
import odf.table  # type:ignore[import]
 

	
 
from beancount.parser import printer as bc_printer
 

	
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 
from .. import rtutil
 
from ..ranges import DateRange
 
from ..reports import core
 

	
 
PROGNAME = 'reconcile-statement'
 
logger = logging.getLogger('conservancy_beancount.reconcile.statement')
 

	
 
class StatementReconciliation(core.BaseODS[data.Posting, data.Account]):
 
    COLUMNS: Mapping[str, Union[None, int, float]] = collections.OrderedDict([
 
        ("Date", None),
 
        (data.Metadata.human_name('entity'), 1.5),
 
        ("Description", 2),
 
        ("Amount", 1),
 
        ("ID", 1.5),
 
        ("Statement", 2),
 
    ])
 
    ID_META_MAP: Mapping[str, MetaKey] = collections.OrderedDict([
 
        ('Assets,PayPal', 'paypal-id'),
 
        ('Assets', 'check-id'),
 
    ])
 
    STATEMENT_META_MAP: Mapping[str, MetaKey] = {
 
        'Assets': 'bank-statement',
 
        'Expenses': 'tax-statement',
 
    }
 

	
 
    class Display(enum.Enum):
 
        RECONCILED = '#00ffff'
 
        NORMAL = '<normal>'
 
        HIDDEN = '<hidden>'
 

	
 
    def __init__(
 
            self,
 
            rt_wrapper: Optional[rtutil.RT],
 
            pre_range: DateRange,
 
            rec_range: DateRange,
 
            post_range: DateRange,
 
            accounts: Collection[data.Account],
 
            statement_metakey: Optional[str]=None,
 
            id_metakey: Optional[str]=None,
 
    ) -> None:
 
        super().__init__(rt_wrapper)
 
        self.pre_range = pre_range
 
        self.rec_range = rec_range
 
        self.post_range = post_range
 
        self.accounts = sorted(accounts)
 
        self.statement_metakey = statement_metakey
 
        self.id_metakey = id_metakey
 

	
 
    def section_key(self, row: data.Posting) -> data.Account:
 
        return row.account
 

	
 
    def init_styles(self) -> None:
 
        super().init_styles()
 
        self.style_baldate = self.merge_styles(self.style_date, self.style_bold)
 
        self.style_balance = self.merge_styles(self.style_bold, self.style_endtotal)
 

	
 
    def start_section(self, account: str) -> None:
 
        self.use_sheet(account.replace(':', ' '))
 
        for width in self.COLUMNS.values():
 
            col_style = None if width is None else self.column_style(width)
 
            self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
 
        self.add_row(*(
 
            self.string_cell(text, stylename=self.style_bold)
 
            for text in self.COLUMNS
 
        ))
 
        self.lock_first_row()
 

	
 
    def _date_range_posts(
 
            self,
 
            postings: Sequence[data.Posting],
 
            date_range: DateRange,
 
    ) -> core.RelatedPostings:
 
        return core.RelatedPostings(p for p in postings if p.meta.date in date_range)
 

	
 
    def _most_common_first_link(self, posts: core.RelatedPostings, key: MetaKey) -> str:
 
        counter = collections.Counter(post.meta.first_link(key, '') for post in posts)
 
        for value, _ in counter.most_common(2):
 
            if value:
 
                return value
 
        else:
 
            return ''
 

	
 
    def write_posts(
 
            self,
 
            posts: core.RelatedPostings,
 
            statement_metakey: MetaKey,
 
            id_metakey: MetaKey,
 
            common_display: Display=Display.NORMAL,
 
            common_statement: Optional[str]=None,
 
    ) -> Tuple[int, int]:
 
        if common_statement is None:
 
            common_statement = self._most_common_first_link(posts, statement_metakey)
 
        if common_display.value.startswith('#'):
 
            common_style: Optional[odf.style.Style] = self.border_style(
 
                core.Border.LEFT, '10pt', 'solid', common_display.value,
 
            )
 
        else:
 
            common_style = None
 

	
 
        self.add_row()
 
        start_row = self.row_count()
 
        for posting in posts:
 
            if posting.meta.first_link(statement_metakey) == common_statement:
 
                if common_display is self.Display.HIDDEN:
 
                    continue
 
                row_style = self.merge_styles(self.style_date, common_style)
 
            else:
 
                row_style = self.style_date
 
            self.add_row(
 
                self.date_cell(posting.meta.date, stylename=row_style),
 
                self.string_cell(posting.meta.get('entity', '<none>')),
 
                self.string_cell(posting.meta.txn.narration),
 
                self.currency_cell(posting.units),
 
                self.string_cell(posting.meta.get(id_metakey, ''), stylename=self.style_endtext),
 
                self.meta_links_cell(posting.meta.report_links(statement_metakey)),
 
            )
 
        end_row = self.row_count()
 
        if start_row == end_row:
 
            self.sheet.childNodes.pop()
 
        else:
 
            start_row += 1
 
        return (start_row, end_row)
 

	
 
    def write_pre_posts(
 
            self,
 
            posts: core.RelatedPostings,
 
            statement_metakey: MetaKey,
 
            id_metakey: MetaKey,
 
    ) -> None:
 
        posts = self._date_range_posts(posts, self.pre_range)
 
        header_cell = self.string_cell(
 
            "",
 
            stylename=self.merge_styles(self.style_centertext, self.style_bold),
 
            numbercolumnsspanned=len(self.COLUMNS),
 
        )
 
        self.add_row()
 
        self.add_row(header_cell)
 
        start_row, end_row = self.write_posts(
 
            posts, statement_metakey, id_metakey, self.Display.HIDDEN,
 
        )
 
        if start_row == end_row:
 
            text = "No unreconciled postings found in prior period"
 
        else:
 
            text = "Unreconciled postings in prior period"
 
        header_cell.firstChild.addText(text)
 

	
 
    def write_bal_row(
 
            self,
 
            description: str,
 
            balance: core.Balance,
 
            balance_style: Union[None, str, odf.style.Style]=None,
 
            date: Optional[datetime.date]=None,
 
    ) -> odf.table.TableRow:
 
        balance_style = self.merge_styles(self.style_bold, balance_style)
 
        if date is None:
 
            date_cell = odf.table.TableCell()
 
        else:
 
            date_cell = self.date_cell(date, stylename=self.style_baldate)
 
        return self.add_row(
 
            date_cell,
 
            odf.table.TableCell(),
 
            self.string_cell(description, stylename=self.style_bold),
 
            self.balance_cell(balance, stylename=balance_style),
 
        )
 

	
 
    def _address(
 
            self,
 
            start_col: str,
 
            start_row: Optional[int]=None,
 
            end_row: Optional[int]=None,
 
            end_col: Optional[str]=None,
 
    ) -> str:
 
        if '.' not in start_col:
 
            start_col = f'.{start_col}'
 
        if start_row is None:
 
            start_row = 1
 
            if end_row is None:
 
                end_row = 2 ** 20
 
        if end_row is None:
 
            return f'[{start_col}{start_row}]'
 
        if end_col is None:
 
            end_col = start_col
 
        elif '.' not in end_col:
 
            end_col = f'.{end_col}'
 
        return f'[{start_col}{start_row}:{end_col}{end_row}]'
 

	
 
    def write_rec_posts(
 
            self,
 
            posts: core.RelatedPostings,
 
            statement_metakey: MetaKey,
 
            id_metakey: MetaKey,
 
            open_balance: core.Balance,
 
    ) -> None:
 
        posts = self._date_range_posts(posts, self.rec_range)
 
        period_bal = posts.balance()
 
        common_statement = self._most_common_first_link(posts, statement_metakey)
 
        self.add_row()
 
        self.write_bal_row(
 
            "Opening Balance", open_balance, self.style_endtotal, self.rec_range.start,
 
        )
 
        start_row, end_row = self.write_posts(
 
            posts, statement_metakey, id_metakey, self.Display.RECONCILED, common_statement,
 
        )
 
        post_range = self._address('D', start_row, end_row)
 
        rec_addr = self._address('D', end_row + 3)
 
        self.add_row()
 

	
 
        row = self.write_bal_row("Unreconciled Total", core.Balance(
 
            p.units for p in posts if p.meta.first_link(statement_metakey) != common_statement
 
        ), self.style_total)
 
        row.lastChild.setAttribute('formula', f'of:=SUM({post_range})-{rec_addr}')
 

	
 
        row = self.write_bal_row("Reconciled Total", core.Balance(
 
            p.units for p in posts if p.meta.first_link(statement_metakey) == common_statement
 
        ))
 
        stmt_range = self._address('F', start_row, end_row)
 
        if common_statement:
 
            _, stmt_op = next(self._meta_link_pairs((common_statement,)))
 
            stmt_op += '*'
 
        else:
 
            stmt_op = '<>'
 
        row.lastChild.setAttribute(
 
            'formula',
 
            f'of:=SUMIFS({post_range};{stmt_range};"{stmt_op}")',
 
        )
 

	
 
        row = self.write_bal_row("Period Total", period_bal, self.style_total)
 
        row.lastChild.setAttribute('formula', f'of:=SUM({post_range})')
 

	
 
        row = self.write_bal_row(
 
            "Ending Balance",
 
            open_balance + period_bal,
 
            self.style_bottomline,
 
            self.rec_range.stop - datetime.timedelta(days=1),
 
        )
 
        bal_addr = self._address('D', start_row - 2)
 
        row.lastChild.setAttribute('formula', f'of:=SUM({post_range})+{bal_addr}')
 

	
 
    def _get_metakey(self, account: data.Account, key: str, default: str) -> str:
 
        user_spec: Optional[str] = getattr(self, f'{key.lower()}_metakey')
 
        if user_spec is not None:
 
            return user_spec
 
        account_map: Mapping[str, MetaKey] = getattr(self, f'{key.upper()}_META_MAP')
 
        acct_key = account.is_under(*account_map)
 
        if acct_key is None:
 
            return default
 
        else:
 
            return account_map[acct_key]
 

	
 
    def write(self, rows: Iterable[data.Posting]) -> None:
 
        acct_posts = dict(core.RelatedPostings.group_by_account(
 
            post for post in rows
 
            if post.meta.date < self.post_range.stop
 
            and post.account.is_under(*self.accounts)
 
        ))
 
        for account in self.accounts:
 
            try:
 
                postings = acct_posts[account]
 
            except KeyError:
 
                postings = core.RelatedPostings()
 
            statement_metakey = self._get_metakey(account, 'statement', 'statement')
 
            id_metakey = self._get_metakey(account, 'id', 'lineno')
 
            open_balance = core.Balance()
 
            for post, open_balance in postings.iter_with_balance():
 
                if post.meta.date >= self.rec_range.start:
 
                    break
 
            after_posts = self._date_range_posts(postings, self.post_range)
 
            self.start_section(account)
 
            self.write_pre_posts(postings, statement_metakey, id_metakey)
 
            self.write_rec_posts(postings, statement_metakey, id_metakey, open_balance)
 
            self.write_posts(after_posts, statement_metakey, id_metakey)
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        required=True,
 
        help="""Date to start reconciliation, inclusive, in YYYY-MM-DD format.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to end reconciliation, exclusive, in YYYY-MM-DD format.
 
The default is one month after the start date.
 
""")
 
    parser.add_argument(
 
        '--account', '-a',
 
        dest='accounts',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""Reconcile this account. You can specify this option
 
multiple times. You can specify a part of the account hierarchy, or an account
 
classification from metadata. Default 'Cash'.
 
""")
 
    parser.add_argument(
 
        '--id-metadata-key', '-i',
 
        metavar='METAKEY',
 
        help="""Show the named metadata as a posting identifier.
 
Default varies by account.
 
""")
 
    parser.add_argument(
 
        '--statement-metadata-key', '-s',
 
        metavar='METAKEY',
 
        help="""Use the named metadata to determine which postings have already
 
been reconciled. Default varies by account.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is `LedgerReport_<StartDate>_<StopDate>.ods`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if not args.accounts:
 
        args.accounts = ['Cash']
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    if args.stop_date is None:
 
        year_diff, smonth = divmod(args.start_date.month, 12)
 
        syear = args.start_date.year + year_diff
 
        smonth += 1
 
        try:
 
            args.stop_date = datetime.date(syear, smonth, args.start_date.day)
 
        except ValueError:
 
            year_diff, smonth = divmod(smonth, 12)
 
            syear += year_diff
 
            smonth += 1
 
            args.stop_date = datetime.date(syear, smonth, 1)
 
    days_diff = args.stop_date - args.start_date
 
    pre_range = DateRange(args.start_date - days_diff, args.start_date)
 
    rec_range = DateRange(args.start_date, args.stop_date)
 
    post_range = DateRange(args.stop_date, args.stop_date + days_diff)
 

	
 
    returncode = os.EX_OK
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, options = books.Loader.load_none(config.config_file_path())
 
        returncode = cliutil.ExitCode.NoConfiguration
 
    else:
 
        entries, load_errors, options = books_loader.load_fy_range(pre_range.start, post_range.stop)
 
        if load_errors:
 
            returncode = cliutil.ExitCode.BeancountErrors
 
        elif not entries:
 
            returncode = cliutil.ExitCode.NoDataLoaded
 

	
 
    data.Account.load_from_books(entries, options)
 
    real_accounts: Set[data.Account] = set()
 
    for account_spec in args.accounts:
 
        new_accounts = frozenset(
 
            account
 
            for account in data.Account.iter_accounts(account_spec)
 
            if account.is_open_on_date(args.start_date) is not False
 
        )
 
        if new_accounts:
 
            real_accounts.update(new_accounts)
 
        else:
 
            logger.critical("account %r did not match any open accounts", account_spec)
 
            return 2
 

	
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
    rt_wrapper = config.rt_wrapper()
 
    if rt_wrapper is None:
 
        logger.warning("could not initialize RT client; spreadsheet links will be broken")
 

	
 
    report = StatementReconciliation(
 
        rt_wrapper,
 
        pre_range,
 
        rec_range,
 
        post_range,
 
        real_accounts,
 
        args.statement_metadata_key,
 
        args.id_metadata_key,
 
    )
 
    report.write(data.Posting.from_entries(entries))
 
    if args.output_file is None:
 
        out_dir_path = config.repository_path() or Path()
 
        args.output_file = out_dir_path / 'ReconciliationReport_{}_{}.ods'.format(
 
            args.start_date.isoformat(),
 
            args.stop_date.isoformat(),
 
        )
 
        logger.info("Writing report to %s", args.output_file)
 
    ods_file = cliutil.bytes_output(args.output_file, stdout)
 
    report.save_file(ods_file)
 
    return returncode
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
setup.py
Show inline comments
...
 
@@ -5,7 +5,7 @@ from setuptools import setup
 
setup(
 
    name='conservancy_beancount',
 
    description="Plugin, library, and reports for reading Conservancy's books",
 
    version='1.16.2',
 
    version='1.17.0',
 
    author='Software Freedom Conservancy',
 
    author_email='info@sfconservancy.org',
 
    license='GNU AGPLv3+',
...
 
@@ -54,6 +54,7 @@ setup(
 
            'pdfform-extract-irs990scheduleA = conservancy_beancount.pdfforms.extract.irs990scheduleA:entry_point',
 
            'pdfform-fill = conservancy_beancount.pdfforms.fill:entry_point',
 
            'reconcile-paypal = conservancy_beancount.reconcile.paypal:entry_point',
 
            'reconcile-statement = conservancy_beancount.reconcile.statement:entry_point',
 
            'split-ods-links = conservancy_beancount.tools.split_ods_links:entry_point',
 
        ],
 
    },
0 comments (0 inline, 0 general)