Changeset - 1cc4e732f4e2
[Not reviewed]
0 2 0
Brett Smith - 4 years ago 2020-10-21 14:51:18
brettcsmith@brettcsmith.org
accrual: Add --end option.

For assemble-audit-reports as shown.
2 files changed with 21 insertions and 3 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -622,179 +622,192 @@ class OutgoingReport(BaseReport):
 
class ReportType(enum.Enum):
 
    AGING = AgingReport
 
    BALANCE = BalanceReport
 
    OUTGOING = OutgoingReport
 
    AGE = AGING
 
    BAL = BALANCE
 
    OUT = OUTGOING
 
    OUTGOINGS = OUTGOING
 

	
 
    @classmethod
 
    def by_name(cls, name: str) -> 'ReportType':
 
        try:
 
            return cls[name.upper()]
 
        except KeyError:
 
            raise ValueError(f"unknown report type {name!r}") from None
 

	
 

	
 
def filter_search(postings: Iterable[data.Posting],
 
                  search_terms: Iterable[cliutil.SearchTerm],
 
) -> Iterable[data.Posting]:
 
    accounts = tuple(AccrualAccount.account_names())
 
    postings = (post for post in postings if post.account.is_under(*accounts))
 
    for query in search_terms:
 
        postings = query.filter_postings(postings)
 
    return postings
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    cliutil.add_rewrite_rules_argument(parser)
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='NAME',
 
        type=ReportType.by_name,
 
        help="""The type of report to generate, one of `aging`, `balance`, or
 
`outgoing`. If not specified, the default is `aging` when no search terms are
 
given, `outgoing` for search terms that return a single outstanding payable,
 
and `balance` any other time.
 
""")
 
    parser.add_argument(
 
        '--since',
 
        metavar='YEAR',
 
        type=int,
 
        default=0,
 
        help="""How far back to search the books for related transactions.
 
You can either specify a fiscal year, or a negative offset from the current
 
fiscal year, to start loading entries from. The default is to load the current,
 
unaudited books.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Do not consider entries from this date forward, in YYYY-MM-DD
 
format.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is stdout for the balance and outgoing reports, and a generated
 
filename for other reports.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('invoice', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on accruals that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other link, including an RT attachment link in
 
`TIK/ATT` format, is a shortcut for `invoice=LINK`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.report_type is None and not any(
 
            term.meta_key == 'invoice' or term.meta_key == 'rt-id'
 
            for term in args.search_terms
 
    ):
 
        args.report_type = ReportType.AGING
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
        returncode = cliutil.ExitCode.NoConfiguration
 
    else:
 
        load_since = None if args.report_type == ReportType.AGING else args.since
 
        entries, load_errors, _ = books_loader.load_all(load_since)
 
        if load_errors:
 
            returncode = cliutil.ExitCode.BeancountErrors
 
        elif not entries:
 
            returncode = cliutil.ExitCode.NoDataLoaded
 
    filters.remove_opening_balance_txn(entries)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 

	
 
    postings_src = data.Posting.from_entries(entries)
 
    stop_date = args.stop_date or datetime.date(datetime.MAXYEAR, 12, 31)
 
    postings_src: Iterator[data.Posting] = (
 
        posting
 
        for posting in data.Posting.from_entries(entries)
 
        if posting.meta.date < stop_date
 
    )
 
    for rewrite_path in args.rewrite_rules:
 
        try:
 
            ruleset = rewrite.RewriteRuleset.from_yaml(rewrite_path)
 
        except ValueError as error:
 
            logger.critical("failed loading rewrite rules from %s: %s",
 
                            rewrite_path, error.args[0])
 
            return cliutil.ExitCode.RewriteRulesError
 
        postings_src = ruleset.rewrite(postings_src)
 
    postings = list(filter_search(postings_src, args.search_terms))
 
    if not postings:
 
        logger.warning("no matching entries found to report")
 
        returncode = returncode or cliutil.ExitCode.NoDataFiltered
 
    # groups is a mapping of metadata value strings to AccrualPostings.
 
    # The keys are basically arbitrary, the report classes don't rely on them,
 
    # but they do help symbolize what's being grouped.
 
    # For the outgoing approval report, groups maps rt-id link strings to
 
    # associated accruals.
 
    # For all other reports, groups comes from AccrualReport.make_consistent().
 
    groups: PostGroups
 
    if args.report_type is None or args.report_type is ReportType.OUTGOING:
 
        groups = dict(AccrualPostings.group_by_first_meta_link(postings, 'rt-id'))
 
        if args.report_type is None and len(groups) == 1:
 
            key = next(iter(groups))
 
            group = groups[key]
 
            account = group[0].account
 
            if (AccrualAccount.by_account(account) is AccrualAccount.PAYABLE
 
                and all(post.account == account for post in group)
 
                and not group.balance().ge_zero()
 
                and key):  # Make sure we have a usable rt-id
 
                args.report_type = ReportType.OUTGOING
 
    if args.report_type is not ReportType.OUTGOING:
 
        groups = dict(AccrualPostings.make_consistent(postings))
 
    if args.report_type is not ReportType.AGING:
 
        groups = {
 
            key: posts for key, posts in groups.items() if not posts.is_paid()
 
        } or groups
 
    del postings
 

	
 
    report: Optional[BaseReport] = None
 
    output_path: Optional[Path] = None
 
    if args.report_type is ReportType.AGING:
 
        rt_wrapper = config.rt_wrapper()
 
        if rt_wrapper is None:
 
            logger.error("unable to generate aging report: RT client is required")
 
        else:
 
            if args.output_file is None:
 
                now = datetime.datetime.now()
 
                out_dir_path = config.repository_path() or Path()
 
                args.output_file = out_dir_path / now.strftime('AgingReport_%Y-%m-%d_%H:%M.ods')
 
                logger.info("Writing report to %s", args.output_file)
 
            out_bin = cliutil.bytes_output(args.output_file, stdout)
 
            report = AgingReport(rt_wrapper, out_bin)
 
            report = AgingReport(rt_wrapper, out_bin, args.stop_date)
 
            report.ods.set_common_properties(config.books_repo())
 
    elif args.report_type is ReportType.OUTGOING:
 
        rt_wrapper = config.rt_wrapper()
 
        if rt_wrapper is None:
 
            logger.error("unable to generate outgoing report: RT client is required")
 
        else:
 
            out_file = cliutil.text_output(args.output_file, stdout)
 
            report = OutgoingReport(rt_wrapper, out_file)
 
    else:
 
        out_file = cliutil.text_output(args.output_file, stdout)
 
        report = BalanceReport(out_file)
 

	
 
    if report is None:
 
        returncode = cliutil.ExitCode.NoConfiguration
 
    else:
 
        report.run(groups)
 
    return returncode
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
conservancy_beancount/tools/audit_report.py
Show inline comments
...
 
@@ -139,91 +139,96 @@ def main(arglist: Optional[Sequence[str]]=None,
 
    if args.end_date is None:
 
        days_diff = (today - audit_end).days
 
        if days_diff < (28 * 2):
 
            args.end_date = today
 
        elif days_diff >= 365:
 
            args.end_date = fy.next_fy_date(args.audit_year + 1)
 
        else:
 
            end_date = today - datetime.timedelta(days=today.day + 1)
 
            args.end_date = end_date.replace(day=1)
 
    if args.end_date < audit_end:
 
        args.arg_error("end date is within audited fiscal year")
 
    next_year = fy.for_date(args.end_date)
 
    repo_path = config.repository_path()
 

	
 
    if args.output_directory is None:
 
        args.output_directory = Path(tempfile.mkdtemp(
 
            prefix=f'FY{args.audit_year}AuditReports.', dir=repo_path,
 
        ))
 
        logger.info("writing reports to %s", args.output_directory)
 
    else:
 
        args.output_directory.mkdir(exist_ok=True)
 
    output_reports: List[Path] = []
 
    def common_args(out_name: str, year: Optional[int]=None, *arglist: str) -> Iterator[str]:
 
        if year is not None:
 
            out_name = f'FY{year}{out_name}.ods'
 
        if year == args.audit_year:
 
            yield f'--begin={audit_begin.isoformat()}'
 
            yield f'--end={audit_end.isoformat()}'
 
        elif year == next_year:
 
            yield f'--begin={audit_end.isoformat()}'
 
            yield f'--end={args.end_date.isoformat()}'
 
        elif year is not None:
 
            raise ValueError(f"unknown year {year!r}")
 
        out_path = args.output_directory / out_name
 
        output_reports.append(out_path)
 
        for path in args.rewrite_rules:
 
            yield f'--rewrite-rules={path}'
 
        yield f'--output-file={out_path}'
 
        yield from arglist
 
    reports: List[Tuple[ReportFunc, ArgList]] = [
 
        # Reports are sorted roughly in descending order of how long each takes
 
        # to generate.
 
        (ledger.main, list(common_args('GeneralLedger', args.audit_year))),
 
        (ledger.main, list(common_args('GeneralLedger', next_year))),
 
        (ledger.main, list(common_args('Disbursements', args.audit_year, '--disbursements'))),
 
        (ledger.main, list(common_args('Receipts', args.audit_year, '--receipts'))),
 
        (ledger.main, list(common_args('Disbursements', next_year, '--disbursements'))),
 
        (ledger.main, list(common_args('Receipts', next_year, '--receipts'))),
 
        (accrual.main, list(common_args('AgingReport.ods'))),
 
        (accrual.main, list(common_args(f'FY{next_year}AgingReport.ods'))),
 
        (accrual.main, list(common_args(
 
            f'FY{args.audit_year}AgingReport.ods',
 
            None,
 
            f'--end={audit_end.isoformat()}',
 
        ))),
 
        (balance_sheet.main, list(common_args('Summary', args.audit_year))),
 
        (fund.main, list(common_args('FundReport', args.audit_year))),
 
        (fund.main, list(common_args('FundReport', next_year))),
 
    ]
 

	
 
    books = config.books_loader()
 
    if books is None:
 
        logger.critical("no books available to load")
 
        return os.EX_NOINPUT
 

	
 
    with futmod.ProcessPoolExecutor(args.jobs) as pool:
 
        logger.debug("%s: process pool ready with %s workers", now_s(), args.jobs)
 
        fy_paths = books._iter_fy_books(fy.range(args.audit_year - 1, args.end_date))
 
        check_results = pool.map(bean_check, fy_paths)
 
        if all(exitcode == 0 for exitcode in check_results):
 
            logger.debug("%s: bean-check passed", now_s())
 
        else:
 
            logger.log(
 
                logging.WARNING if args.force else logging.ERROR,
 
                "%s: bean-check failed",
 
                now_s(),
 
            )
 
            if not args.force:
 
                return os.EX_DATAERR
 

	
 
        report_results = [
 
            pool.submit(report_func, arglist, config=config)
 
            for report_func, arglist in reports
 
        ]
 
        report_errors = [res.result() for res in report_results if res.result() != 0]
 
        if not report_errors:
 
            logger.debug("%s: all reports generated", now_s())
 
        else:
 
            logger.error("%s: %s reports generated errors", now_s(), len(report_errors))
 
            if not args.force:
 
                return max(report_errors)
 
    return os.EX_OK
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
0 comments (0 inline, 0 general)