Changeset - 6b1ce7d73a1a
[Not reviewed]
0 1 0
Ben Sturmfels (bsturmfels) - 3 years ago 2022-02-09 01:58:11
ben@sturm.com.au
reconcile: Fix edge case in helper when only one unreconciled transaction.
1 file changed with 3 insertions and 2 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reconcile/helper.py
Show inline comments
 
"""Tool to help identify unreconciled postings.
 

	
 
Run like this:
 

	
 
  python conservancy_beancount/reconcile/helper.py \
 
    --beancount-file=$HOME/conservancy/beancount/books/2021.beancount \
 
    --prev-end-date=2021-05-13 --cur-end-date=2021-12-31 \
 
    --account="Liabilities:CreditCard:AMEX"
 

	
 
In the spirit of bc-reconcile-helper.plx (the original Perl code)
 

	
 
Not implemented:
 
 - --report-group-regex
 
 - git branch selection from bean-query-goofy-daemon.plx
 

	
 
"""
 
import argparse
 
from dateutil.relativedelta import relativedelta
 
import datetime
 
import decimal
 
import io
 
import tempfile
 
import textwrap
 
import typing
 
from typing import List
 
import os
 

	
 
from beancount import loader
 
from beancount.query.query import run_query
 

	
 

	
 
def end_of_month(date: datetime.date) -> datetime.date:
 
    """Given a date, return the last day of the month."""
 
    # Using 'day' replaces, rather than adds.
 
    return date + relativedelta(day=31)
 

	
 

	
 
def format_record_for_grep(row: typing.List, homedir: str) -> typing.List:
 
    """Return a line in a grep-style.
 

	
 
    This is so the line can be fed into Emacs grep-mode for quickly jumping to
 
    the relevant lines in the books.
 
    """
 
    file = row[0].replace(homedir, '~')
 
    return [f'{file}:{row[1]}:'] + row[2:]
 

	
 

	
 
def max_column_widths(rows: List) -> List[int]:
 
    """Return the max width for each column in a table of data."""
 
    if not rows:
 
        return []
 
    else:
 
        maxes = [0] * len(rows[0])
 
        for row in rows:
 
            for i, val in enumerate(row):
 
                length = len(str(val))
 
                maxes[i] = max(maxes[i], length)
 
        return maxes
 

	
 

	
 
def tabulate(rows: List, headers: List=None) -> str:
 
    """Format a table of data as a string.
 

	
 
    Implemented here to avoid adding dependency on "tabulate" package.
 
    """
 
    output = io.StringIO()
 
    if headers:
 
        rows = [headers] + rows
 
    widths = max_column_widths(rows)
 
    for row in rows:
 
        for i, col in enumerate(row):
 
            width = widths[i]
 
            if col is None:
 
                print(' ' * width, end=' ', file=output)
 
            elif isinstance(col, str):
 
                print((str(col)).ljust(width), end=' ', file=output)
 
            else:
 
                print((str(col)).rjust(width), end=' ', file=output)
 
        print('', file=output)
 
    return output.getvalue().strip()
 

	
 

	
 
# Parse all the arguments
 
parser = argparse.ArgumentParser(description='Reconciliation helper')
 
parser.add_argument('--beancount-file', required=True)
 
parser.add_argument('--account', help='Full account name, e.g. "Liabilities:CreditCard:AMEX"', required=True)
 
parser.add_argument('--prev-end-date', type=datetime.date.fromisoformat)
 
parser.add_argument('--cur-end-date', type=datetime.date.fromisoformat)
 
parser.add_argument('--month', help='YYYY-MM of ending month. Use with --period.')
 
parser.add_argument('--period', help='Months in the past to consider. Use with --month.', type=int, choices=[1, 3, 12])
 
parser.add_argument('--statement-match')
 
parser.add_argument('--cost-function', default='COST')
 
parser.add_argument('--grep-output-filename')
 
# parser.add_argument('--report-group-regex')
 
args = parser.parse_args()
 
if args.month or args.period:
 
    if not (args.month and args.period):
 
        parser.error('--month and --period must be used together')
 
    parsed_date = datetime.datetime.strptime(args.month, '%Y-%m').date()
 
    preDate = end_of_month(parsed_date - relativedelta(months=args.period)).isoformat()
 
    lastDateInPeriod = end_of_month(parsed_date).isoformat()
 
    month = args.month
 
else:
 
    if not (args.cur_end_date and args.prev_end_date):
 
        parser.error(' --prev-end-date and --cur-end-date must be used together')
 
    preDate = args.prev_end_date
 
    lastDateInPeriod = args.cur_end_date.isoformat()
 
    month = args.cur_end_date.strftime('%Y-%m')
 
grep_output_file: typing.IO
 
if args.grep_output_filename:
 
    grep_output_file = open(args.grep_output_filename, 'w')
 
else:
 
    grep_output_file = tempfile.NamedTemporaryFile(prefix='bc-reconcile-grep-output_', mode='w', delete=False)
 
beancount_file = args.beancount_file
 
account = args.account
 
cost_function = args.cost_function
 
statement_match = args.statement_match if args.statement_match else month
 

	
 
QUERIES = {
 
    f"00: CLEARED BAL ENDING DAY BEFORE {preDate}":
 
    # $CONLEDGER -V -C -e "$preDate" bal "/$acct/"
 
    f"""SELECT sum({cost_function}(position)) AS aa WHERE account = "{account}"
 
        AND date < {preDate} AND META('bank-statement') != NULL""",
 

	
 
    f"01: ALL TRANSACTION BAL ENDING DAY BEFORE {preDate}":
 
    # $CONLEDGER -V -e "$preDate" bal "/$acct/"
 
    f"""SELECT sum({cost_function}(position)) AS aa WHERE account = "{account}"
 
        AND date < {preDate}""",
 

	
 
    f"02: ALL TRANSACTION BAL, ending {lastDateInPeriod}":
 
    # $CONLEDGER -V -e "$date" bal "/$acct/"
 
    f"""SELECT sum({cost_function}(position)) AS aa WHERE account = "{account}"
 
    AND date <= {lastDateInPeriod}""",
 

	
 
    f"03: UNCLEARED TRANSACTIONS, ending {lastDateInPeriod}":
 
    f"""SELECT date, {cost_function}(position) as amt, ANY_META('check-id') as chknum, narration, payee, ENTRY_META('code') as code
 
    WHERE account = "{account}"
 
    AND date <= {lastDateInPeriod} AND META('bank-statement') = NULL
 
    ORDER BY date, payee, narration""",
 

	
 
    "04: UNCLEARED TRANSACTION FILE, SUITABLE FOR GREP":
 
    # $CONLEDGER -w -F "%(filename):%(beg_line): %(date) %(code) %(payee) %(amount)\n" --sort d -U -e "$date" reg "/$acct/" > "$TMPDIR/unreconciled-lines"
 
    f"""SELECT  ENTRY_META('filename') as file, META('lineno') as line, date,
 
    {cost_function}(position) as amt, ANY_META('check-id') as chknum, narration, payee, ANY_META("entity") as entity, ENTRY_META('code') as c
 
    WHERE account = "{account}"
 
    AND date <= {lastDateInPeriod} AND META('bank-statement') = NULL
 
    ORDER BY date, payee, narration""",
 

	
 
    f"05: CLEARED BALANCE ending {lastDateInPeriod}":
 
    # $CONLEDGER -V -C -e "$date" bal "/$acct/"
 
    f"""SELECT sum({cost_function}(position)) AS aa WHERE account = "{account}"
 
    AND date <= {lastDateInPeriod} AND META('bank-statement') != NULL""",
 

	
 
    f"06: CLEARED SUBTRACTIONS on {month}'s statement":
 
    # $CONLEDGER -V -C --limit "a > 0 and tag(\"Statement\") =~ /$statementSearchString/" bal "/$acct/"
 
    f"""SELECT  sum(number({cost_function}(position))) AS aa
 
    WHERE account = "{account}"
 
    and META("bank-statement") ~  "{statement_match}" and number({cost_function}(position)) < 0""",
 

	
 
    f"07: CLEARED ADDITIONS on {month}'s statement":
 
    # $CONLEDGER -V -C --limit "a < 0 and tag(\"Statement\") =~ /$statementSearchString/" bal "/$acct/"
 
    f"""SELECT  sum(number({cost_function}(position))) AS aa
 
    WHERE account = "{account}"
 
    and META("bank-statement") ~  "{statement_match}" and number({cost_function}(position)) > 0""",
 
}
 

	
 
# Run Beancount queries.
 
print(f"START RECONCILIATION FOR {account} ENDING {lastDateInPeriod} (previous end date {preDate})")
 
entries, _, options = loader.load_file(beancount_file)
 
for desc, query in QUERIES.items():
 
    rtypes, rrows = run_query(entries, options, query, numberify=True)
 
    if not rrows:
 
        print(f'{desc:<55} {"N/A":>11}')
 
    elif desc.startswith('04'):
 
        homedir = os.getenv('HOME', '')
 
        print(f'{desc}\n   See {grep_output_file.name}')
 
        grep_rows = [format_record_for_grep(row, homedir) for row in rrows]
 
        print(tabulate(grep_rows), file=grep_output_file)
 
    elif len(rrows) == 1:
 
    elif len(rrows) == 1 and isinstance(rrows[0][0], decimal.Decimal):
 
        result = rrows[0][0]
 
        print(f'{desc:<55} {result:>11,.2f}')
 
        print(f'{desc:<55} {result:11,.2f}')
 
    else:
 
        headers = [c[0].capitalize() for c in rtypes]
 
        print(desc)
 
        print(textwrap.indent(tabulate(rrows, headers=headers), '    '))
0 comments (0 inline, 0 general)