Changeset - 20d242c7c753
[Not reviewed]
0 2 0
Ben Sturmfels (bsturmfels) - 2 years ago 2022-03-02 07:26:10
ben@sturm.com.au
reconcile: Fix a mutation bug causing not all matches to be passed through.
2 files changed with 54 insertions and 20 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reconcile/statement_reconciler.py
Show inline comments
...
 
@@ -83,67 +83,70 @@ Other related problems we're not dealing with here:
 
 - balance checks are manually updated in
 
   svn/Financial/Ledger/sanity-check-balances.yaml
 

	
 
 - transactions are entered manually and reconciled after the fact,
 
   but importing from statements may be useful in some cases
 

	
 
"""
 

	
 
# TODO:
 
#  - extract the magic numbers
 
#  - consider merging in helper.py
 

	
 
import argparse
 
import collections
 
import copy
 
import csv
 
import datetime
 
import decimal
 
import io
 
import itertools
 
import logging
 
import os
 
import re
 
import sys
 
from typing import Callable, Dict, List, Tuple, TextIO
 
from typing import Callable, Dict, List, Optional, Sequence, Tuple, TextIO
 

	
 
from beancount import loader
 
from beancount.query.query import run_query
 
from colorama import Fore, Style  # type: ignore
 

	
 
from .. import cliutil
 
from .. import config as configmod
 

	
 
if not sys.warnoptions:
 
    import warnings
 
    # Disable annoying warning from thefuzz prompting for a C extension. The
 
    # current pure-Python implementation isn't a bottleneck for us.
 
    warnings.filterwarnings('ignore', category=UserWarning, module='thefuzz.fuzz')
 
from thefuzz import fuzz  # type: ignore
 

	
 
logger = logging.getLogger()
 
logger.setLevel(logging.INFO)
 

	
 
# Console logging.
 
logger.addHandler(logging.StreamHandler())
 
PROGNAME = 'reconcile-statement'
 
logger = logging.getLogger(__name__)
 

	
 
# Get some interesting feedback on call to RT with this:
 
# logger.setLevel(logging.DEBUG)
 
# logger.addHandler(logging.StreamHandler())
 

	
 
JUNK_WORDS = [
 
    'software',
 
    'freedom',
 
    'conservancy',
 
    'conse',
 
    'payment',
 
    'echeck',
 
    'bill',
 
    'debit',
 
    'wire',
 
    'credit',
 
    "int'l",
 
    "in.l",
 
    'llc',
 
    'online',
 
    'donation',
 
    'usd',
 
    'inc',
 
]
 
JUNK_WORDS_RES = [re.compile(word, re.IGNORECASE) for word in JUNK_WORDS]
 
ZERO_RE = re.compile('^0+')
 

	
 

	
...
 
@@ -436,123 +439,139 @@ def write_metadata_to_books(metadata_to_apply: List[Tuple[str, int, str]]) -> No
 
            print(f'Wrote {filename}.')
 

	
 

	
 
def get_repo_relative_path(path: str) -> str:
 
    return os.path.relpath(path, start=os.getenv('CONSERVANCY_REPOSITORY'))
 

	
 

	
 
def parse_path(path: str) -> str:
 
    if not os.path.exists(path):
 
        raise argparse.ArgumentTypeError(f'File {path} does not exist.')
 
    return path
 

	
 

	
 
def parse_repo_relative_path(path: str) -> str:
 
    if not os.path.exists(path):
 
        raise argparse.ArgumentTypeError(f'File {path} does not exist.')
 
    repo = os.getenv('CONSERVANCY_REPOSITORY')
 
    if not repo:
 
        raise argparse.ArgumentTypeError('$CONSERVANCY_REPOSITORY is not set.')
 
    if not path.startswith(repo):
 
        raise argparse.ArgumentTypeError(f'File {path} does not share a common prefix with $CONSERVANCY_REPOSITORY {repo}.')
 
    return path
 

	
 

	
 
def parse_args(argv: List[str]) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(description='Reconciliation helper')
 
def parse_decimal_with_separator(number_text: str) -> decimal.Decimal:
 
    """decimal.Decimal can't parse numbers with thousands separator."""
 
    number_text = number_text.replace(',', '')
 
    return decimal.Decimal(number_text)
 

	
 

	
 
def parse_arguments(argv: List[str]) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME, description='Reconciliation helper')
 
    cliutil.add_version_argument(parser)
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument('--beancount-file', required=True, type=parse_path)
 
    parser.add_argument('--csv-statement', required=True, type=parse_repo_relative_path)
 
    parser.add_argument('--bank-statement', required=True, type=parse_repo_relative_path)
 
    parser.add_argument('--account', required=True, help='eg. Liabilities:CreditCard:AMEX')
 
    parser.add_argument('--grep-output-filename')
 
    # parser.add_argument('--report-group-regex')
 
    parser.add_argument('--show-reconciled-matches', action='store_true')
 
    parser.add_argument('--statement-balance', type=decimal.Decimal, required=True, help="A.K.A \"cleared balance\" taken from the end of the period on the PDF statement. Required because CSV statements don't include final or running totals")
 
    parser.add_argument('--statement-balance', type=parse_decimal_with_separator, required=True, help="A.K.A \"cleared balance\" taken from the end of the period on the PDF statement. Required because CSV statements don't include final or running totals")
 
    parser.add_argument('--non-interactive', action='store_true', help="Don't prompt to write to the books")
 
    return parser.parse_args(args=argv[1:])
 
    args = parser.parse_args(args=argv)
 
    return args
 

	
 

	
 
def totals(matches: List[Tuple[List, List, List]]) -> Tuple[decimal.Decimal, decimal.Decimal, decimal.Decimal]:
 
    total_matched = decimal.Decimal(0)
 
    total_missing_from_books = decimal.Decimal(0)
 
    total_missing_from_statement = decimal.Decimal(0)
 
    for statement_entries, books_entries, _ in matches:
 
        if statement_entries and books_entries:
 
            total_matched += sum(c['amount'] for c in statement_entries)
 
        elif statement_entries:
 
            total_missing_from_books += sum(c['amount'] for c in statement_entries)
 
        else:
 
            total_missing_from_statement += sum(c['amount'] for c in books_entries)
 
    return total_matched, total_missing_from_books, total_missing_from_statement
 

	
 

	
 
def subset_match(statement_trans: List[dict], books_trans: List[dict]) -> Tuple[List[Tuple[List, List, List]], List[Dict], List[Dict]]:
 
    matches = []
 
    remaining_books_trans = []
 
    remaining_statement_trans = []
 

	
 
    groups = itertools.groupby(books_trans, key=lambda x: (x['date'], x['payee']))
 
    for _, group in groups:
 
        best_match_score = 0.0
 
        best_match_index = None
 
        best_match_note = []
 
        matches_found = 0
 

	
 
        group_items = list(group)
 
        total = sum(x['amount'] for x in group_items)
 
        r2 = copy.copy(group_items[0])
 
        r2['amount'] = total
 
        for i, r1 in enumerate(statement_trans):
 
            score, note = records_match(r1, r2)
 
            if score >= 0.5 and score >= best_match_score:
 
                matches_found += 1
 
                best_match_score = score
 
                best_match_index = i
 
                best_match_note = note
 
        if best_match_score > 0.5 and matches_found == 1 and 'check-id mismatch' not in best_match_note or best_match_score > 0.8:
 
            matches.append(([statement_trans[best_match_index]], group_items, best_match_note))
 
            if best_match_index is not None:
 
                del statement_trans[best_match_index]
 
            for item in group_items:
 
                # TODO: Why?
 
                books_trans.remove(item)
 
        else:
 
            remaining_books_trans.append(r2)
 
    for r1 in statement_trans:
 
        remaining_statement_trans.append(r1)
 
    return matches, remaining_statement_trans, remaining_books_trans
 

	
 

	
 
def process_unmatched(statement_trans: List[dict], books_trans: List[dict]) -> List[Tuple[List, List, List]]:
 
    matches: List[Tuple[List, List, List]] = []
 
    for r1 in statement_trans:
 
        matches.append(([r1], [], ['no match']))
 
    for r2 in books_trans:
 
        matches.append(([], [r2], ['no match']))
 
    return matches
 

	
 

	
 
def main(args: argparse.Namespace) -> None:
 
def main(arglist: Optional[Sequence[str]] = None,
 
         stdout: TextIO = sys.stdout,
 
         stderr: TextIO = sys.stderr,
 
         config: Optional[configmod.Config] = None,
 
         ) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    # TODO: Should put in a sanity check to make sure the statement you're feeding
 
    # in matches the account you've provided.
 

	
 
    # TODO: Can we open the files first, then pass the streams on to the rest of the program?
 

	
 
    if 'AMEX' in args.account:
 
        validate_csv = validate_amex_csv
 
        standardize_statement_record = standardize_amex_record
 
    else:
 
        validate_csv = validate_fr_csv
 
        standardize_statement_record = standardize_fr_record
 

	
 
    with open(args.csv_statement) as f:
 
        sample = f.read(200)
 
        validate_csv(sample, args.account)
 
        f.seek(0)
 
        statement_trans = read_transactions_from_csv(f, standardize_statement_record)
 

	
 
    begin_date = statement_trans[0]['date']
 
    end_date = statement_trans[-1]['date']
 

	
 
    # Do we traverse and filter the in-memory entries list and filter that, or do we
 
    # use Beancount Query Language (BQL) to get a list of transactions? Currently
 
    # using BQL.
...
 
@@ -586,31 +605,28 @@ def main(args: argparse.Namespace) -> None:
 

	
 
    print('-' * 155)
 
    print(f'{"Statement transaction":<52}            {"Books transaction":<58}   Notes')
 
    print('-' * 155)
 
    for _, output in sorted(match_output, key=lambda x: x[0]):
 
        print(output)
 
    print('-' * 155)
 
    print(f'Statement period {begin_date} to {end_date}')
 
    print(f'Statement/cleared balance:    {args.statement_balance:12,.2f}    (as provided by you)')
 
    print(f'Books balance:                {books_balance:12,.2f}    (all transactions, includes unreconciled)')
 
    print(f'Total not on statement:       {total_missing_from_statement:12,.2f}')
 
    print(f'Total not on books:           {total_missing_from_books:12,.2f}')
 
    print('-' * 155)
 

	
 
    # Write statement metadata back to books
 
    metadata_to_apply = []
 
    for match in matches:
 
        metadata_to_apply.extend(metadata_for_match(match, args.bank_statement, args.csv_statement))
 
    if metadata_to_apply and not args.non_interactive:
 
        print('Mark matched transactions as reconciled in the books? (y/N) ', end='')
 
        if input().lower() == 'y':
 
            write_metadata_to_books(metadata_to_apply)
 

	
 

	
 
if __name__ == '__main__':
 
    args = parse_args(sys.argv)
 
    main(args)
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
def entry_point():
 
    args = parse_args(sys.argv)
 
    main(args)
 
if __name__ == '__main__':
 
    exit(entry_point())
tests/test_reconcile.py
Show inline comments
...
 
@@ -287,24 +287,42 @@ def test_totals():
 
    assert totals([
 
        ([S1], [B1], []),
 
        ([S2], [], []),
 
        ([], [B3_next_day], []),
 
    ]) == (decimal.Decimal('10'), decimal.Decimal('20'), decimal.Decimal('30'))
 

	
 
def test_payee_not_considered_if_check_id_present():
 
    # These records match aside from check-id.
 
    statement = [S3]
 
    books = [B3_unmatched_check_id]
 
    assert match_statement_and_books(statement, books) == (
 
        [],
 
        [S3],
 
        [B3_unmatched_check_id],
 
    )
 

	
 
def test_subset_sum_match():
 
    statement = [S4]
 
    books = [B4A, B4B, B4C]
 
    assert subset_match(statement, books) == (
 
        [([S4], [B4A, B4B, B4C], [])],
 
        [],  # No remaining statement trans.
 
        [],  # No remaining books trans.
 
    )
 

	
 
def test_subset_passes_through_all_non_matches():
 
    """This was used to locate a bug where some of the non-matches had
 
    gone missing due to mutation of books_trans."""
 
    statement_trans = [
 
        S1,  # No match
 
        S4,  # Match
 
    ]
 
    books_trans = [
 
        B2,  # No match
 
        B4A, B4B, B4C, # Match
 
        B3_next_day, B3_next_week,  # No match
 
    ]
 
    assert subset_match(statement_trans, books_trans) == (
 
        [([S4], [B4A, B4B, B4C], [])],  # Matched
 
        [S1],  # No match: preserved intact
 
        [B2, B3_next_day, B3_next_week]  # No match: preserved intact
 
    )
0 comments (0 inline, 0 general)