Changeset - 461536453835
[Not reviewed]
0 8 0
Brett Smith - 4 years ago 2020-07-27 14:54:04
brettcsmith@brettcsmith.org
cliutil: Add ReturnFlag.

Take this opportunity to re-standardize flag values now that it's clear
what's most common.
8 files changed with 37 insertions and 38 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/cliutil.py
Show inline comments
...
 
@@ -111,48 +111,68 @@ class InfoAction(argparse.Action):
 

	
 
class LogLevel(enum.IntEnum):
 
    DEBUG = logging.DEBUG
 
    INFO = logging.INFO
 
    WARNING = logging.WARNING
 
    ERROR = logging.ERROR
 
    CRITICAL = logging.CRITICAL
 
    WARN = WARNING
 
    ERR = ERROR
 
    CRIT = CRITICAL
 

	
 
    @classmethod
 
    def from_arg(cls, arg: str) -> int:
 
        try:
 
            return cls[arg.upper()].value
 
        except KeyError:
 
            raise ValueError(f"unknown loglevel {arg!r}") from None
 

	
 
    @classmethod
 
    def choices(cls) -> Iterable[str]:
 
        for level in sorted(cls, key=operator.attrgetter('value')):
 
            yield level.name.lower()
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    """Common return codes for tools
 

	
 
    Tools should combine these flags to report different errors, and then use
 
    ReturnFlag.returncode(flags) to report their final exit status code.
 

	
 
    Values 1, 2, 4, and 8 should be reserved for this class to be shared across
 
    all tools. Flags 16, 32, and 64 are available for tools to report their own
 
    specific errors.
 
    """
 
    LOAD_ERRORS = 1
 
    NOTHING_TO_REPORT = 2
 
    _RESERVED4 = 4
 
    _RESERVED8 = 8
 

	
 
    @classmethod
 
    def returncode(cls, flags: int) -> int:
 
        return 0 if flags == 0 else 16 + flags
 

	
 

	
 
class SearchTerm(NamedTuple):
 
    """NamedTuple representing a user's metadata filter
 

	
 
    SearchTerm knows how to parse and store posting metadata filters provided
 
    by the user in `key=value` format. Reporting tools can use this to filter
 
    postings that match the user's criteria, to report on subsets of the books.
 

	
 
    Typical usage looks like::
 

	
 
      argument_parser.add_argument(
 
        'search_terms',
 
        type=SearchTerm.arg_parser(),
 
        …,
 
      )
 

	
 
      args = argument_parser.parse_args(…)
 
      for query in args.search_terms:
 
        postings = query.filter_postings(postings)
 
    """
 
    meta_key: MetaKey
 
    pattern: str
 

	
 
    @classmethod
 
    def arg_parser(cls,
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -614,55 +614,48 @@ class OutgoingReport(BaseReport):
 
            except rt.RtError:
 
                self.logger.debug("RT exception on edit_ticket", exc_info=True)
 
                ok = False
 
            if not ok:
 
                self.logger.warning("failed to set custom fields for rt:%s", ticket_id)
 

	
 

	
 
class ReportType(enum.Enum):
 
    AGING = AgingReport
 
    BALANCE = BalanceReport
 
    OUTGOING = OutgoingReport
 
    AGE = AGING
 
    BAL = BALANCE
 
    OUT = OUTGOING
 
    OUTGOINGS = OUTGOING
 

	
 
    @classmethod
 
    def by_name(cls, name: str) -> 'ReportType':
 
        try:
 
            return cls[name.upper()]
 
        except KeyError:
 
            raise ValueError(f"unknown report type {name!r}") from None
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    LOAD_ERRORS = 1
 
    # 2 was used in the past, it can probably be reclaimed.
 
    REPORT_ERRORS = 4
 
    NOTHING_TO_REPORT = 8
 

	
 

	
 
def filter_search(postings: Iterable[data.Posting],
 
                  search_terms: Iterable[cliutil.SearchTerm],
 
) -> Iterable[data.Posting]:
 
    accounts = tuple(AccrualAccount.account_names())
 
    postings = (post for post in postings if post.account.is_under(*accounts))
 
    for query in search_terms:
 
        postings = query.filter_postings(postings)
 
    return postings
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='NAME',
 
        type=ReportType.by_name,
 
        help="""The type of report to generate, one of `aging`, `balance`, or
 
`outgoing`. If not specified, the default is `aging` when no search terms are
 
given, `outgoing` for search terms that return a single outstanding payable,
 
and `balance` any other time.
 
""")
 
    parser.add_argument(
 
        '--since',
 
        metavar='YEAR',
...
 
@@ -701,56 +694,56 @@ metadata to match. A single ticket number is a shortcut for
 
        args.report_type = ReportType.AGING
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
    else:
 
        load_since = None if args.report_type == ReportType.AGING else args.since
 
        entries, load_errors, _ = books_loader.load_all(load_since)
 
    filters.remove_opening_balance_txn(entries)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
        returncode |= ReturnFlag.LOAD_ERRORS
 
        returncode |= cliutil.ReturnFlag.LOAD_ERRORS
 

	
 
    postings = list(filter_search(
 
        data.Posting.from_entries(entries), args.search_terms,
 
    ))
 
    if not postings:
 
        logger.warning("no matching entries found to report")
 
        returncode |= ReturnFlag.NOTHING_TO_REPORT
 
        returncode |= cliutil.ReturnFlag.NOTHING_TO_REPORT
 
    # groups is a mapping of metadata value strings to AccrualPostings.
 
    # The keys are basically arbitrary, the report classes don't rely on them,
 
    # but they do help symbolize what's being grouped.
 
    # For the outgoing approval report, groups maps rt-id link strings to
 
    # associated accruals.
 
    # For all other reports, groups comes from AccrualReport.make_consistent().
 
    groups: PostGroups
 
    if args.report_type is None or args.report_type is ReportType.OUTGOING:
 
        groups = dict(AccrualPostings.group_by_first_meta_link(postings, 'rt-id'))
 
        if args.report_type is None and len(groups) == 1:
 
            key = next(iter(groups))
 
            group = groups[key]
 
            account = group[0].account
 
            if (AccrualAccount.by_account(account) is AccrualAccount.PAYABLE
 
                and all(post.account == account for post in group)
 
                and not group.balance().ge_zero()
 
                and key):  # Make sure we have a usable rt-id
 
                args.report_type = ReportType.OUTGOING
 
    if args.report_type is not ReportType.OUTGOING:
 
        groups = dict(AccrualPostings.make_consistent(postings))
 
    if args.report_type is not ReportType.AGING:
 
        groups = {
 
            key: posts for key, posts in groups.items() if not posts.is_paid()
 
        } or groups
...
 
@@ -761,33 +754,33 @@ def main(arglist: Optional[Sequence[str]]=None,
 
    if args.report_type is ReportType.AGING:
 
        rt_wrapper = config.rt_wrapper()
 
        if rt_wrapper is None:
 
            logger.error("unable to generate aging report: RT client is required")
 
        else:
 
            now = datetime.datetime.now()
 
            if args.output_file is None:
 
                out_dir_path = config.repository_path() or Path()
 
                args.output_file = out_dir_path / now.strftime('AgingReport_%Y-%m-%d_%H:%M.ods')
 
                logger.info("Writing report to %s", args.output_file)
 
            out_bin = cliutil.bytes_output(args.output_file, stdout)
 
            report = AgingReport(rt_wrapper, out_bin)
 
    elif args.report_type is ReportType.OUTGOING:
 
        rt_wrapper = config.rt_wrapper()
 
        if rt_wrapper is None:
 
            logger.error("unable to generate outgoing report: RT client is required")
 
        else:
 
            out_file = cliutil.text_output(args.output_file, stdout)
 
            report = OutgoingReport(rt_wrapper, out_file)
 
    else:
 
        out_file = cliutil.text_output(args.output_file, stdout)
 
        report = BalanceReport(out_file)
 

	
 
    if report is None:
 
        returncode |= ReturnFlag.REPORT_ERRORS
 
        returncode |= 16
 
    else:
 
        report.run(groups)
 
    return 0 if returncode == 0 else 16 + returncode
 
    return cliutil.ReturnFlag.returncode(returncode)
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
conservancy_beancount/reports/fund.py
Show inline comments
...
 
@@ -262,53 +262,48 @@ class TextReport:
 
        for acct_s, bal_seq in output:
 
            if acct_s.endswith(fund_start):
 
                print(line_fmt.format('―' * acct_width, '―' * bal_width),
 
                      file=self.out_file)
 
            bal_iter = iter(bal_seq)
 
            print(line_fmt.format(acct_s, next(bal_iter)), file=self.out_file)
 
            for bal_s in bal_iter:
 
                print(line_fmt.format('', bal_s), file=self.out_file)
 

	
 

	
 
class ReportType(enum.Enum):
 
    TEXT = TextReport
 
    ODS = ODSReport
 
    TXT = TEXT
 
    SPREADSHEET = ODS
 

	
 
    @classmethod
 
    def from_arg(cls, s: str) -> 'ReportType':
 
        try:
 
            return cls[s.upper()]
 
        except KeyError:
 
            raise ValueError(f"no report type matches {s!r}") from None
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    LOAD_ERRORS = 1
 
    NOTHING_TO_REPORT = 8
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
 
The default is one year ago.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
 
The default is a year after the start date.
 
""")
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='TYPE',
 
        type=ReportType.from_arg,
 
        help="""Type of report to generate. `text` gives a plain two-column text
...
 
@@ -361,67 +356,67 @@ def main(arglist: Optional[Sequence[str]]=None,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    if args.stop_date is None:
 
        if args.start_date is None:
 
            args.stop_date = datetime.date.today()
 
        else:
 
            args.stop_date = diff_year(args.start_date, 1)
 
    if args.start_date is None:
 
        args.start_date = diff_year(args.stop_date, -1)
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
    else:
 
        entries, load_errors, _ = books_loader.load_fy_range(args.start_date, args.stop_date)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
        returncode |= ReturnFlag.LOAD_ERRORS
 
        returncode |= cliutil.ReturnFlag.LOAD_ERRORS
 

	
 
    postings = (
 
        post
 
        for post in data.Posting.from_entries(entries)
 
        if post.meta.date < args.stop_date
 
    )
 
    for search_term in args.search_terms:
 
        postings = search_term.filter_postings(postings)
 
    fund_postings = {
 
        key: related
 
        for key, related in core.RelatedPostings.group_by_meta(postings, 'project')
 
        if isinstance(key, str)
 
    }
 
    period_cls = core.PeriodPostings.with_start_date(args.start_date)
 
    fund_map = collections.OrderedDict(
 
        (fund, dict(period_cls.group_by_account(fund_postings[fund])))
 
        for fund in sorted(fund_postings, key=lambda s: locale.strxfrm(s.casefold()))
 
    )
 
    if not fund_map:
 
        logger.warning("no matching postings found to report")
 
        returncode |= ReturnFlag.NOTHING_TO_REPORT
 
        returncode |= cliutil.ReturnFlag.NOTHING_TO_REPORT
 
    elif args.report_type is ReportType.TEXT:
 
        out_file = cliutil.text_output(args.output_file, stdout)
 
        report = TextReport(args.start_date, args.stop_date, out_file)
 
        report.write(fund_map.items())
 
    else:
 
        ods_report = ODSReport(args.start_date, args.stop_date)
 
        ods_report.write(fund_map.items())
 
        if args.output_file is None:
 
            out_dir_path = config.repository_path() or Path()
 
            args.output_file = out_dir_path / 'FundReport_{}_{}.ods'.format(
 
                args.start_date.isoformat(), args.stop_date.isoformat(),
 
            )
 
            logger.info("Writing report to %s", args.output_file)
 
        ods_file = cliutil.bytes_output(args.output_file, stdout)
 
        ods_report.save_file(ods_file)
 
    return 0 if returncode == 0 else 16 + returncode
 
    return cliutil.ReturnFlag.returncode(returncode)
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
conservancy_beancount/reports/ledger.py
Show inline comments
...
 
@@ -594,53 +594,48 @@ class TransactionODS(LedgerODS):
 
                    except KeyError:
 
                        dup = False
 
                    if dup:
 
                        meta_cell = odf.table.TableCell()
 
                    elif meta_key in data.LINK_METADATA:
 
                        meta_cell = self.meta_links_cell(post.meta.report_links(meta_key))
 
                    else:
 
                        meta_cell = self.string_cell(post.meta.get(meta_key, ''))
 
                    meta_cells.append(meta_cell)
 
                if post.cost is None:
 
                    amount_cell = odf.table.TableCell()
 
                else:
 
                    amount_cell = self.currency_cell(self.norm_func(post.units))
 
                self.add_row(
 
                    odf.table.TableCell(),
 
                    odf.table.TableCell(),
 
                    self.string_cell(post.account),
 
                    self.string_cell(post.meta.get('entity') or ''),
 
                    amount_cell,
 
                    self.currency_cell(self.norm_func(post.at_cost())),
 
                    *meta_cells,
 
                )
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    LOAD_ERRORS = 1
 
    NOTHING_TO_REPORT = 8
 

	
 

	
 
class CashReportAction(argparse.Action):
 
    def __call__(self,
 
                 parser: argparse.ArgumentParser,
 
                 namespace: argparse.Namespace,
 
                 values: Union[Sequence[Any], str, None]=None,
 
                 option_string: Optional[str]=None,
 
    ) -> None:
 
        namespace.txn_filter = self.const
 
        if namespace.accounts is None:
 
            namespace.accounts = []
 
        namespace.accounts.append('Assets:PayPal')
 
        namespace.accounts.append('Cash')
 
        if namespace.stop_date is None:
 
            namespace.stop_date = datetime.date.today()
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--disbursements',
 
        action=CashReportAction,
 
        const=TransactionFilter.DEBIT,
 
        nargs=0,
...
 
@@ -769,80 +764,80 @@ def main(arglist: Optional[Sequence[str]]=None,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    today = datetime.date.today()
 
    if args.start_date is None:
 
        args.start_date = diff_year(today, -1)
 
        if args.stop_date is None:
 
            args.stop_date = today + datetime.timedelta(days=30)
 
    elif args.stop_date is None:
 
        args.stop_date = diff_year(args.start_date, 1)
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, options = books.Loader.load_none(config.config_file_path())
 
    else:
 
        entries, load_errors, options = books_loader.load_fy_range(args.start_date, args.stop_date)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
        returncode |= ReturnFlag.LOAD_ERRORS
 
        returncode |= cliutil.ReturnFlag.LOAD_ERRORS
 

	
 
    data.Account.load_from_books(entries, options)
 
    postings = data.Posting.from_entries(entries)
 
    for search_term in args.search_terms:
 
        postings = search_term.filter_postings(postings)
 

	
 
    rt_wrapper = config.rt_wrapper()
 
    if rt_wrapper is None:
 
        logger.warning("could not initialize RT client; spreadsheet links will be broken")
 
    try:
 
        if args.txn_filter is None:
 
            report = LedgerODS(
 
                args.start_date,
 
                args.stop_date,
 
                args.accounts,
 
                rt_wrapper,
 
                args.sheet_size,
 
                args.show_totals,
 
                args.add_totals,
 
            )
 
        else:
 
            report = TransactionODS(
 
                args.start_date,
 
                args.stop_date,
 
                args.accounts,
 
                rt_wrapper,
 
                args.sheet_size,
 
                args.show_totals,
 
                args.add_totals,
 
                args.txn_filter,
 
            )
 
    except ValueError as error:
 
        logger.error("%s: %r", *error.args)
 
        return 2
 
    report.write(postings)
 
    if not any(report.account_groups.values()):
 
        logger.warning("no matching postings found to report")
 
        returncode |= ReturnFlag.NOTHING_TO_REPORT
 
        returncode |= cliutil.ReturnFlag.NOTHING_TO_REPORT
 

	
 
    if args.output_file is None:
 
        out_dir_path = config.repository_path() or Path()
 
        args.output_file = out_dir_path / '{}Report_{}_{}.ods'.format(
 
            report.report_name,
 
            args.start_date.isoformat(),
 
            args.stop_date.isoformat(),
 
        )
 
        logger.info("Writing report to %s", args.output_file)
 
    ods_file = cliutil.bytes_output(args.output_file, stdout)
 
    report.save_file(ods_file)
 
    return 0 if returncode == 0 else 16 + returncode
 
    return cliutil.ReturnFlag.returncode(returncode)
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
conservancy_beancount/tools/opening_balances.py
Show inline comments
...
 
@@ -121,52 +121,48 @@ class Posting(data.Posting):
 
            return units.currency  # type:ignore[unreachable]
 
        else:
 
            return f'{units.currency} {cost.currency} {cost.date.isoformat()}'
 

	
 
    @classmethod
 
    def build_opening(
 
            cls,
 
            key: AccountWithFund,
 
            meta_key: MetaKey,
 
            inventory: Inventory,
 
    ) -> Iterator[bc_data.Posting]:
 
        account, project = key
 
        if project is None:
 
            meta: Optional[Dict[MetaKey, MetaValue]] = None
 
        else:
 
            meta = {meta_key: project}
 
        for units, cost in sorted(inventory, key=cls._position_sortkey):
 
            if cost is None:
 
                units = quantize_amount(units)
 
            yield bc_data.Posting(
 
                account, units, cost, None, None, copy.copy(meta),
 
            )
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    LOAD_ERRORS = 1
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        '--fund-metadata-key', '-m',
 
        metavar='KEY',
 
        dest='meta_key',
 
        default='project',
 
        help="""Name of the fund metadata key. Default %(default)s.
 
""")
 
    parser.add_argument(
 
        '--unrestricted-fund', '-u',
 
        metavar='PROJECT',
 
        default='Conservancy',
 
        help="""Name of the unrestricted fund. Default %(default)s.
 
""")
 
    parser.add_argument(
 
        'as_of_date',
 
        metavar='YEAR_OR_DATE',
 
        type=cliutil.year_or_date_arg,
 
        nargs='?',
 
        help="""Date to generate opening balances for. You can provide just
 
a year to generate balances for the start of that fiscal year. Defaults to the
...
 
@@ -178,49 +174,49 @@ def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    fy = config.fiscal_year_begin()
 
    if args.as_of_date is None:
 
        args.as_of_date = fy.for_date()
 
    if isinstance(args.as_of_date, int):
 
        args.as_of_date = fy.first_date(args.as_of_date)
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
    else:
 
        entries, load_errors, _ = books_loader.load_fy_range(0, args.as_of_date)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
        returncode |= ReturnFlag.LOAD_ERRORS
 
        returncode |= cliutil.ReturnFlag.LOAD_ERRORS
 

	
 
    inventories: Mapping[AccountWithFund, Inventory] = collections.defaultdict(Inventory)
 
    for post in Posting.from_entries(entries):
 
        if post.meta.date >= args.as_of_date:
 
            continue
 
        account = post.account
 
        fund_acct_match = post.account.is_under(*FUND_ACCOUNTS)
 
        is_equity = account.root_part() in EQUITY_ACCOUNTS
 
        if fund_acct_match is None:
 
            project: MetaValue = None
 
        else:
 
            project = post.meta.get(args.meta_key)
 
            if project is None:
 
                bc_printer.print_error(Error(
 
                    post.meta, "no fund specified", post.meta.txn,
 
                ), file=stderr)
 
                project = args.unrestricted_fund
 
            if is_equity:
 
                if project == args.unrestricted_fund:
 
                    account = UNRESTRICTED_ACCOUNT
 
                else:
 
                    account = RESTRICTED_ACCOUNT
 
        inventory = inventories[AccountWithFund(account, project)]
 
        if is_equity:
...
 
@@ -230,30 +226,30 @@ def main(arglist: Optional[Sequence[str]]=None,
 

	
 
    opening_date = args.as_of_date - datetime.timedelta(1)
 
    opening = bc_data.Transaction(  # type:ignore[operator]
 
        None,  # meta
 
        opening_date,
 
        '*',
 
        None,  # payee
 
        f"Opening balances for FY{fy.for_date(args.as_of_date)}",
 
        frozenset(),  # tags
 
        frozenset(),  # links
 
        [post
 
         for key in sorted(inventories, key=AccountWithFund.sortkey)
 
         for post in Posting.build_opening(key, args.meta_key, inventories[key])
 
        ])
 
    balance = Balance(get_cost(get_position(post))
 
                      for post in opening.postings)
 
    for amount in balance.clean_copy().values():
 
        opening.postings.append(bc_data.Posting(
 
            UNRESTRICTED_ACCOUNT, quantize_amount(-amount), None, None, None,
 
            {args.meta_key: args.unrestricted_fund},
 
        ))
 
    dcontext = bc_dcontext.DisplayContext()
 
    dcontext.set_commas(True)
 
    bc_printer.print_entry(opening, dcontext, file=stdout)
 
    return 0 if returncode == 0 else 16 + returncode
 
    return cliutil.ReturnFlag.returncode(returncode)
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
tests/test_reports_accrual.py
Show inline comments
...
 
@@ -754,48 +754,48 @@ def test_main_balance_report_because_no_rt_id():
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_output(output, [
 
        rf'\b{re.escape(invoice)}:$',
 
        r'^\s+-50\.00 USD outstanding since 2010-06-20$',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    [],
 
    ['entity=Lawyer'],
 
])
 
def test_main_aging_report(arglist):
 
    if arglist:
 
        recv_rows = [row for row in AGING_AR if 'Lawyer' in row.entity]
 
        pay_rows = [row for row in AGING_AP if 'Lawyer' in row.entity]
 
    else:
 
        recv_rows = AGING_AR
 
        pay_rows = AGING_AP
 
    retcode, output, errors = run_main(arglist, out_type=io.BytesIO)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_aging_ods(output, datetime.date.today(), recv_rows, pay_rows)
 

	
 
def test_main_no_books():
 
    errors = check_main_fails([], testutil.TestConfig(), 1 | 8)
 
    errors = check_main_fails([], testutil.TestConfig(), 1 | 2)
 
    testutil.check_lines_match(iter(errors), [
 
        r':[01]: +no books to load in configuration\b',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['499'],
 
    ['505/99999'],
 
    ['-t', 'balance', 'entity=NonExistent'],
 
])
 
def test_main_no_matches(arglist, caplog):
 
    check_main_fails(arglist, None, 8)
 
    check_main_fails(arglist, None, 2)
 
    testutil.check_logs_match(caplog, [
 
        ('WARNING', 'no matching entries found to report'),
 
    ])
 

	
 
def test_main_no_rt(caplog):
 
    config = testutil.TestConfig(
 
        books_path=testutil.test_path('books/accruals.beancount'),
 
    )
 
    check_main_fails(['-t', 'out'], config, 4)
 
    check_main_fails(['-t', 'out'], config, 16)
 
    testutil.check_logs_match(caplog, [
 
        ('ERROR', 'unable to generate outgoing report: RT client is required'),
 
    ])
tests/test_reports_fund.py
Show inline comments
...
 
@@ -259,26 +259,26 @@ def run_main(out_type, arglist, config=None):
 
def test_text_report(project, start_date, stop_date):
 
    retcode, output, errors = run_main(io.StringIO, [
 
        '-b', start_date.isoformat(), '-e', stop_date.isoformat(), project,
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_text_report(output, project, start_date, stop_date)
 

	
 
@pytest.mark.parametrize('start_date,stop_date', [
 
    (START_DATE, STOP_DATE),
 
    (MID_DATE, STOP_DATE),
 
    (START_DATE, MID_DATE),
 
])
 
def test_ods_report(start_date, stop_date):
 
    retcode, output, errors = run_main(io.BytesIO, [
 
        '--begin', start_date.isoformat(), '--end', stop_date.isoformat(),
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    check_ods_report(ods, start_date, stop_date)
 

	
 
def test_main_no_postings(caplog):
 
    retcode, output, errors = run_main(io.StringIO, ['NonexistentProject'])
 
    assert retcode == 24
 
    assert retcode == 18
 
    assert any(log.levelname == 'WARNING' for log in caplog.records)
tests/test_reports_ledger.py
Show inline comments
...
 
@@ -536,26 +536,26 @@ def test_main_cash_report(ledger_entries, flag):
 
        '-b', START_DATE.isoformat(),
 
        '-e', STOP_DATE.isoformat(),
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    postings = data.Posting.from_entries(ledger_entries)
 
    for account, expected in ExpectedPostings.group_by_account(postings):
 
        if account == 'Assets:Checking' or account == 'Assets:PayPal':
 
            expected.check_txn_report(ods, txn_filter, START_DATE, STOP_DATE)
 
        else:
 
            expected.check_not_in_report(ods)
 

	
 
@pytest.mark.parametrize('arg', [
 
    'Assets:NoneSuchBank',
 
    'Funny money',
 
])
 
def test_main_invalid_account(caplog, arg):
 
    retcode, output, errors = run_main(['-a', arg])
 
    assert retcode == 2
 
    assert any(log.message.endswith(f': {arg!r}') for log in caplog.records)
 

	
 
def test_main_no_postings(caplog):
 
    retcode, output, errors = run_main(['NonexistentProject'])
 
    assert retcode == 24
 
    assert retcode == 18
 
    assert any(log.levelname == 'WARNING' for log in caplog.records)
0 comments (0 inline, 0 general)