Ben Sturmfels (bsturmfels) - 3 years ago 2022-02-23 06:34:52
reconcile: Simplify multirecord sorting.
1 file changed with 13 insertions and 13 deletions:
@@ -152,67 +152,67 @@ def standardize_fr_record(row: Dict, line: int) -> Dict:
        'line': line,


def standardize_beancount_record(row) -> Dict:  # type: ignore[no-untyped-def]
    """Turn a Beancount query result row into a standard dict representing a transaction."""
    return {
        'amount': row.number_cost_position,
        'payee': remove_payee_junk(f'{row.payee or ""} {row.entity or ""} {row.narration or ""}'),
        'check_id': str(row.check_id or ''),
        'filename': row.filename,
        'line': row.line,
        'bank_statement': row.bank_statement,


def format_record(record: dict) -> str:
    if record['payee'] and record['check_id']:
        output = f"{record['date'].isoformat()}: {record['amount']:11,.2f} {record['payee'][:25]} #{record['check_id']}".ljust(59)
    elif record['payee']:
        output = f"{record['date'].isoformat()}: {record['amount']:11,.2f} {record['payee'][:35]}".ljust(59)
        output = f"{record['date'].isoformat()}: {record['amount']:11,.2f} #{record['check_id']}".ljust(59)
    return output


def format_multirecord(r1s, r2s, note):
    total = sum(x['amount'] for x in r2s)
    assert len(r1s) == 1
    assert len(r2s) > 1
    match_output = []
    match_output.append([r1s[0]['date'].isoformat() + ' ' + r1s[0]['payee'], f'{format_record(r1s[0])}  →  {format_record(r2s[0])}  ✓ Matched{note}'])
    for i, r2 in enumerate(r2s[1:]):
        match_output.append([r1s[0]['date'].isoformat() + str(i) + r1s[0]['payee'], f'{r1s[0]["date"].isoformat()}:             ↳                                    →  {format_record(r2)}  ✓ Matched{note}'])
    match_output.append([r1s[0]['date'], f'{format_record(r1s[0])}  →  {format_record(r2s[0])}  ✓ Matched{note}'])
    for r2 in r2s[1:]:
        match_output.append([r1s[0]['date'], f'{r1s[0]["date"].isoformat()}:             ↳                                    →  {format_record(r2)}  ✓ Matched{note}'])
    return match_output

def sort_records(records: List) -> List:
    return sorted(records, key=lambda x: (x['date'], x['amount']))


def first_word_exact_match(a, b):
    if len(a) == 0 or len(b) == 0:
        return 0
    first_a = a.split()[0].strip()
    first_b = b.split()[0].strip()
    if first_a.casefold() == first_b.casefold():
        return min(1.0, 0.2 * len(first_a))
        return 0;

def payee_match(a, b):
    fuzzy_match = fuzz.token_set_ratio(a, b) / 100.00
    first_word_match = first_word_exact_match(a, b)
    return max(fuzzy_match, first_word_match)

def records_match(r1: Dict, r2: Dict) -> Tuple[bool, str]:
    """Do these records represent the same transaction?"""

    date_score = date_proximity(r1['date'], r2['date'])
    if r1['date'] == r2['date']:
        date_message = ''
    elif date_score > 0.0:
        diff = abs((r1['date'] - r2['date']).days)
        date_message = f'+/- {diff} days'
        date_message = 'date mismatch'
@@ -261,77 +261,77 @@ def match_statement_and_books(statement_trans: list, books_trans: list):
    matches = []
    remaining_books_trans = []
    remaining_statement_trans = []

    for r1 in statement_trans:
        best_match_score = 0
        best_match_index = None
        best_match_note = ''
        matches_found = 0
        for i, r2 in enumerate(books_trans):
            score, note = records_match(r1, r2)
            if score >= 0.5 and score >= best_match_score:
                matches_found += 1
                best_match_score = score
                best_match_index = i
                best_match_note = note
        if best_match_score > 0.5 and matches_found == 1 and 'check-id mismatch' not in best_match_note or best_match_score > 0.8:
            if best_match_score <= 0.8:
                best_match_note.append('only one decent match')
            matches.append(([r1], [books_trans[best_match_index]], best_match_note))
            # Don't try to make a second match against this books entry.
            del books_trans[best_match_index]
    for r2 in books_trans:
    return matches, remaining_statement_trans, remaining_books_trans


def format_matches(matches, csv_statement: str, show_reconciled_matches):
    match_output = []
    for r1, r2, note in matches:
    for r1s, r2s, note in matches:
        note = ', '.join(note)
        note = ': ' + note if note else note
        if r1 and r2:
            if show_reconciled_matches and all(x['bank_statement'] for x in r2):
                if len(r2) == 1:
                    match_output.append([r1[0]['date'].isoformat() + r1[0]['payee'], f'{format_record(r1[0])}  →  {format_record(r2[0])}  ✓ Matched{note}'])
        if r1s and r2s:
            if show_reconciled_matches and all(x['bank_statement'] for x in r2s):
                if len(r2s) == 1:
                    match_output.append([r1s[0]['date'], f'{format_record(r1s[0])}  →  {format_record(r2s[0])}  ✓ Matched{note}'])
                    match_output.extend(format_multirecord(r1, r2, note))
        elif r1:
            match_output.append([r1[0]['date'].isoformat() + r1[0]['payee'], Fore.RED + Style.BRIGHT + f'{format_record(r1[0])}  →  {" ":^59}  ✗ NOT IN BOOKS ({os.path.basename(csv_statement)}:{r1[0]["line"]})' + Style.RESET_ALL])
                    match_output.extend(format_multirecord(r1s, r2s, note))
        elif r1s:
            match_output.append([r1s[0]['date'], Fore.RED + Style.BRIGHT + f'{format_record(r1s[0])}  →  {" ":^59}  ✗ NOT IN BOOKS ({os.path.basename(csv_statement)}:{r1s[0]["line"]})' + Style.RESET_ALL])
            match_output.append([r2[0]['date'].isoformat() + r2[0]['payee'], Fore.RED + Style.BRIGHT + f'{" ":^59}  →  {format_record(r2[0])}  ✗ NOT ON STATEMENT ({os.path.basename(r2[0]["filename"])}:{r2[0]["line"]})' + Style.RESET_ALL])
            match_output.append([r2s[0]['date'], Fore.RED + Style.BRIGHT + f'{" ":^59}  →  {format_record(r2s[0])}  ✗ NOT ON STATEMENT ({os.path.basename(r2s[0]["filename"])}:{r2s[0]["line"]})' + Style.RESET_ALL])
    return match_output


def date_proximity(d1, d2):
    diff = abs((d1 - d2).days)
    if diff > 60:
        return 0
        return 1.0 - (diff / 60.0)

def metadata_for_match(match, statement_filename, csv_filename):
    # Can we really ever have multiple statement entries? Probably not.
    statement_filename = get_repo_relative_path(statement_filename)
    csv_filename = get_repo_relative_path(csv_filename)
    metadata = []
    statement_entries, books_entries, _ = match
    for books_entry in books_entries:
        for statement_entry in statement_entries:
            if not books_entry['bank_statement']:
                metadata.append((books_entry['filename'], books_entry['line'], f'    bank-statement: {statement_filename}'))
                metadata.append((books_entry['filename'], books_entry['line'], f'    bank-statement-csv: {csv_filename}:{statement_entry["line"]}'))
    return metadata


# TODO: Is there a way to pull the side-effecting code out of this function?

def write_metadata_to_books(metadata_to_apply: List[Tuple[str, int, str]]) -> None:
    """Insert reconciliation metadata in the books files.

    Takes a list of edits to make as tuples of form (filename, lineno, metadata):

@@ -478,61 +478,61 @@ def main(args):

    books_balance_query = f"""SELECT sum(COST(position)) AS aa WHERE account = "{args.account}"
        AND date <= {end_date.isoformat()}"""
    result_types, result_rows = run_query(entries, options, books_balance_query, numberify=True)
    books_balance = result_rows[0][0] if result_rows else 0

    books_balance_reconciled_query = f"""SELECT sum(COST(position)) AS aa WHERE account = "{args.account}"
        AND date <= {end_date.isoformat()} AND META('bank-statement') != NULL"""
    result_types, result_rows = run_query(entries, options, books_balance_reconciled_query, numberify=True)
    books_balance_reconciled = result_rows[0][0] if result_rows else 0

    # String concatenation looks bad, but there's no SQL injection possible here
    # because BQL can't write back to the Beancount files. I hope!
    query = f'SELECT filename, META("lineno") AS line, META("bank-statement") AS bank_statement, date, number(cost(position)), payee, ENTRY_META("entity") as entity, ANY_META("check-id") as check_id, narration where account = "{args.account}" and date >= {begin_date} and date <= {end_date}'
    result_types, result_rows = run_query(entries, options, query)

    books_trans = sort_records([standardize_beancount_record(row) for row in result_rows])

    matches, remaining_statement_trans, remaining_books_trans = match_statement_and_books(statement_trans, books_trans)
    subset_matches, remaining_statement_trans, remaining_books_trans = subset_match(remaining_statement_trans, remaining_books_trans)
    unmatched = process_unmatched(remaining_statement_trans, remaining_books_trans)

    match_output = format_matches(matches, args.csv_statement, args.show_reconciled_matches)

    total_matched, total_missing_from_books, total_missing_from_statement = totals(matches)

    out = io.StringIO()
    print('-' * 155)
    print(f'{"Statement transaction":<52}            {"Books transaction":<58}   Notes')
    print('-' * 155)
    for _, output in sorted(match_output):
    for _, output in sorted(match_output, key=lambda x: x[0]):
    print('-' * 155)
    print(f'Statement period {begin_date} to {end_date}')
    print(f'Statement/cleared balance:    {args.statement_balance:12,.2f}    (as provided by you)')
    print(f'Books balance:                {books_balance:12,.2f}    (all transactions, includes unreconciled)')
    # print(f'Books balance (reconciled):   {books_balance_reconciled:12,.2f}    (transactions with "bank-statement" tag only)')
    # print(f'Matched above:                {total_matched:12,.2f}    ("bank-statement" tag yet to be applied)')
    print(f'Total not on statement:       {total_missing_from_statement:12,.2f}')
    print(f'Total not on books:           {total_missing_from_books:12,.2f}')
    print('-' * 155)

    # Write statement metadata back to books
    metadata_to_apply = []
    for match in matches:
        metadata_to_apply.extend(metadata_for_match(match, args.bank_statement, args.csv_statement))
    if metadata_to_apply and not args.non_interactive:
        print('Mark matched transactions as reconciled in the books? (y/N) ', end='')
        if input().lower() == 'y':


if __name__ == '__main__':
    args = parse_args(sys.argv)

# Local Variables:
# python-shell-interpreter: "/home/ben/\.virtualenvs/conservancy-beancount-py39/bin/python"
# End:
