Changeset - d9360f1ceafe
[Not reviewed]
0 1 0
Brett Smith - 4 years ago 2020-09-17 14:24:13
brettcsmith@brettcsmith.org
audit_report: Use concurrent.futures for parallelization.

This is basically a pure maintainability change: concurrent.futures is the
nicest API that's available in both Python 3.6 and 3.7, and our other tools
are using it.
1 file changed with 5 insertions and 5 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/tools/audit_report.py
Show inline comments
 
"""audit_report.py - Utility to run all reports for an audit"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import concurrent.futures as futmod
 
import datetime
 
import logging
 
import multiprocessing
 
import os
 
import runpy
 
import sys
 
import tempfile
 

	
 
from pathlib import Path
 

	
 
from typing import (
 
    Callable,
 
    Iterator,
 
    List,
 
    Optional,
 
    Sequence,
 
    Set,
 
    TextIO,
 
    Tuple,
 
)
 
from types import (
 
    ModuleType,
 
)
 

	
 
from . import extract_odf_links
 
from .. import cliutil
 
from .. import config as configmod
 
from ..reports import accrual
 
from ..reports import balance_sheet
 
from ..reports import fund
 
from ..reports import ledger
 

	
 
from beancount.scripts import check as bc_check
 

	
 
# A list of command-line argument strings for one report invocation.
ArgList = List[str]
# Shape of a report entry point as scheduled by main(): it takes an argument
# list, two text streams (presumably stdout and stderr, mirroring main()'s own
# signature), and a Config, and returns an integer exit status.
ReportFunc = Callable[[ArgList, TextIO, TextIO, configmod.Config], int]

# Program name passed to argparse and to cliutil.make_entry_point().
PROGNAME = 'audit-report'
# Logger for this tool; main() adjusts its level from the command line.
logger = logging.getLogger('conservancy_beancount.tools.audit_report')
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
    """Parse the audit-report command line.

    ``arglist`` is the sequence of arguments to parse. When it is ``None``,
    argparse falls back to ``sys.argv[1:]``.

    Besides the options declared here, the parser also carries whatever the
    ``cliutil`` helpers add for version, log level, and job count (main()
    reads ``loglevel`` and ``jobs`` from the result).

    Returns the parsed namespace with one extra attribute, ``arg_error``,
    bound to the parser's ``error`` method so callers can report problems
    found after parsing in the same style as argparse itself.
    """
    parser = argparse.ArgumentParser(prog=PROGNAME)
    cliutil.add_version_argument(parser)
    cliutil.add_loglevel_argument(parser)
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help="""Display progress information
""")
    cliutil.add_jobs_argument(parser)
    parser.add_argument(
        '--output-directory', '-O',
        metavar='DIR',
        type=Path,
        help="""Write all reports to this directory.
Default is a newly-created directory under your repository.
""")
    parser.add_argument(
        '--force',
        action='store_true',
        help="""Run reports even if bean-check reports errors.
""")
    parser.add_argument(
        '--rewrite-rules', '--rewrite', '-r',
        metavar='PATH',
        action='append',
        type=Path,
        default=[],
        help="""Path to rewrite rules for the balance sheet.
Passed to `balance-sheet-report -r`.
""")
    # Both positionals are optional (nargs='?'); main() fills in defaults
    # when they are left as None.
    parser.add_argument(
        'audit_year',
        metavar='YEAR',
        nargs='?',
        type=cliutil.year_or_date_arg,
        help="""Main fiscal year to generate reports for.
Defaults to the last complete fiscal year.
""")
    parser.add_argument(
        'end_date',
        metavar='END',
        nargs='?',
        type=cliutil.date_arg,
        help="""End date for reports for the following fiscal year.
The default is automatically calculated from today's date.
""")
    args = parser.parse_args(arglist)
    # Expose parser.error so later validation can fail with a usage message.
    args.arg_error = parser.error
    return args
 

	
 
def now_s() -> str:
    """Return the current local time as text, to whole-second precision.

    The date and time parts are separated by a single space rather than
    the ISO 8601 ``T``.
    """
    current = datetime.datetime.now()
    return current.isoformat(sep=' ', timespec='seconds')
 

	
 
def bean_check(books_path: Path) -> int:
    """Run bean-check's main function against ``books_path``.

    The check reads its arguments from ``sys.argv``, so this function
    overwrites ``sys.argv`` and also raises the root logger's level to
    WARNING. Returns whatever ``bc_check.main()`` returns.
    """
    command = ['bean-check', str(books_path)]
    sys.argv = command
    logger.debug("running %r", command)
    # bean-check reports timing details through the root logger at INFO
    # level; raise the threshold so those messages are dropped.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.WARNING)
    exitcode = bc_check.main()
    return exitcode  # type:ignore[no-any-return]
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
         stdout: TextIO=sys.stdout,
         stderr: TextIO=sys.stderr,
         config: Optional[configmod.Config]=None,
) -> int:
    """Check the books, then generate every audit report in a process pool.

    Arguments:
      arglist: command-line arguments to parse; ``None`` makes argparse use
        ``sys.argv[1:]``.
      stdout, stderr: not used directly by this function.
      config: configuration to use. When ``None``, a new ``Config`` is
        created and its ``load_file()`` method is called.

    Returns an exit status:
      * ``os.EX_NOINPUT`` when the configuration provides no books loader
      * ``os.EX_DATAERR`` when bean-check fails and ``--force`` was not given
      * the largest nonzero report exit status when any report fails and
        ``--force`` was not given
      * ``os.EX_OK`` otherwise (including failures tolerated by ``--force``)

    An invalid end date is reported through ``args.arg_error``, i.e. the
    argparse parser's ``error`` method, which exits the program.
    """
    args = parse_arguments(arglist)
    if config is None:
        config = configmod.Config()
        config.load_file()
    cliutil.set_loglevel(logger, args.loglevel)
    if args.verbose:
        # --verbose enables DEBUG progress messages from this tool and from
        # the reports, while the reports' 'rewrite' child logger stays at
        # the level requested on the command line.
        logger.setLevel(logging.DEBUG)
        reports_logger = logging.getLogger('conservancy_beancount.reports')
        reports_logger.setLevel(logging.DEBUG)
        reports_logger.getChild('rewrite').setLevel(args.loglevel)

    fy = config.fiscal_year_begin()
    today = datetime.date.today()
    if args.audit_year is None:
        # Default to the fiscal year before the one containing today.
        args.audit_year = fy.for_date(today) - 1
    audit_begin = fy.first_date(args.audit_year)
    audit_end = fy.next_fy_date(args.audit_year)
    if args.end_date is None:
        days_diff = (today - audit_end).days
        if days_diff < (28 * 2):
            # Less than eight weeks past the audit year: report up to today.
            args.end_date = today
        elif days_diff >= 365:
            # A year or more has passed: cover the whole following year.
            args.end_date = fy.next_fy_date(args.audit_year + 1)
        else:
            # Otherwise stop at the first day of the month before today's.
            # Stepping back today.day + 1 days always lands in that month.
            end_date = today - datetime.timedelta(days=today.day + 1)
            args.end_date = end_date.replace(day=1)
    if args.end_date < audit_end:
        args.arg_error("end date is within audited fiscal year")
    next_year = fy.for_date(args.end_date)
    repo_path = config.repository_path()

    if args.output_directory is None:
        args.output_directory = Path(tempfile.mkdtemp(
            prefix=f'FY{args.audit_year}AuditReports.', dir=repo_path,
        ))
        logger.info("writing reports to %s", args.output_directory)
    else:
        args.output_directory.mkdir(exist_ok=True)
    # Every output path handed to a report is recorded here, in the order
    # the argument lists are built.
    output_reports: List[Path] = []
    def common_args(out_name: str, year: Optional[int]=None, *arglist: str) -> Iterator[str]:
        # Yield the options for one report. With a year, out_name is a stem
        # that becomes FY<year><stem>.ods and the date range for that year
        # is added; without one, out_name is used verbatim and no date range
        # is passed.
        if year is not None:
            out_name = f'FY{year}{out_name}.ods'
        if year == args.audit_year:
            yield f'--begin={audit_begin.isoformat()}'
            yield f'--end={audit_end.isoformat()}'
        elif year == next_year:
            yield f'--begin={audit_end.isoformat()}'
            yield f'--end={args.end_date.isoformat()}'
        elif year is not None:
            raise ValueError(f"unknown year {year!r}")
        out_path = args.output_directory / out_name
        output_reports.append(out_path)
        for path in args.rewrite_rules:
            yield f'--rewrite-rules={path}'
        yield f'--output-file={out_path}'
        yield from arglist
    reports: List[Tuple[ReportFunc, ArgList]] = [
        # Reports are sorted roughly in descending order of how long each takes
        # to generate.
        (ledger.main, list(common_args('GeneralLedger', args.audit_year))),
        (ledger.main, list(common_args('GeneralLedger', next_year))),
        (ledger.main, list(common_args('Disbursements', args.audit_year, '--disbursements'))),
        (ledger.main, list(common_args('Receipts', args.audit_year, '--receipts'))),
        (ledger.main, list(common_args('Disbursements', next_year, '--disbursements'))),
        (ledger.main, list(common_args('Receipts', next_year, '--receipts'))),
        (accrual.main, list(common_args('AgingReport.ods'))),
        (balance_sheet.main, list(common_args('Summary', args.audit_year))),
        (fund.main, list(common_args('FundReport', args.audit_year))),
        (fund.main, list(common_args('FundReport', next_year))),
    ]

    books = config.books_loader()
    if books is None:
        logger.critical("no books available to load")
        return os.EX_NOINPUT

    # Leaving the with block, including via an early return, shuts the
    # executor down and waits for work that was already submitted.
    with futmod.ProcessPoolExecutor(args.jobs) as pool:
        logger.debug("%s: process pool ready with %s workers", now_s(), args.jobs)
        fy_paths = books._iter_fy_books(fy.range(args.audit_year - 1, args.end_date))
        check_results = pool.map(bean_check, fy_paths)
        if all(exitcode == 0 for exitcode in check_results):
            logger.debug("%s: bean-check passed", now_s())
        else:
            logger.log(
                logging.WARNING if args.force else logging.ERROR,
                "%s: bean-check failed",
                now_s(),
            )
            if not args.force:
                return os.EX_DATAERR

        # Submit every report before waiting on any of them so they can run
        # concurrently.
        report_results = [
            pool.submit(report_func, report_args, config=config)
            for report_func, report_args in reports
        ]
        # Read each future's result exactly once and keep the failures.
        report_errors = [
            exitcode
            for exitcode in (future.result() for future in report_results)
            if exitcode != 0
        ]
        if not report_errors:
            logger.debug("%s: all reports generated", now_s())
        else:
            logger.error("%s: %s reports generated errors", now_s(), len(report_errors))
            if not args.force:
                return max(report_errors)
    return os.EX_OK
 

	
 
# Callable built by cliutil.make_entry_point() for this module and program
# name; its return value is used as the process exit status below.
entry_point = cliutil.make_entry_point(__name__, PROGNAME)

if __name__ == '__main__':
    # Use sys.exit rather than the bare exit() helper: exit() is installed by
    # the site module for interactive use and is missing under `python -S`.
    sys.exit(entry_point())
0 comments (0 inline, 0 general)