Changeset - 95f7524b000e
[Not reviewed]
0 4 0
Brett Smith - 4 years ago 2021-02-24 20:31:23
brettcsmith@brettcsmith.org
cliutil: Add ExtendAction.

This is a user affordance we've wanted for a while, and
query-report *really* wants it.
4 files changed with 85 insertions and 12 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/cliutil.py
Show inline comments
 
"""cliutil - Utilities for CLI tools"""
 
PKGNAME = 'conservancy_beancount'
 
LICENSE = """
 
Copyright © 2020, 2021  Brett Smith and other contributors
 

	
 
This program is free software: you can redistribute it and/or modify it.
 
Refer to the LICENSE.txt that came with the software for details.
 

	
 
This program is distributed in the hope that it will be useful,
 
but WITHOUT ANY WARRANTY; without even the implied warranty of
 
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE."""
 

	
 
import argparse
 
import datetime
 
import enum
 
import io
 
import logging
 
import operator
 
import os
 
import pkg_resources
 
import re
 
import signal
 
import subprocess
 
import sys
 
import traceback
 
import types
 

	
 
from pathlib import Path
 

	
 
import rt.exceptions as rt_error
 
import yaml
 

	
 
from . import data
 
from . import errors
 
from . import filters
 
from . import rtutil
 

	
 
from typing import (
 
    cast,
 
    Any,
 
    BinaryIO,
 
    Callable,
 
    Container,
 
    Generic,
 
    Hashable,
 
    IO,
 
    Iterable,
 
    Iterator,
 
    List,
 
    NamedTuple,
 
    NoReturn,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Type,
 
    TypeVar,
 
    Union,
 
)
 
from .beancount_types import (
 
    MetaKey,
 
)
 

	
 
ET = TypeVar('ET', bound=enum.Enum)
 
OutputFile = Union[int, IO]
 

	
 
CPU_COUNT = len(os.sched_getaffinity(0))
 
STDSTREAM_PATH = Path('-')
 
VERSION = pkg_resources.require(PKGNAME)[0].version
 

	
 
class EnumArgument(Generic[ET]):
 
    """Wrapper class to use an enum as argument values
 

	
 
    Use this class when the user can choose one of some arbitrary enum names
 
    as an argument. It will let user abbreviate and use any case, and will
 
    return the correct value as long as it's unambiguous. Typical usage
 
    looks like::
 

	
 
        enum_arg = EnumArgument(Enum)
 
        arg_parser.add_argument(
 
          '--choice',
 
          type=enum_arg.enum_type,  # or .value_type
 
          help=f"Choices are {enum_arg.choices_str()}",
 
 
        )
 
    """
 
    # I originally wrote this as a mixin class, to eliminate the need for the
 
    # explicit wrapping in the example above. But Python 3.6 doesn't really
 
    # support mixins with Enums; see <https://bugs.python.org/issue29577>.
 
    # This functionality could be moved to a mixin when we drop support for
 
    # Python 3.6.
 

	
 
    def __init__(self, base: Type[ET]) -> None:
 
        self.base = base
 

	
 
    def enum_type(self, arg: str) -> ET:
 
        """Return a single enum whose name matches the user argument"""
 
        regexp = re.compile(re.escape(arg), re.IGNORECASE)
...
 
@@ -150,96 +151,132 @@ class ExceptHook:
 
            error_type = "RT access denied"
 
        elif isinstance(exc_value, rt_error.ConnectionError):
 
            exitcode = os.EX_TEMPFAIL
 
            error_type = "RT connection error"
 
        elif isinstance(exc_value, rt_error.RtError):
 
            exitcode = os.EX_UNAVAILABLE
 
            error_type = f"RT {error_type}"
 
        elif isinstance(exc_value, errors.RewriteRuleError):
 
            exitcode = ExitCode.RewriteRulesError
 
            msg = str(exc_value)
 
            if exc_value.source is not None:
 
                msg += f"\n\n source: {yaml.safe_dump(exc_value.source)}"
 
        elif isinstance(exc_value, OSError):
 
            if exc_value.filename is None:
 
                exitcode = os.EX_OSERR
 
                error_type = "OS error"
 
                msg = exc_value.strerror
 
            else:
 
                # There are more specific exit codes for input problems vs.
 
                # output problems, but without knowing how the file was
 
                # intended to be used, we can't use them.
 
                exitcode = os.EX_IOERR
 
                error_type = "I/O error"
 
                msg = f"{exc_value.filename}: {exc_value.strerror}"
 
        else:
 
            exitcode = os.EX_SOFTWARE
 
            error_type = f"internal {error_type}"
 
        self.logger.critical("%s%s%s", error_type, ": " if msg else "", msg)
 
        self.logger.debug(
 
            ''.join(traceback.format_exception(exc_type, exc_value, exc_tb)),
 
        )
 
        raise SystemExit(exitcode)
 

	
 

	
 
class ExitCode(enum.IntEnum):
 
    # BSD exit codes commonly used
 
    NoConfiguration = os.EX_CONFIG
 
    NoConfig = NoConfiguration
 
    NoDataFiltered = os.EX_DATAERR
 
    NoDataLoaded = os.EX_NOINPUT
 
    OK = os.EX_OK
 
    Ok = OK
 
    RewriteRulesError = os.EX_DATAERR
 

	
 
    # Our own exit codes, working down from that range
 
    BeancountErrors = 63
 

	
 

	
 
class ExtendAction(argparse.Action):
 
    """argparse action to let a user build a list from a string
 

	
 
    This is a fancier version of argparse's built-in ``action='append'``.
 
    The user's input is turned into a list of strings, split by a regexp
 
    pattern you provide. Typical usage looks like::
 

	
 
        parser = argparse.ArgumentParser()
 
        parser.add_argument(
 
          '--option', ...,
 
          action=ExtendAction,
 
          const=regexp_pattern,  # default is r'\s*,\s*'
 
          ...,
 
        )
 
    """
 
    DEFAULT_PATTERN = r'\s*,\s*'
 

	
 
    def __call__(self,
 
                 parser: argparse.ArgumentParser,
 
                 namespace: argparse.Namespace,
 
                 values: Union[Sequence[Any], str, None]=None,
 
                 option_string: Optional[str]=None,
 
    ) -> None:
 
        pattern: str = self.const or self.DEFAULT_PATTERN
 
        value: Optional[List[str]] = getattr(namespace, self.dest, None)
 
        if value is None:
 
            value = []
 
            setattr(namespace, self.dest, value)
 
        if values is None:
 
            values = []
 
        elif isinstance(values, str):
 
            values = [values]
 
        for s in values:
 
            value.extend(re.split(pattern, s))
 

	
 

	
 
class InfoAction(argparse.Action):
 
    def __call__(self,
 
                 parser: argparse.ArgumentParser,
 
                 namespace: argparse.Namespace,
 
                 values: Union[Sequence[Any], str, None]=None,
 
                 option_string: Optional[str]=None,
 
    ) -> NoReturn:
 
        if isinstance(self.const, str):
 
            info = self.const
 
            exitcode = 0
 
        else:
 
            info, exitcode = self.const
 
        print(info)
 
        raise SystemExit(exitcode)
 

	
 

	
 
class LogLevel(enum.IntEnum):
 
    DEBUG = logging.DEBUG
 
    INFO = logging.INFO
 
    WARNING = logging.WARNING
 
    ERROR = logging.ERROR
 
    CRITICAL = logging.CRITICAL
 
    WARN = WARNING
 
    ERR = ERROR
 
    CRIT = CRITICAL
 

	
 
    def _choices_sortkey(self) -> Hashable:
 
        return self.value
 

	
 

	
 
class SearchTerm(NamedTuple):
 
    """NamedTuple representing a user's metadata filter
 

	
 
    SearchTerm knows how to parse and store posting metadata filters provided
 
    by the user in `key=value` format. Reporting tools can use this to filter
 
    postings that match the user's criteria, to report on subsets of the books.
 

	
 
    Typical usage looks like::
 

	
 
      argument_parser.add_argument(
 
        'search_terms',
 
        type=SearchTerm.arg_parser(),
 
        …,
 
      )
 

	
 
      args = argument_parser.parse_args(…)
 
      for query in args.search_terms:
 
        postings = query.filter_postings(postings)
conservancy_beancount/reconcile/statement.py
Show inline comments
...
 
@@ -293,100 +293,100 @@ class StatementReconciliation(core.BaseODS[data.Posting, data.Account]):
 
    def write(self, rows: Iterable[data.Posting]) -> None:
 
        acct_posts = dict(core.RelatedPostings.group_by_account(
 
            post for post in rows
 
            if post.meta.date < self.post_range.stop
 
            and post.account.is_under(*self.accounts)
 
        ))
 
        for account in self.accounts:
 
            try:
 
                postings = acct_posts[account]
 
            except KeyError:
 
                postings = core.RelatedPostings()
 
            statement_metakey = self._get_metakey(account, 'statement', 'statement')
 
            id_metakey = self._get_metakey(account, 'id', 'lineno')
 
            open_balance = core.Balance()
 
            for post, open_balance in postings.iter_with_balance():
 
                if post.meta.date >= self.rec_range.start:
 
                    break
 
            after_posts = self._date_range_posts(postings, self.post_range)
 
            self.start_section(account)
 
            self.write_pre_posts(postings, statement_metakey, id_metakey)
 
            self.write_rec_posts(postings, statement_metakey, id_metakey, open_balance)
 
            self.write_posts(after_posts, statement_metakey, id_metakey)
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        required=True,
 
        help="""Date to start reconciliation, inclusive, in YYYY-MM-DD format.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to end reconciliation, exclusive, in YYYY-MM-DD format.
 
The default is one month after the start date.
 
""")
 
    parser.add_argument(
 
        '--account', '-a',
 
        dest='accounts',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""Reconcile this account. You can specify this option
 
multiple times. You can specify a part of the account hierarchy, or an account
 
classification from metadata. Default adapts to your search criteria.
 
        action=cliutil.ExtendAction,
 
        help="""Reconcile this account. You can specify multiple
 
comma-separated accounts or classifications, and/or specify this option
 
multiple times. Default adapts to your search criteria.
 
""")
 
    parser.add_argument(
 
        '--id-metadata-key', '-i',
 
        metavar='METAKEY',
 
        help="""Show the named metadata as a posting identifier.
 
Default varies by account.
 
""")
 
    parser.add_argument(
 
        '--statement-metadata-key', '-s',
 
        metavar='METAKEY',
 
        help="""Use the named metadata to determine which postings have already
 
been reconciled. Default varies by account.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is `ReconciliationReport_<StartDate>_<StopDate>.ods`.
 
""")
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser(),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on postings that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match.
 
""")
 
    args = parser.parse_args(arglist)
 
    if not args.accounts:
 
        if any(term.meta_key == 'payroll-type' for term in args.search_terms):
 
            args.accounts = ['Expenses:Payroll']
 
        else:
 
            args.accounts = ['Cash']
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
conservancy_beancount/reports/ledger.py
Show inline comments
...
 
@@ -694,119 +694,122 @@ def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace
 
disbursements report.
 
""")
 
    parser.add_argument(
 
        '--receipts',
 
        action=CashReportAction,
 
        const=ReportType.CREDIT_TRANSACTIONS,
 
        nargs=0,
 
        help="""Shortcut to set all the necessary options to generate a cash
 
receipts report.
 
""")
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
 
The default is one year ago.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
 
The default is a year after the start date, or 30 days from today if the start
 
date was also not specified.
 
""")
 
    report_type = cliutil.EnumArgument(ReportType)
 
    report_type_default = ReportType.FULL_LEDGER
 
    report_type_arg = parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='TYPE',
 
        type=report_type.enum_type,
 
        default=report_type_default,
 
        help=f"""The type of report to generate. Choices are
 
{report_type.choices_str()}. Default is {report_type_default.name.lower()!r}.
 
""")
 
    # --transactions got merged into --report-type; this is backwards compatibility.
 
    parser.add_argument(
 
        '--transactions',
 
        dest=report_type_arg.dest,
 
        type=cast(Callable, report_type_arg.type),
 
        help=argparse.SUPPRESS,
 
    )
 
    parser.add_argument(
 
        '--account', '-a',
 
        dest='accounts',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""Show this account in the report. You can specify this option
 
        action=cliutil.ExtendAction,
 
        help="""Show this account in the report. You can specify multiple
 
comma-separated accounts or classifications, and/or specify this option
 
multiple times. You can specify a part of the account hierarchy, or an account
 
classification from metadata. If not specified, the default set adapts to your
 
search criteria.
 
""")
 
    cliutil.add_rewrite_rules_argument(parser)
 
    parser.add_argument(
 
        '--show-totals', '-S',
 
        metavar='ACCOUNT',
 
        action='append',
 
        action=cliutil.ExtendAction,
 
        help="""When entries for this account appear in the report, include
 
account balance(s) as well. You can specify this option multiple times. Pass in
 
a part of the account hierarchy. The default is all accounts.
 
account balance(s) as well. You can specify multiple comma-separated parts of
 
the account hierarchy, and/or specify this option multiple times.
 
The default is all accounts.
 
""")
 
    parser.add_argument(
 
        '--add-totals', '-T',
 
        metavar='ACCOUNT',
 
        action='append',
 
        action=cliutil.ExtendAction,
 
        help="""When an account could be included in the report but does not
 
have any entries in the date range, include a header and account balance(s) for
 
it. You can specify this option multiple times. Pass in a part of the account
 
hierarchy. The default set adapts to your search criteria.
 
it. You can specify multiple comma-separated parts of the account hierarchy,
 
and/or specify this option multiple times. The default set adapts to your
 
search criteria.
 
""")
 
    parser.add_argument(
 
        '--sheet-size', '--size',
 
        metavar='SIZE',
 
        type=int,
 
        default=LedgerODS.SHEET_SIZE,
 
        help="""Try to limit sheets to this many rows. The report will
 
automatically create new sheets to make this happen. When that's not possible,
 
it will issue a warning.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is `LedgerReport_<StartDate>_<StopDate>.ods`.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('project', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on postings that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other word is a shortcut for `project=TERM`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.add_totals is None and args.search_terms:
 
        args.add_totals = []
 
    if args.accounts is None:
 
        if any(term.meta_key == 'project' for term in args.search_terms):
 
            args.accounts = [
 
                'Income',
 
                'Expenses',
 
                'Equity',
 
                'Assets:Receivable',
 
                'Assets:Prepaid',
 
                'Liabilities:UnearnedIncome',
 
                'Liabilities:Payable',
 
            ]
 
        else:
 
            args.accounts = list(LedgerODS.ACCOUNT_COLUMNS)
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
tests/test_cliutil.py
Show inline comments
...
 
@@ -253,48 +253,81 @@ def test_version_argument(argparser, capsys, arg):
 
    (datetime.date(2012, 2, 29), 2, datetime.date(2014, 3, 1)),
 
    (datetime.date(2012, 2, 29), 4, datetime.date(2016, 2, 29)),
 
    (datetime.date(2012, 2, 29), -2, datetime.date(2010, 2, 28)),
 
    (datetime.date(2012, 2, 29), -4, datetime.date(2008, 2, 29)),
 
    (datetime.date(2010, 3, 1), 1, datetime.date(2011, 3, 1)),
 
    (datetime.date(2010, 3, 1), 2, datetime.date(2012, 3, 1)),
 
    (datetime.date(2010, 3, 1), -1, datetime.date(2009, 3, 1)),
 
    (datetime.date(2010, 3, 1), -2, datetime.date(2008, 3, 1)),
 
])
 
def test_diff_year(date, diff, expected):
 
    assert cliutil.diff_year(date, diff) == expected
 

	
 
@pytest.mark.parametrize('cmd,expected', [
 
    (['true'], True),
 
    (['true', '--version'], True),
 
    (['false'], False),
 
    (['false', '--version'], False),
 
    ([str(testutil.TESTS_DIR)], False),
 
])
 
def test_can_run(cmd, expected):
 
    assert cliutil.can_run(cmd) == expected
 

	
 
@pytest.mark.parametrize('name,choice', ArgChoices.__members__.items())
 
def test_enum_arg_enum_type(arg_enum, name, choice):
 
    assert arg_enum.enum_type(name.lower()) is choice
 
    assert arg_enum.enum_type(choice.value) is choice
 

	
 
@pytest.mark.parametrize('arg', 'az\0')
 
def test_enum_arg_no_enum_match(arg_enum, arg):
 
    with pytest.raises(ValueError):
 
        arg_enum.enum_type(arg)
 

	
 
@pytest.mark.parametrize('name,choice', ArgChoices.__members__.items())
 
def test_enum_arg_value_type(arg_enum, name, choice):
 
    assert arg_enum.value_type(name.lower()) == choice.value
 
    assert arg_enum.value_type(choice.value) == choice.value
 

	
 
@pytest.mark.parametrize('arg', 'az\0')
 
def test_enum_arg_no_value_match(arg_enum, arg):
 
    with pytest.raises(ValueError):
 
        arg_enum.value_type(arg)
 

	
 
def test_enum_arg_choices_str_defaults(arg_enum):
 
    assert arg_enum.choices_str() == ', '.join(repr(c.value) for c in ArgChoices)
 

	
 
def test_enum_arg_choices_str_args(arg_enum):
 
    sep = '/'
 
    assert arg_enum.choices_str(sep, '{}') == sep.join(c.value for c in ArgChoices)
 

	
 
@pytest.mark.parametrize('values,sep', testutil.combine_values(
 
    [['foo'], ['bar', 'baz'], ['qu', 'quu', 'quux']],
 
    [',', ', ', '  ,', ' ,  '],
 
))
 
def test_extend_action_once(values, sep):
 
    action = cliutil.ExtendAction(['-t'], 'result')
 
    args = argparse.Namespace()
 
    action(None, args, sep.join(values), '-t')
 
    assert args.result == values
 

	
 
def test_extend_action_multi():
 
    action = cliutil.ExtendAction(['-t'], 'result')
 
    args = argparse.Namespace()
 
    action(None, args, 'foo,bar', '-t')
 
    action(None, args, 'baz, quux', '-t')
 
    assert args.result == ['foo', 'bar', 'baz', 'quux']
 

	
 
def test_extend_action_from_default():
 
    action = cliutil.ExtendAction(['-t'], 'result')
 
    args = argparse.Namespace(result=['foo'])
 
    action(None, args, 'bar , baz', '-t')
 
    assert args.result == ['foo', 'bar', 'baz']
 

	
 
@pytest.mark.parametrize('pattern,expected', [
 
    (',', ['foo', ' bar']),
 
    (r'\s+', ['foo,', 'bar']),
 
])
 
def test_extend_action_custom_pattern(pattern, expected):
 
    action = cliutil.ExtendAction(['-t'], 'result', const=pattern)
 
    args = argparse.Namespace()
 
    action(None, args, 'foo, bar', '-t')
 
    assert args.result == expected
0 comments (0 inline, 0 general)