Changeset - 71bf8137a56f
[Not reviewed]
0 1 0
Ben Sturmfels (bsturmfels) - 20 months ago 2023-01-11 08:36:43
ben@sturm.com.au
reconcile.helper: Add entry_point to avoid traceback
1 file changed with 13 insertions and 3 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reconcile/helper.py
Show inline comments
...
 
@@ -21,227 +21,237 @@ import datetime
 
import decimal
 
import io
 
import sys
 
import tempfile
 
import textwrap
 
import typing
 
from typing import List
 
import os
 

	
 
from beancount import loader
 
from beancount.query.query import run_query
 

	
 

	
 
def end_of_month(date: datetime.date) -> datetime.date:
 
    """Given a date, return the last day of the month."""
 
    # Using 'day' replaces, rather than adds.
 
    return date + relativedelta(day=31)
 

	
 

	
 
def format_record_for_grep(row: typing.List, homedir: str) -> typing.List:
 
    """Return a line in a grep-style.
 

	
 
    This is so the line can be fed into Emacs grep-mode for quickly jumping to
 
    the relevant lines in the books.
 
    """
 
    file = row[0].replace(homedir, '~')
 
    return [f'{file}:{row[1]}:'] + row[2:]
 

	
 

	
 
def max_column_widths(rows: List) -> List[int]:
 
    """Return the max width for each column in a table of data."""
 
    if not rows:
 
        return []
 
    else:
 
        maxes = [0] * len(rows[0])
 
        for row in rows:
 
            for i, val in enumerate(row):
 
                length = len(str(val))
 
                maxes[i] = max(maxes[i], length)
 
        return maxes
 

	
 

	
 
def tabulate(rows: List, headers: List = None) -> str:
 
    """Format a table of data as a string.
 

	
 
    Implemented here to avoid adding dependency on "tabulate" package.
 
    """
 
    output = io.StringIO()
 
    if headers:
 
        rows = [headers] + rows
 
    widths = max_column_widths(rows)
 
    for row in rows:
 
        for i, col in enumerate(row):
 
            width = widths[i]
 
            if col is None:
 
                print(' ' * width, end=' ', file=output)
 
            elif isinstance(col, str):
 
                print((str(col)).ljust(width), end=' ', file=output)
 
            else:
 
                print((str(col)).rjust(width), end=' ', file=output)
 
        print('', file=output)
 
    return output.getvalue().strip()
 

	
 

	
 
def reconciliation_report(account, end_date, bank_account_balance, uncleared, prev_end_date, our_account_balance, prev_uncleared):
 
    *_, account_name = account.rpartition(':')
 
    # end_date_iso = end_date.isoformat()
 
    # prev_end_date_iso = prev_end_date.isoformat()
 
    output = io.StringIO()
 
    w = csv.writer(output)
 
    w.writerow([f'title:{end_date}: {account_name}'])
 
    w.writerow(['BANK RECONCILIATION', 'ENDING', end_date])
 
    w.writerow([f'  {account}'])
 
    w.writerow([])
 
    w.writerow(['DATE', 'CHECK NUM', 'PAYEE', 'AMOUNT'])
 
    w.writerow([])
 
    w.writerow([end_date, '', 'BANK ACCOUNT BALANCE', bank_account_balance])
 
    w.writerow([])
 
    for trans in uncleared:
 
        w.writerow(trans)
 
    w.writerow([])
 
    w.writerow([end_date, '', 'OUR ACCOUNT BALANCE', our_account_balance])
 
    if prev_uncleared:
 
        w.writerow([])
 
        w.writerow(['Items Still', 'Outstanding On', prev_end_date, 'Appeared By', end_date])
 
        w.writerow([])
 
        for trans in prev_uncleared:
 
            w.writerow(trans)
 
    return output.getvalue()
 

	
 

	
 
def reconciliation_report_path(account, end_date):
 
    *_, account_name = account.rpartition(':')
 
    return f'Financial/Controls/Reports-for-Treasurer/{end_date}_{account_name}_bank-reconciliation.csv'
 

	
 

	
 
def parse_args():
 
def parse_args(argv):
 
    parser = argparse.ArgumentParser(description='Reconciliation helper')
 
    parser.add_argument('--beancount-file', required=True)
 
    parser.add_argument('--account', help='Full account name, e.g. "Liabilities:CreditCard:AMEX"', required=True)
 
    parser.add_argument('--prev-end-date', type=datetime.date.fromisoformat)
 
    parser.add_argument('--cur-end-date', type=datetime.date.fromisoformat)
 
    parser.add_argument('--month', help='YYYY-MM of ending month. Use with --period.')
 
    parser.add_argument('--period', help='Months in the past to consider. Use with --month.', type=int, choices=[1, 3, 12])
 
    parser.add_argument('--statement-match')
 
    parser.add_argument('--cost-function', default='COST')
 
    parser.add_argument('--grep-output-filename')
 
    # parser.add_argument('--report-group-regex')
 
    args = parser.parse_args()
 
    args = parser.parse_args(args=argv[1:])
 
    if args.month or args.period:
 
        if not (args.month and args.period):
 
            parser.error('--month and --period must be used together')
 
    else:
 
        if not (args.cur_end_date and args.prev_end_date):
 
            parser.error(' --prev-end-date and --cur-end-date must be used together')
 
    return args
 

	
 

	
 
def beancount_file_exists(path):
 
    return os.path.isfile(path)
 

	
 

	
 
args = parse_args()
 
def main(args):
 
    if not beancount_file_exists(args.beancount_file):
 
        sys.exit(f'Beancount file does not exist: {args.beancount_file}')
 
    if args.month or args.period:
 
        parsed_date = datetime.datetime.strptime(args.month, '%Y-%m').date()
 
        preDate = end_of_month(parsed_date - relativedelta(months=args.period)).isoformat()
 
        lastDateInPeriod = end_of_month(parsed_date).isoformat()
 
        month = args.month
 
    else:
 
        preDate = args.prev_end_date
 
        lastDateInPeriod = args.cur_end_date.isoformat()
 
        month = args.cur_end_date.strftime('%Y-%m')
 
    grep_output_file: typing.IO
 
    if args.grep_output_filename:
 
        grep_output_file = open(args.grep_output_filename, 'w')
 
    else:
 
        grep_output_file = tempfile.NamedTemporaryFile(prefix='bc-reconcile-grep-output_', mode='w', delete=False)
 
    beancount_file = args.beancount_file
 
    account = args.account
 
    cost_function = args.cost_function
 
    statement_match = args.statement_match if args.statement_match else month
 

	
 
    QUERIES = {
 
        f"00: CLEARED BAL ENDING DAY BEFORE {preDate}":
 
        # $CONLEDGER -V -C -e "$preDate" bal "/$acct/"
 
        f"""SELECT sum({cost_function}(position)) AS aa WHERE account = "{account}"
 
            AND date < {preDate} AND META('bank-statement') != NULL""",
 

	
 
        f"01: ALL TRANSACTION BAL ENDING DAY BEFORE {preDate}":
 
        # $CONLEDGER -V -e "$preDate" bal "/$acct/"
 
        f"""SELECT sum({cost_function}(position)) AS aa WHERE account = "{account}"
 
            AND date < {preDate}""",
 

	
 
        f"02: ALL TRANSACTION BAL, ending {lastDateInPeriod}":
 
        # $CONLEDGER -V -e "$date" bal "/$acct/"
 
        f"""SELECT sum({cost_function}(position)) AS aa WHERE account = "{account}"
 
        AND date <= {lastDateInPeriod}""",
 

	
 
        f"03: UNCLEARED TRANSACTIONS, ending {lastDateInPeriod}":
 
        f"""SELECT date, {cost_function}(position) as amt, ANY_META('check-id') as chknum, narration, payee, ENTRY_META('code') as code
 
        WHERE account = "{account}"
 
        AND date <= {lastDateInPeriod} AND META('bank-statement') = NULL
 
        ORDER BY date, payee, narration""",
 

	
 
        "04: UNCLEARED TRANSACTION FILE, SUITABLE FOR GREP":
 
        # $CONLEDGER -w -F "%(filename):%(beg_line): %(date) %(code) %(payee) %(amount)\n" --sort d -U -e "$date" reg "/$acct/" > "$TMPDIR/unreconciled-lines"
 
        f"""SELECT  ENTRY_META('filename') as file, META('lineno') as line, date,
 
        {cost_function}(position) as amt, ANY_META('check-id') as chknum, narration, payee, ANY_META("entity") as entity, ENTRY_META('code') as c
 
        WHERE account = "{account}"
 
        AND date <= {lastDateInPeriod} AND META('bank-statement') = NULL
 
        ORDER BY date, payee, narration""",
 

	
 
        f"05: CLEARED BALANCE ending {lastDateInPeriod}":
 
        # $CONLEDGER -V -C -e "$date" bal "/$acct/"
 
        f"""SELECT sum({cost_function}(position)) AS aa WHERE account = "{account}"
 
        AND date <= {lastDateInPeriod} AND META('bank-statement') != NULL""",
 

	
 
        f"06: CLEARED SUBTRACTIONS on {month}'s statement":
 
        # $CONLEDGER -V -C --limit "a > 0 and tag(\"Statement\") =~ /$statementSearchString/" bal "/$acct/"
 
        f"""SELECT  sum(number({cost_function}(position))) AS aa
 
        WHERE account = "{account}"
 
        and META("bank-statement") ~  "{statement_match}" and number({cost_function}(position)) < 0""",
 

	
 
        f"07: CLEARED ADDITIONS on {month}'s statement":
 
        # $CONLEDGER -V -C --limit "a < 0 and tag(\"Statement\") =~ /$statementSearchString/" bal "/$acct/"
 
        f"""SELECT  sum(number({cost_function}(position))) AS aa
 
        WHERE account = "{account}"
 
        and META("bank-statement") ~  "{statement_match}" and number({cost_function}(position)) > 0""",
 
    }
 

	
 
    # Run Beancount queries.
 
    print(f"START RECONCILIATION FOR {account} ENDING {lastDateInPeriod} (previous end date {preDate})")
 
    entries, _, options = loader.load_file(beancount_file)
 
    uncleared_rows = []  # Hack to capture results of query 03.
 
    cleared_balance = decimal.Decimal('0')
 
    all_trans_balance = decimal.Decimal('0')
 
    for desc, query in QUERIES.items():
 
        rtypes, rrows = run_query(entries, options, query, numberify=True)
 
        if not rrows:
 
            print(f'{desc:<55} {"N/A":>11}')
 
        elif desc.startswith('04'):
 
            homedir = os.getenv('HOME', '')
 
            print(f'{desc}\n   See {grep_output_file.name}')
 
            grep_rows = [format_record_for_grep(row, homedir) for row in rrows]
 
            print(tabulate(grep_rows), file=grep_output_file)
 
        elif len(rrows) == 1 and isinstance(rrows[0][0], decimal.Decimal):
 
            result = rrows[0][0]
 
            print(f'{desc:<55} {result:11,.2f}')
 
            if desc.startswith('02'):
 
                all_trans_balance = result
 
            if desc.startswith('05'):
 
                cleared_balance = result
 
        else:
 
            headers = [c[0].capitalize() for c in rtypes]
 
            if desc.startswith('03'):
 
                uncleared_rows = rrows
 
            print(desc)
 
            print(textwrap.indent(tabulate(rrows, headers=headers), '    '))
 

	
 
    uncleared = [(r[0], r[2], r[4] or r[3], r[1]) for r in uncleared_rows]
 
    report_path = os.path.join(os.getenv('CONSERVANCY_REPOSITORY', ''), reconciliation_report_path(account, lastDateInPeriod))
 
    # TODO: Make the directory if it doesn't exist.
 
    with open(report_path, 'w') as f:
 
        f.write(reconciliation_report(account, lastDateInPeriod, cleared_balance, uncleared, '1900-01-01', all_trans_balance, []))
 
    print(f'Wrote reconciliation report: {report_path}.')
 

	
 

	
 
if __name__ == '__main__':
 
    args = parse_args(sys.argv)
 
    main(args)
 

	
 

	
 
def entry_point():
 
    args = parse_args(sys.argv)
 
    main(args)
0 comments (0 inline, 0 general)