Changeset - 8b08997fda07
[Not reviewed]
0 2 0
Ben Sturmfels (bsturmfels) - 19 months ago 2023-02-11 05:00:21
ben@sturm.com.au
reconciler: Add --full-months option to round statement dates to month boundaries
2 files changed with 38 insertions and 0 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reconcile/statement_reconciler.py
Show inline comments
...
 
@@ -234,128 +234,142 @@ def validate_fr_csv(sample: str) -> None:
 
        date = datetime.datetime.strptime(row[1], '%m/%d/%Y')
 
    except ValueError:
 
        pass
 
    amount_found = '$' in row[4] and '$' in row[5]
 
    if len(row) != 6 or not date or not amount_found:
 
        sys.exit("This First Republic CSV doesn't seem to have the 6 columns we're expecting, including a date in column 2 and an amount in columns 5 and 6. Please use an unmodified statement direct from the institution.")
 

	
 

	
 
def standardize_fr_record(line, row):
 
    record = {
 
        'date': datetime.datetime.strptime(row[1], '%m/%d/%Y').date(),
 
        'amount': parse_amount(row[4]),
 
        'payee': remove_payee_junk(row[3] or '')[:20],
 
        'check_id': row[2].replace('CHECK  ', '') if 'CHECK  ' in row[2] else '',
 
        'line': line,
 
    }
 
    return record
 

	
 

	
 
def read_fr_csv(f: TextIO) -> list:
 
    reader = csv.reader(f)
 
    # The reader.line_num is the source line number, not the spreadsheet row
 
    # number due to multi-line records.
 
    return sort_records(
 
        standardize_fr_record(i, row) for i, row in enumerate(reader, 1)
 
        if len(row) == 6 and row[2] not in {'LAST STATEMENT', 'THIS STATEMENT'}
 
    )
 

	
 

	
 
def standardize_beancount_record(row) -> Dict:  # type: ignore[no-untyped-def]
 
    """Turn a Beancount query result row into a standard dict representing a transaction."""
 
    return {
 
        'date': row.date,
 
        'amount': row.number_cost_position,
 
        'payee': remove_payee_junk(f'{row.payee or ""} {row.entity or ""} {row.narration or ""}'),
 
        'check_id': str(row.check_id or ''),
 
        'filename': row.filename,
 
        'line': row.line,
 
        'bank_statement': row.bank_statement,
 
    }
 

	
 

	
 
def format_record(record: dict) -> str:
 
    """Generate output lines for a standard 1:1 match."""
 
    if record['payee'] and record['check_id']:
 
        output = f"{record['date'].isoformat()}: {record['amount']:11,.2f} {record['payee'][:25]} #{record['check_id']}".ljust(59)
 
    elif record['payee']:
 
        output = f"{record['date'].isoformat()}: {record['amount']:11,.2f} {record['payee'][:35]}".ljust(59)
 
    else:
 
        output = f"{record['date'].isoformat()}: {record['amount']:11,.2f} #{record['check_id']}".ljust(59)
 
    return output
 

	
 

	
 
def format_multirecord(r1s: List[dict], r2s: List[dict], note: str) -> List[list]:
 
    """Generates output lines for one statement:multiple books transaction match."""
 
    assert len(r1s) == 1
 
    assert len(r2s) > 1
 
    match_output = []
 
    match_output.append([r1s[0]['date'], f'{format_record(r1s[0])}  →  {format_record(r2s[0])}  ✓ Matched{note}'])
 
    for r2 in r2s[1:]:
 
        match_output.append([r1s[0]['date'], f'{r1s[0]["date"].isoformat()}:             ↳                                    →  {format_record(r2)}  ✓ Matched{note}'])
 
    return match_output
 

	
 

	
 
def _start_of_month(time, offset_months=0):
 
    if offset_months > 0:
 
        return _start_of_month(time.replace(day=28) + datetime.timedelta(days=4), offset_months - 1)
 
    else:
 
        return time.replace(day=1)
 

	
 

	
 
def round_to_month(begin_date, end_date):
 
    """Round a beginning and end date to beginning and end of months respectively."""
 
    return (
 
        _start_of_month(begin_date),
 
        _start_of_month(end_date, offset_months=1) - datetime.timedelta(days=1))
 

	
 

	
 
def sort_records(records: List) -> List:
 
    return sorted(records, key=lambda x: (x['date'], x['amount']))
 

	
 

	
 
def first_word_exact_match(a: str, b: str) -> float:
 
    """Score a payee match based first word.
 

	
 
    We get a whole lot of good matches this way. Helps in the
 
    situation where the first word or two of a transaction description
 
    is useful and the rest is garbage.
 

	
 
    """
 
    if len(a) == 0 or len(b) == 0:
 
        return 0.0
 
    first_a = a.split()[0].strip()
 
    first_b = b.split()[0].strip()
 
    if first_a.casefold() == first_b.casefold():
 
        return min(1.0, 0.2 * len(first_a))
 
    else:
 
        return 0.0
 

	
 

	
 
def payee_match(a: str, b: str) -> float:
 
    """Score a match between two payees."""
 
    fuzzy_match = float(fuzz.token_set_ratio(a, b) / 100.00)
 
    first_word_match = first_word_exact_match(a, b)
 
    return max(fuzzy_match, first_word_match)
 

	
 

	
 
def records_match(r1: Dict, r2: Dict) -> Tuple[float, List[str]]:
 
    """Do these records represent the same transaction?"""
 
    date_score = date_proximity(r1['date'], r2['date'])
 
    if r1['date'] == r2['date']:
 
        date_message = ''
 
    elif date_score > 0.0:
 
        diff = abs((r1['date'] - r2['date']).days)
 
        date_message = f'+/- {diff} days'
 
    else:
 
        date_message = 'date mismatch'
 

	
 
    if r1['amount'] == r2['amount']:
 
        amount_score, amount_message = 2.0, ''
 
    else:
 
        amount_score, amount_message = 0.0, 'amount mismatch'
 

	
 
    # We never consider payee if there's a check_id in the books.
 
    check_message = ''
 
    payee_message = ''
 
    # Sometimes we get unrelated numbers in the statement column with check-ids,
 
    # so we can't match based on the existence of a statement check-id.
 
    if r2['check_id']:
 
        payee_score = 0.0
 
        if r1['check_id'] and r2['check_id'] and r1['check_id'] == r2['check_id']:
 
            check_score = 1.0
 
        else:
 
            check_message = 'check-id mismatch'
 
            check_score = 0.0
 
    else:
 
        check_score = 0.0
 
        payee_score = payee_match(r1['payee'], r2['payee'])
 
        if payee_score > FULL_MATCH_THRESHOLD:
 
            payee_message = ''
 
        elif payee_score > PARTIAL_MATCH_THRESHOLD:
 
            payee_message = 'partial payee match'
...
 
@@ -535,203 +549,207 @@ def write_metadata_to_books(metadata_to_apply: List[Tuple[str, int, str]]) -> No
 
        if filename not in file_contents:
 
            with open(filename, 'r') as f:
 
                file_contents[filename] = f.readlines()
 
        # Insert is inefficient, but fast enough for now in practise.
 
        file_contents[filename].insert(line + file_offsets[filename], metadata.rstrip() + '\n')
 
        file_offsets[filename] += 1
 
    # Writes each updated file back to disk.
 
    for filename, contents in file_contents.items():
 
        with open(filename, 'w') as f:
 
            f.writelines(contents)
 
            print(f'Wrote {filename}.')
 

	
 

	
 
def get_repo_relative_path(path: str) -> str:
 
    """Chop off the unique per-person CONSERVANCY_REPOSITORY.
 

	
 
    CSV and PDF statement metadata should be relative to
 
    CONSERVANCY_REPOSITORY ie. without regards to exactly where on
 
    your computer all the files live.
 

	
 
    """
 
    return os.path.relpath(path, start=os.getenv('CONSERVANCY_REPOSITORY'))
 

	
 

	
 
def parse_path(path: str) -> str:
 
    """Validate that a file exists for use in argparse."""
 
    if not os.path.exists(path):
 
        raise argparse.ArgumentTypeError(f'File {path} does not exist.')
 
    return path
 

	
 

	
 
def parse_repo_relative_path(path: str) -> str:
 
    """Validate that a file exists and is within $CONSERVANCY_REPOSITORY.
 

	
 
    For use with argparse.
 

	
 
    """
 
    if not os.path.exists(path):
 
        raise argparse.ArgumentTypeError(f'File {path} does not exist.')
 
    repo = os.getenv('CONSERVANCY_REPOSITORY')
 
    if not repo:
 
        raise argparse.ArgumentTypeError('$CONSERVANCY_REPOSITORY is not set.')
 
    if not path.startswith(repo):
 
        raise argparse.ArgumentTypeError(f'File {path} does not share a common prefix with $CONSERVANCY_REPOSITORY {repo}.')
 
    return path
 

	
 

	
 
def parse_decimal_with_separator(number_text: str) -> decimal.Decimal:
 
    """decimal.Decimal can't parse numbers with thousands separator."""
 
    number_text = number_text.replace(',', '')
 
    return decimal.Decimal(number_text)
 

	
 

	
 
def parse_arguments(argv: List[str]) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME, description='Reconciliation helper')
 
    cliutil.add_version_argument(parser)
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument('--beancount-file', required=True, type=parse_path)
 
    parser.add_argument('--csv-statement', required=True, type=parse_repo_relative_path)
 
    parser.add_argument('--bank-statement', required=True, type=parse_repo_relative_path)
 
    parser.add_argument('--account', required=True, help='eg. Liabilities:CreditCard:AMEX')
 
    # parser.add_argument('--report-group-regex')
 
    parser.add_argument('--show-reconciled-matches', action='store_true')
 
    parser.add_argument('--non-interactive', action='store_true', help="Don't prompt to write to the books")    # parser.add_argument('--statement-balance', type=parse_decimal_with_separator, required=True, help="A.K.A \"cleared balance\" taken from the end of the period on the PDF statement. Required because CSV statements don't include final or running totals")
 
    parser.add_argument('--full-months', action='store_true', help='Match payments over the full month, rather that just between the beginning and end dates of the CSV statement')
 
    args = parser.parse_args(args=argv)
 
    return args
 

	
 

	
 
def totals(matches: List[Tuple[List, List, List]]) -> Tuple[decimal.Decimal, decimal.Decimal, decimal.Decimal]:
 
    """Calculate the totals of transactions matched/not-matched."""
 
    total_matched = decimal.Decimal(0)
 
    total_missing_from_books = decimal.Decimal(0)
 
    total_missing_from_statement = decimal.Decimal(0)
 
    for statement_entries, books_entries, _ in matches:
 
        if statement_entries and books_entries:
 
            total_matched += sum(c['amount'] for c in statement_entries)
 
        elif statement_entries:
 
            total_missing_from_books += sum(c['amount'] for c in statement_entries)
 
        else:
 
            total_missing_from_statement += sum(c['amount'] for c in books_entries)
 
    return total_matched, total_missing_from_books, total_missing_from_statement
 

	
 

	
 
def process_unmatched(statement_trans: List[dict], books_trans: List[dict]) -> List[Tuple[List, List, List]]:
 
    """Format the remaining unmatched transactions to be added to one single list of matches."""
 
    matches: List[Tuple[List, List, List]] = []
 
    for r1 in statement_trans:
 
        matches.append(([r1], [], ['no match']))
 
    for r2 in books_trans:
 
        matches.append(([], [r2], ['no match']))
 
    return matches
 

	
 

	
 
def format_output(matches, begin_date, end_date, csv_statement, show_reconciled_matches) -> str:
 
    with io.StringIO() as out:
 
        match_output = format_matches(matches, csv_statement, show_reconciled_matches)
 
        _, total_missing_from_books, total_missing_from_statement = totals(matches)
 
        print('-' * 155, file=out)
 
        statement_heading = f'Statement transactions {begin_date} to {end_date}'
 
        print(f'{statement_heading:<52}            {"Books transactions":<58}   Notes', file=out)
 
        print('-' * 155, file=out)
 
        for _, output in sorted(match_output, key=lambda x: x[0]):
 
            print(output, file=out)
 
        print('-' * 155, file=out)
 
        print(f'Sub-total not on statement: {total_missing_from_statement:12,.2f}', file=out)
 
        print(f'Sub-total not in books:     {total_missing_from_books:12,.2f}', file=out)
 
        print(f'Total:                      {total_missing_from_statement + total_missing_from_books:12,.2f}', file=out)
 
        print('-' * 155, file=out)
 
        return out.getvalue()
 

	
 

	
 
def main(arglist: Optional[Sequence[str]] = None,
 
         stdout: TextIO = sys.stdout,
 
         stderr: TextIO = sys.stderr,
 
         config: Optional[configmod.Config] = None,
 
         ) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    # Validate and normalise the statement into our standard
 
    # transaction data structure.
 
    if 'AMEX' in args.account:
 
        validate_csv = validate_amex_csv
 
        read_csv = read_amex_csv
 
    else:
 
        validate_csv = validate_fr_csv
 
        read_csv = read_fr_csv
 

	
 
    with open(args.csv_statement) as f:
 
        sample = f.read(200)
 
        # Validate should return true/false and a message.
 
        validate_csv(sample)
 
        f.seek(0)
 
        # TODO: Needs a custom read_transactions_from_csv for each of AMEX and
 
        # FR since AMEX has a header row and FR doesn't.
 
        statement_trans = read_csv(f)
 

	
 
    # Dates are taken from the beginning/end of the statement.
 
    begin_date = statement_trans[0]['date']
 
    end_date = statement_trans[-1]['date']
 

	
 
    if args.full_months:
 
        begin_date, end_date = round_to_month(begin_date, end_date)
 

	
 
    # Query for the Beancount books data for this above period.
 
    #
 
    # There are pros and cons for using Beancount's in-memory entries
 
    # list directly and also for using Beancount Query Language (BQL)
 
    # to get a list of transactions? Using BQL because it's
 
    # convenient, but we don't have access to the full transaction
 
    # entry objects. Feels a bit strange that these approaches are so
 
    # disconnected.
 
    #
 
    # beancount.query.query_compile.compile() and
 
    # beancount.query.query_execute.filter_entries() look useful in this respect,
 
    # but I'm not clear on how to use compile(). An example would help.
 
    entries, _, options = loader.load_file(args.beancount_file)
 
    # String concatenation looks bad, but there's no SQL injection possible here
 
    # because BQL can't write back to the Beancount files. I hope!
 
    query = f"""
 
        SELECT filename,
 
        META("lineno") AS line,
 
        META("bank-statement") AS bank_statement,
 
        date,
 
        number(cost(position)),
 
        payee,
 
        ENTRY_META("entity") as entity,
 
        ANY_META("check-id") as check_id,
 
        narration
 
        WHERE account = "{args.account}"
 
            AND date >= {begin_date}
 
            AND date <= {end_date}"""
 
    _, result_rows = run_query(entries, options, query)
 
    books_trans = sort_records([standardize_beancount_record(row) for row in result_rows])
 

	
 
    # Apply two passes of matching, one for standard matches and one
 
    # for subset matches.
 
    matches, remaining_statement_trans, remaining_books_trans = match_statement_and_books(statement_trans, books_trans)
 
    subset_matches, remaining_statement_trans, remaining_books_trans = subset_match(
 
        remaining_statement_trans, remaining_books_trans)
 
    matches.extend(subset_matches)
 

	
 
    # Add the remaining unmatched to make one big list of matches, successful or not.
 
    unmatched = process_unmatched(remaining_statement_trans, remaining_books_trans)
 
    matches.extend(unmatched)
 

	
 
    # Print out results of our matching.
 
    print(format_output(matches, begin_date, end_date, args.csv_statement, args.show_reconciled_matches))
 

	
 
    # Write statement metadata back to the books.
 
    metadata_to_apply = []
 
    for match in matches:
 
        metadata_to_apply.extend(metadata_for_match(match, args.bank_statement, args.csv_statement))
 
    if metadata_to_apply and not args.non_interactive:
 
        print('Mark matched transactions as reconciled in the books? (y/N) ', end='')
 
        if input().lower() == 'y':
 
            write_metadata_to_books(metadata_to_apply)
 

	
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
tests/test_reconcile.py
Show inline comments
 
import datetime
 
import decimal
 
import io
 
import os
 
import tempfile
 
import textwrap
 

	
 
import pytest
 

	
 
from conservancy_beancount.reconcile.statement_reconciler import (
 
    date_proximity,
 
    format_output,
 
    match_statement_and_books,
 
    metadata_for_match,
 
    payee_match,
 
    read_amex_csv,
 
    read_fr_csv,
 
    remove_duplicate_words,
 
    remove_payee_junk,
 
    round_to_month,
 
    subset_match,
 
    totals,
 
    write_metadata_to_books,
 
)
 

	
 
# These data structures represent individual transactions as taken from the
 
# statement ("S") or the books ("B").
 

	
 
# Statement transaction examples.
 
S1 = {
 
    'date': datetime.date(2022, 1, 1),
 
    'amount': decimal.Decimal('10.00'),
 
    'payee': 'Patreon         / Patreon   / 123456/ ST-A1B2C3D4G5H6       /',
 
    'check_id': '',
 
    'line': 222,
 
}
 
S2 = {
 
    'date': datetime.date(2022, 1, 2),
 
    'amount': decimal.Decimal('20.00'),
 
    'payee': 'BT*LINODE           PHILADELPHIA        P',
 
    'check_id': '',
 
    'line': 333,
 
}
 
S3 = {
 
    'date': datetime.date(2022, 1, 3),
 
    'amount': decimal.Decimal('30.00'),
 
    'payee': 'USPS PO 4067540039 0PORTLAND            OR',
 
    'check_id': '',
 
    'line': 444,
 
}
 
S4 = {
 
    'date': datetime.date(2022, 8, 11),
 
    'amount': decimal.Decimal('-2260.00'),
 
    'payee': 'Trust 0000000362 210',
 
    'check_id': '',
 
    'line': 555,
 
}
 

	
 
# Books transaction examples.
 
B1 = {
 
    'date': datetime.date(2022, 1, 1),
 
    'amount': decimal.Decimal('10.00'),
 
    'payee': 'Patreon',
 
    'check_id': '',
 
    'filename': '2022/imports.beancount',
 
    'line': 777,
 
    'bank_statement': '',
 
}
 
B2 = {
 
    'date': datetime.date(2022, 1, 2),
 
    'amount': decimal.Decimal('20.00'),
 
    'payee': 'Linode',
 
    'check_id': '',
 
    'filename': '2022/main.beancount',
 
    'line': 888,
 
    'bank_statement': "Financial/Bank-Statements/AMEX/2022-01-12_AMEX_statement.pdf"
 
}
 
B3_next_day = {
 
    'date': datetime.date(2022, 1, 4),
 
    'amount': decimal.Decimal('30.00'),
 
    'payee': 'USPS',
 
    'check_id': '',
 
    'filename': '2022/main.beancount',
 
    'line': 999,
...
 
@@ -327,64 +330,81 @@ def test_subset_sum_match():
 
        [],  # No remaining books trans.
 
    )
 

	
 

	
 
def test_subset_passes_through_all_non_matches():
 
    """This was used to locate a bug where some of the non-matches had
 
    gone missing due to mutation of books_trans."""
 
    statement_trans = [
 
        S1,  # No match
 
        S4,  # Match
 
    ]
 
    books_trans = [
 
        B2,  # No match
 
        B4A, B4B, B4C,  # Match
 
        B3_next_day, B3_next_week,  # No match
 
    ]
 
    assert subset_match(statement_trans, books_trans) == (
 
        [([S4], [B4A, B4B, B4C], [])],  # Matched
 
        [S1],  # No match: preserved intact
 
        [B2, B3_next_day, B3_next_week]  # No match: preserved intact
 
    )
 

	
 

	
 
def test_handles_amex_csv():
 
    CSV = """Date,Receipt,Description,Card Member,Account #,Amount,Extended Details,Appears On Your Statement As,Address,City/State,Zip Code,Country,Reference,Category\n08/19/2021,,Gandi.net           San Francisco,RODNEY R BROWN,-99999,28.15,"00000009999 00000009999999999999\nGandi.net\nSan Francisco\n00000009999999999999",Gandi.net           San Francisco,"NEPTUNUSSTRAAT 41-63\nHOOFDDORP",,2132 JA,NETHERLANDS (THE),'999999999999999999',Merchandise & Supplies-Internet Purchase\n"""
 
    expected = [
 
        {
 
            'date': datetime.date(2021, 8, 19),
 
            'amount': decimal.Decimal('-28.15'),
 
            'payee': 'Gandi San Francisco',
 
            'check_id': '',
 
            'line': 2,
 
        },
 
    ]
 
    assert read_amex_csv(io.StringIO(CSV)) == expected
 

	
 

	
 
def test_handles_fr_csv():
 
    CSV = """"DD99999999999","03/31/2022","LAST STATEMENT","","","$1,000.00"\n"9999999999999","04/01/2022","INCOMING WIRE","GONDOR S.S. A111111111BCDE0F","$6.50","$1,006.50"\n"DD99999999999","04/18/2022","CHECK  3741","","$-4.50","$1,002.00"\n"DD99999999999","04/30/2022","THIS STATEMENT","","","$102.00"\n"""
 
    expected = [
 
        {
 
            'date': datetime.date(2022, 4, 1),
 
            'amount': decimal.Decimal('6.50'),
 
            'payee': 'GONDOR S.S. A1111111',
 
            'check_id': '',
 
            'line': 2,
 
        },
 
        {
 
            'date': datetime.date(2022, 4, 18),
 
            'amount': decimal.Decimal('-4.50'),
 
            'payee': '',
 
            'check_id': '3741',
 
            'line': 3,
 
        },
 
    ]
 
    assert read_fr_csv(io.StringIO(CSV)) == expected
 

	
 

	
 
def test_format_output():
 
    statement = [S1]
 
    books = [B1]
 
    matches, _, _ = match_statement_and_books(statement, books)
 
    output = format_output(matches, datetime.date(2022, 1, 1), datetime.date(2022, 2, 1), 'test.csv', True)
 
    assert '2022-01-01:       10.00 Patreon         / Patreon   / 12345  →  2022-01-01:       10.00 Patreon                              ✓ Matched' in output
 

	
 

	
 
month_test_data = [
 
    ((datetime.date(2022, 1, 2), datetime.date(2022, 1, 30)),
 
     (datetime.date(2022, 1, 1), datetime.date(2022, 1, 31))),
 
    ((datetime.date(2022, 4, 2), datetime.date(2022, 4, 29)),
 
     (datetime.date(2022, 4, 1), datetime.date(2022, 4, 30))),
 
    ((datetime.date(2022, 2, 2), datetime.date(2022, 2, 27)),
 
     (datetime.date(2022, 2, 1), datetime.date(2022, 2, 28))),
 
    ((datetime.date(2024, 2, 2), datetime.date(2024, 2, 27)),
 
     (datetime.date(2024, 2, 1), datetime.date(2024, 2, 29))),
 
]
 

	
 

	
 
@pytest.mark.parametrize('input_dates,rounded_dates', month_test_data)
 
def test_rounds_to_full_month(input_dates, rounded_dates):
 
    assert round_to_month(*input_dates) == rounded_dates
0 comments (0 inline, 0 general)