Changeset - 8b08997fda07
[Not reviewed]
0 2 0
Ben Sturmfels (bsturmfels) - 16 months ago 2023-02-11 05:00:21
ben@sturm.com.au
reconciler: Add --full-months option to round statement dates to month boundaries
2 files changed with 38 insertions and 0 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reconcile/statement_reconciler.py
Show inline comments
...
 
@@ -250,96 +250,110 @@ def standardize_fr_record(line, row):
 
    return record
 

	
 

	
 
def read_fr_csv(f: TextIO) -> list:
 
    reader = csv.reader(f)
 
    # The reader.line_num is the source line number, not the spreadsheet row
 
    # number due to multi-line records.
 
    return sort_records(
 
        standardize_fr_record(i, row) for i, row in enumerate(reader, 1)
 
        if len(row) == 6 and row[2] not in {'LAST STATEMENT', 'THIS STATEMENT'}
 
    )
 

	
 

	
 
def standardize_beancount_record(row) -> Dict:  # type: ignore[no-untyped-def]
 
    """Turn a Beancount query result row into a standard dict representing a transaction."""
 
    return {
 
        'date': row.date,
 
        'amount': row.number_cost_position,
 
        'payee': remove_payee_junk(f'{row.payee or ""} {row.entity or ""} {row.narration or ""}'),
 
        'check_id': str(row.check_id or ''),
 
        'filename': row.filename,
 
        'line': row.line,
 
        'bank_statement': row.bank_statement,
 
    }
 

	
 

	
 
def format_record(record: dict) -> str:
 
    """Generate output lines for a standard 1:1 match."""
 
    if record['payee'] and record['check_id']:
 
        output = f"{record['date'].isoformat()}: {record['amount']:11,.2f} {record['payee'][:25]} #{record['check_id']}".ljust(59)
 
    elif record['payee']:
 
        output = f"{record['date'].isoformat()}: {record['amount']:11,.2f} {record['payee'][:35]}".ljust(59)
 
    else:
 
        output = f"{record['date'].isoformat()}: {record['amount']:11,.2f} #{record['check_id']}".ljust(59)
 
    return output
 

	
 

	
 
def format_multirecord(r1s: List[dict], r2s: List[dict], note: str) -> List[list]:
 
    """Generates output lines for one statement:multiple books transaction match."""
 
    assert len(r1s) == 1
 
    assert len(r2s) > 1
 
    match_output = []
 
    match_output.append([r1s[0]['date'], f'{format_record(r1s[0])}  →  {format_record(r2s[0])}  ✓ Matched{note}'])
 
    for r2 in r2s[1:]:
 
        match_output.append([r1s[0]['date'], f'{r1s[0]["date"].isoformat()}:             ↳                                    →  {format_record(r2)}  ✓ Matched{note}'])
 
    return match_output
 

	
 

	
 
def _start_of_month(time, offset_months=0):
 
    if offset_months > 0:
 
        return _start_of_month(time.replace(day=28) + datetime.timedelta(days=4), offset_months - 1)
 
    else:
 
        return time.replace(day=1)
 

	
 

	
 
def round_to_month(begin_date, end_date):
 
    """Round a beginning and end date to beginning and end of months respectively."""
 
    return (
 
        _start_of_month(begin_date),
 
        _start_of_month(end_date, offset_months=1) - datetime.timedelta(days=1))
 

	
 

	
 
def sort_records(records: List) -> List:
 
    return sorted(records, key=lambda x: (x['date'], x['amount']))
 

	
 

	
 
def first_word_exact_match(a: str, b: str) -> float:
 
    """Score a payee match based first word.
 

	
 
    We get a whole lot of good matches this way. Helps in the
 
    situation where the first word or two of a transaction description
 
    is useful and the rest is garbage.
 

	
 
    """
 
    if len(a) == 0 or len(b) == 0:
 
        return 0.0
 
    first_a = a.split()[0].strip()
 
    first_b = b.split()[0].strip()
 
    if first_a.casefold() == first_b.casefold():
 
        return min(1.0, 0.2 * len(first_a))
 
    else:
 
        return 0.0
 

	
 

	
 
def payee_match(a: str, b: str) -> float:
 
    """Score a match between two payees."""
 
    fuzzy_match = float(fuzz.token_set_ratio(a, b) / 100.00)
 
    first_word_match = first_word_exact_match(a, b)
 
    return max(fuzzy_match, first_word_match)
 

	
 

	
 
def records_match(r1: Dict, r2: Dict) -> Tuple[float, List[str]]:
 
    """Do these records represent the same transaction?"""
 
    date_score = date_proximity(r1['date'], r2['date'])
 
    if r1['date'] == r2['date']:
 
        date_message = ''
 
    elif date_score > 0.0:
 
        diff = abs((r1['date'] - r2['date']).days)
 
        date_message = f'+/- {diff} days'
 
    else:
 
        date_message = 'date mismatch'
 

	
 
    if r1['amount'] == r2['amount']:
 
        amount_score, amount_message = 2.0, ''
 
    else:
 
        amount_score, amount_message = 0.0, 'amount mismatch'
 

	
 
    # We never consider payee if there's a check_id in the books.
 
    check_message = ''
 
    payee_message = ''
...
 
@@ -551,176 +565,180 @@ def get_repo_relative_path(path: str) -> str:
 
    CSV and PDF statement metadata should be relative to
 
    CONSERVANCY_REPOSITORY ie. without regards to exactly where on
 
    your computer all the files live.
 

	
 
    """
 
    return os.path.relpath(path, start=os.getenv('CONSERVANCY_REPOSITORY'))
 

	
 

	
 
def parse_path(path: str) -> str:
 
    """Validate that a file exists for use in argparse."""
 
    if not os.path.exists(path):
 
        raise argparse.ArgumentTypeError(f'File {path} does not exist.')
 
    return path
 

	
 

	
 
def parse_repo_relative_path(path: str) -> str:
 
    """Validate that a file exists and is within $CONSERVANCY_REPOSITORY.
 

	
 
    For use with argparse.
 

	
 
    """
 
    if not os.path.exists(path):
 
        raise argparse.ArgumentTypeError(f'File {path} does not exist.')
 
    repo = os.getenv('CONSERVANCY_REPOSITORY')
 
    if not repo:
 
        raise argparse.ArgumentTypeError('$CONSERVANCY_REPOSITORY is not set.')
 
    if not path.startswith(repo):
 
        raise argparse.ArgumentTypeError(f'File {path} does not share a common prefix with $CONSERVANCY_REPOSITORY {repo}.')
 
    return path
 

	
 

	
 
def parse_decimal_with_separator(number_text: str) -> decimal.Decimal:
 
    """decimal.Decimal can't parse numbers with thousands separator."""
 
    number_text = number_text.replace(',', '')
 
    return decimal.Decimal(number_text)
 

	
 

	
 
def parse_arguments(argv: List[str]) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME, description='Reconciliation helper')
 
    cliutil.add_version_argument(parser)
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument('--beancount-file', required=True, type=parse_path)
 
    parser.add_argument('--csv-statement', required=True, type=parse_repo_relative_path)
 
    parser.add_argument('--bank-statement', required=True, type=parse_repo_relative_path)
 
    parser.add_argument('--account', required=True, help='eg. Liabilities:CreditCard:AMEX')
 
    # parser.add_argument('--report-group-regex')
 
    parser.add_argument('--show-reconciled-matches', action='store_true')
 
    parser.add_argument('--non-interactive', action='store_true', help="Don't prompt to write to the books")    # parser.add_argument('--statement-balance', type=parse_decimal_with_separator, required=True, help="A.K.A \"cleared balance\" taken from the end of the period on the PDF statement. Required because CSV statements don't include final or running totals")
 
    parser.add_argument('--full-months', action='store_true', help='Match payments over the full month, rather that just between the beginning and end dates of the CSV statement')
 
    args = parser.parse_args(args=argv)
 
    return args
 

	
 

	
 
def totals(matches: List[Tuple[List, List, List]]) -> Tuple[decimal.Decimal, decimal.Decimal, decimal.Decimal]:
 
    """Calculate the totals of transactions matched/not-matched."""
 
    total_matched = decimal.Decimal(0)
 
    total_missing_from_books = decimal.Decimal(0)
 
    total_missing_from_statement = decimal.Decimal(0)
 
    for statement_entries, books_entries, _ in matches:
 
        if statement_entries and books_entries:
 
            total_matched += sum(c['amount'] for c in statement_entries)
 
        elif statement_entries:
 
            total_missing_from_books += sum(c['amount'] for c in statement_entries)
 
        else:
 
            total_missing_from_statement += sum(c['amount'] for c in books_entries)
 
    return total_matched, total_missing_from_books, total_missing_from_statement
 

	
 

	
 
def process_unmatched(statement_trans: List[dict], books_trans: List[dict]) -> List[Tuple[List, List, List]]:
 
    """Format the remaining unmatched transactions to be added to one single list of matches."""
 
    matches: List[Tuple[List, List, List]] = []
 
    for r1 in statement_trans:
 
        matches.append(([r1], [], ['no match']))
 
    for r2 in books_trans:
 
        matches.append(([], [r2], ['no match']))
 
    return matches
 

	
 

	
 
def format_output(matches, begin_date, end_date, csv_statement, show_reconciled_matches) -> str:
 
    with io.StringIO() as out:
 
        match_output = format_matches(matches, csv_statement, show_reconciled_matches)
 
        _, total_missing_from_books, total_missing_from_statement = totals(matches)
 
        print('-' * 155, file=out)
 
        statement_heading = f'Statement transactions {begin_date} to {end_date}'
 
        print(f'{statement_heading:<52}            {"Books transactions":<58}   Notes', file=out)
 
        print('-' * 155, file=out)
 
        for _, output in sorted(match_output, key=lambda x: x[0]):
 
            print(output, file=out)
 
        print('-' * 155, file=out)
 
        print(f'Sub-total not on statement: {total_missing_from_statement:12,.2f}', file=out)
 
        print(f'Sub-total not in books:     {total_missing_from_books:12,.2f}', file=out)
 
        print(f'Total:                      {total_missing_from_statement + total_missing_from_books:12,.2f}', file=out)
 
        print('-' * 155, file=out)
 
        return out.getvalue()
 

	
 

	
 
def main(arglist: Optional[Sequence[str]] = None,
 
         stdout: TextIO = sys.stdout,
 
         stderr: TextIO = sys.stderr,
 
         config: Optional[configmod.Config] = None,
 
         ) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    # Validate and normalise the statement into our standard
 
    # transaction data structure.
 
    if 'AMEX' in args.account:
 
        validate_csv = validate_amex_csv
 
        read_csv = read_amex_csv
 
    else:
 
        validate_csv = validate_fr_csv
 
        read_csv = read_fr_csv
 

	
 
    with open(args.csv_statement) as f:
 
        sample = f.read(200)
 
        # Validate should return true/false and a message.
 
        validate_csv(sample)
 
        f.seek(0)
 
        # TODO: Needs a custom read_transactions_from_csv for each of AMEX and
 
        # FR since AMEX has a header row and FR doesn't.
 
        statement_trans = read_csv(f)
 

	
 
    # Dates are taken from the beginning/end of the statement.
 
    begin_date = statement_trans[0]['date']
 
    end_date = statement_trans[-1]['date']
 

	
 
    if args.full_months:
 
        begin_date, end_date = round_to_month(begin_date, end_date)
 

	
 
    # Query for the Beancount books data for this above period.
 
    #
 
    # There are pros and cons for using Beancount's in-memory entries
 
    # list directly and also for using Beancount Query Language (BQL)
 
    # to get a list of transactions? Using BQL because it's
 
    # convenient, but we don't have access to the full transaction
 
    # entry objects. Feels a bit strange that these approaches are so
 
    # disconnected.
 
    #
 
    # beancount.query.query_compile.compile() and
 
    # beancount.query.query_execute.filter_entries() look useful in this respect,
 
    # but I'm not clear on how to use compile(). An example would help.
 
    entries, _, options = loader.load_file(args.beancount_file)
 
    # String concatenation looks bad, but there's no SQL injection possible here
 
    # because BQL can't write back to the Beancount files. I hope!
 
    query = f"""
 
        SELECT filename,
 
        META("lineno") AS line,
 
        META("bank-statement") AS bank_statement,
 
        date,
 
        number(cost(position)),
 
        payee,
 
        ENTRY_META("entity") as entity,
 
        ANY_META("check-id") as check_id,
 
        narration
 
        WHERE account = "{args.account}"
 
            AND date >= {begin_date}
 
            AND date <= {end_date}"""
 
    _, result_rows = run_query(entries, options, query)
 
    books_trans = sort_records([standardize_beancount_record(row) for row in result_rows])
 

	
 
    # Apply two passes of matching, one for standard matches and one
 
    # for subset matches.
 
    matches, remaining_statement_trans, remaining_books_trans = match_statement_and_books(statement_trans, books_trans)
 
    subset_matches, remaining_statement_trans, remaining_books_trans = subset_match(
 
        remaining_statement_trans, remaining_books_trans)
 
    matches.extend(subset_matches)
 

	
 
    # Add the remaining unmatched to make one big list of matches, successful or not.
 
    unmatched = process_unmatched(remaining_statement_trans, remaining_books_trans)
 
    matches.extend(unmatched)
 

	
 
    # Print out results of our matching.
 
    print(format_output(matches, begin_date, end_date, args.csv_statement, args.show_reconciled_matches))
 

	
 
    # Write statement metadata back to the books.
 
    metadata_to_apply = []
 
    for match in matches:
tests/test_reconcile.py
Show inline comments
 
import datetime
 
import decimal
 
import io
 
import os
 
import tempfile
 
import textwrap
 

	
 
import pytest
 

	
 
from conservancy_beancount.reconcile.statement_reconciler import (
 
    date_proximity,
 
    format_output,
 
    match_statement_and_books,
 
    metadata_for_match,
 
    payee_match,
 
    read_amex_csv,
 
    read_fr_csv,
 
    remove_duplicate_words,
 
    remove_payee_junk,
 
    round_to_month,
 
    subset_match,
 
    totals,
 
    write_metadata_to_books,
 
)
 

	
 
# These data structures represent individual transactions as taken from the
 
# statement ("S") or the books ("B").
 

	
 
# Statement transaction examples.
 
S1 = {
 
    'date': datetime.date(2022, 1, 1),
 
    'amount': decimal.Decimal('10.00'),
 
    'payee': 'Patreon         / Patreon   / 123456/ ST-A1B2C3D4G5H6       /',
 
    'check_id': '',
 
    'line': 222,
 
}
 
S2 = {
 
    'date': datetime.date(2022, 1, 2),
 
    'amount': decimal.Decimal('20.00'),
 
    'payee': 'BT*LINODE           PHILADELPHIA        P',
 
    'check_id': '',
 
    'line': 333,
 
}
 
S3 = {
 
    'date': datetime.date(2022, 1, 3),
 
    'amount': decimal.Decimal('30.00'),
 
    'payee': 'USPS PO 4067540039 0PORTLAND            OR',
 
    'check_id': '',
 
    'line': 444,
 
}
 
S4 = {
 
    'date': datetime.date(2022, 8, 11),
 
    'amount': decimal.Decimal('-2260.00'),
 
    'payee': 'Trust 0000000362 210',
 
    'check_id': '',
 
    'line': 555,
 
}
 

	
 
# Books transaction examples.
 
B1 = {
 
    'date': datetime.date(2022, 1, 1),
 
    'amount': decimal.Decimal('10.00'),
 
    'payee': 'Patreon',
 
    'check_id': '',
 
    'filename': '2022/imports.beancount',
 
    'line': 777,
 
    'bank_statement': '',
 
}
...
 
@@ -343,48 +346,65 @@ def test_subset_passes_through_all_non_matches():
 
    assert subset_match(statement_trans, books_trans) == (
 
        [([S4], [B4A, B4B, B4C], [])],  # Matched
 
        [S1],  # No match: preserved intact
 
        [B2, B3_next_day, B3_next_week]  # No match: preserved intact
 
    )
 

	
 

	
 
def test_handles_amex_csv():
 
    CSV = """Date,Receipt,Description,Card Member,Account #,Amount,Extended Details,Appears On Your Statement As,Address,City/State,Zip Code,Country,Reference,Category\n08/19/2021,,Gandi.net           San Francisco,RODNEY R BROWN,-99999,28.15,"00000009999 00000009999999999999\nGandi.net\nSan Francisco\n00000009999999999999",Gandi.net           San Francisco,"NEPTUNUSSTRAAT 41-63\nHOOFDDORP",,2132 JA,NETHERLANDS (THE),'999999999999999999',Merchandise & Supplies-Internet Purchase\n"""
 
    expected = [
 
        {
 
            'date': datetime.date(2021, 8, 19),
 
            'amount': decimal.Decimal('-28.15'),
 
            'payee': 'Gandi San Francisco',
 
            'check_id': '',
 
            'line': 2,
 
        },
 
    ]
 
    assert read_amex_csv(io.StringIO(CSV)) == expected
 

	
 

	
 
def test_handles_fr_csv():
 
    CSV = """"DD99999999999","03/31/2022","LAST STATEMENT","","","$1,000.00"\n"9999999999999","04/01/2022","INCOMING WIRE","GONDOR S.S. A111111111BCDE0F","$6.50","$1,006.50"\n"DD99999999999","04/18/2022","CHECK  3741","","$-4.50","$1,002.00"\n"DD99999999999","04/30/2022","THIS STATEMENT","","","$102.00"\n"""
 
    expected = [
 
        {
 
            'date': datetime.date(2022, 4, 1),
 
            'amount': decimal.Decimal('6.50'),
 
            'payee': 'GONDOR S.S. A1111111',
 
            'check_id': '',
 
            'line': 2,
 
        },
 
        {
 
            'date': datetime.date(2022, 4, 18),
 
            'amount': decimal.Decimal('-4.50'),
 
            'payee': '',
 
            'check_id': '3741',
 
            'line': 3,
 
        },
 
    ]
 
    assert read_fr_csv(io.StringIO(CSV)) == expected
 

	
 

	
 
def test_format_output():
 
    statement = [S1]
 
    books = [B1]
 
    matches, _, _ = match_statement_and_books(statement, books)
 
    output = format_output(matches, datetime.date(2022, 1, 1), datetime.date(2022, 2, 1), 'test.csv', True)
 
    assert '2022-01-01:       10.00 Patreon         / Patreon   / 12345  →  2022-01-01:       10.00 Patreon                              ✓ Matched' in output
 

	
 

	
 
month_test_data = [
 
    ((datetime.date(2022, 1, 2), datetime.date(2022, 1, 30)),
 
     (datetime.date(2022, 1, 1), datetime.date(2022, 1, 31))),
 
    ((datetime.date(2022, 4, 2), datetime.date(2022, 4, 29)),
 
     (datetime.date(2022, 4, 1), datetime.date(2022, 4, 30))),
 
    ((datetime.date(2022, 2, 2), datetime.date(2022, 2, 27)),
 
     (datetime.date(2022, 2, 1), datetime.date(2022, 2, 28))),
 
    ((datetime.date(2024, 2, 2), datetime.date(2024, 2, 27)),
 
     (datetime.date(2024, 2, 1), datetime.date(2024, 2, 29))),
 
]
 

	
 

	
 
@pytest.mark.parametrize('input_dates,rounded_dates', month_test_data)
 
def test_rounds_to_full_month(input_dates, rounded_dates):
 
    assert round_to_month(*input_dates) == rounded_dates
0 comments (0 inline, 0 general)