Changeset - cd578289c463
[Not reviewed]
0 6 0
Brett Smith - 4 years ago 2020-06-12 19:08:08
brettcsmith@brettcsmith.org
cliutil: Add make_entry_point() function.

This provides better logging setup, reduces the amount of boilerplate in
main, and replaces is_main_script().
6 files changed with 60 insertions and 43 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/cliutil.py
Show inline comments
 
"""cliutil - Utilities for CLI tools"""
 
PKGNAME = 'conservancy_beancount'
 
LICENSE = """
 
Copyright © 2020  Brett Smith
 

	
 
This program is free software: you can redistribute it and/or modify
 
it under the terms of the GNU Affero General Public License as published by
 
the Free Software Foundation, either version 3 of the License, or
 
(at your option) any later version.
 

	
 
This program is distributed in the hope that it will be useful,
 
but WITHOUT ANY WARRANTY; without even the implied warranty of
 
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
GNU Affero General Public License for more details.
 

	
 
You should have received a copy of the GNU Affero General Public License
 
along with this program.  If not, see <https://www.gnu.org/licenses/>."""
 

	
 
import argparse
 
import enum
 
import inspect
 
import io
 
import logging
 
import operator
 
import os
 
import pkg_resources
 
import re
 
import signal
 
import sys
 
import traceback
 
import types
 

	
 
from pathlib import Path
 

	
 
from . import data
 
from . import filters
 
from . import rtutil
 

	
 
from typing import (
 
    cast,
 
    Any,
 
    BinaryIO,
 
    Callable,
 
    IO,
 
    Iterable,
...
 
@@ -211,71 +210,92 @@ class SearchTerm(NamedTuple):
 

	
 

	
 
def add_loglevel_argument(parser: argparse.ArgumentParser,
 
                          default: LogLevel=LogLevel.INFO) -> argparse.Action:
 
    return parser.add_argument(
 
        '--loglevel',
 
        metavar='LEVEL',
 
        default=default.value,
 
        type=LogLevel.from_arg,
 
        help="Show logs at this level and above."
 
        f" Specify one of {', '.join(LogLevel.choices())}."
 
        f" Default {default.name.lower()}.",
 
    )
 

	
 
def add_version_argument(parser: argparse.ArgumentParser) -> argparse.Action:
 
    progname = parser.prog or sys.argv[0]
 
    return parser.add_argument(
 
        '--version', '--copyright', '--license',
 
        action=InfoAction,
 
        nargs=0,
 
        const=f"{progname} version {VERSION}\n{LICENSE}",
 
        help="Show program version and license information",
 
    )
 

	
 
def is_main_script(prog_name: str) -> bool:
 
    """Return true if the caller is the "main" program."""
 
    stack = iter(inspect.stack(context=False))
 
    next(stack)  # Discard the frame for calling this function
 
    caller_filename = next(stack).filename
 
    return all(frame.filename == caller_filename
 
               or Path(frame.filename).stem == prog_name
 
               for frame in stack)
 
def make_entry_point(mod_name: str, prog_name: str=sys.argv[0]) -> Callable[[], int]:
 
    """Create an entry_point function for a tool
 

	
 
    The returned function is suitable for use as an entry_point in setup.py.
 
    It sets up the root logger and excepthook, then calls the module's main
 
    function.
 
    """
 
    def entry_point():  # type:ignore
 
        prog_mod = sys.modules[mod_name]
 
        setup_logger()
 
        prog_mod.logger = logging.getLogger(prog_name)
 
        sys.excepthook = ExceptHook(prog_mod.logger)
 
        return prog_mod.main()
 
    return entry_point
 

	
 
def setup_logger(logger: Union[str, logging.Logger]='',
 
                 loglevel: int=logging.INFO,
 
                 stream: TextIO=sys.stderr,
 
                 fmt: str='%(name)s: %(levelname)s: %(message)s',
 
) -> logging.Logger:
 
    """Set up a logger with a StreamHandler with the given format"""
 
    if isinstance(logger, str):
 
        logger = logging.getLogger(logger)
 
    formatter = logging.Formatter(fmt)
 
    handler = logging.StreamHandler(stream)
 
    handler.setFormatter(formatter)
 
    logger.addHandler(handler)
 
    logger.setLevel(loglevel)
 
    return logger
 

	
 
def set_loglevel(logger: logging.Logger, loglevel: int=logging.INFO) -> None:
 
    """Set the loglevel for a tool or module
 

	
 
    If the given logger is not under a hierarchy, this function sets the
 
    loglevel for the root logger, along with some specific levels for libraries
 
    used by reporting tools. Otherwise, it's the same as
 
    ``logger.setLevel(loglevel)``.
 
    """
 
    if '.' not in logger.name:
 
        logger = logging.getLogger()
 
        if loglevel <= logging.DEBUG:
 
            # At the debug level, the rt module logs the full body of every
 
            # request and response. That's too much.
 
            logging.getLogger('rt.rt').setLevel(logging.INFO)
 
    logger.setLevel(loglevel)
 

	
 
def bytes_output(path: Optional[Path]=None,
 
                 default: OutputFile=sys.stdout,
 
                 mode: str='w',
 
) -> BinaryIO:
 
    """Get a file-like object suitable for binary output
 

	
 
    If ``path`` is ``None`` or ``-``, returns a file-like object backed by
 
    ``default``. If ``default`` is a file descriptor or text IO object, this
 
    method returns a file-like object that writes to the same place.
 

	
 
    Otherwise, returns ``path.open(mode)``.
 
    """
 
    mode = f'{mode}b'
 
    if path is None or path == STDSTREAM_PATH:
 
        if isinstance(default, int):
 
            retval = open(default, mode)
 
        elif isinstance(default, TextIO):
 
            retval = default.buffer
 
        else:
 
            retval = default
 
    else:
 
        retval = path.open(mode)
 
    return cast(BinaryIO, retval)
 

	
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -642,54 +642,50 @@ The default is stdout for the balance and outgoing reports, and a generated
 
filename for other reports.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('invoice', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on accruals that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other link, including an RT attachment link in
 
`TIK/ATT` format, is a shortcut for `invoice=LINK`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.report_type is None and not args.search_terms:
 
        args.report_type = ReportType.AGING
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    if cliutil.is_main_script(PROGNAME):
 
        global logger
 
        logger = logging.getLogger(PROGNAME)
 
        sys.excepthook = cliutil.ExceptHook(logger)
 
    args = parse_arguments(arglist)
 
    cliutil.setup_logger(logger, args.loglevel, stderr)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
    elif args.report_type is ReportType.AGING:
 
        entries, load_errors, _ = books_loader.load_all()
 
    else:
 
        entries, load_errors, _ = books_loader.load_all(args.since)
 
    filters.remove_opening_balance_txn(entries)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
        returncode |= ReturnFlag.LOAD_ERRORS
 

	
 
    postings = list(filter_search(
 
        data.Posting.from_entries(entries), args.search_terms,
 
    ))
 
    if not postings:
 
        logger.warning("no matching entries found to report")
 
        returncode |= ReturnFlag.NOTHING_TO_REPORT
 
    # groups is a mapping of metadata value strings to AccrualPostings.
...
 
@@ -732,26 +728,28 @@ def main(arglist: Optional[Sequence[str]]=None,
 
        else:
 
            now = datetime.datetime.now()
 
            if args.output_file is None:
 
                args.output_file = Path(now.strftime('AgingReport_%Y-%m-%d_%H:%M.ods'))
 
                logger.info("Writing report to %s", args.output_file)
 
            out_bin = cliutil.bytes_output(args.output_file, stdout)
 
            report = AgingReport(rt_wrapper, out_bin)
 
    elif args.report_type is ReportType.OUTGOING:
 
        rt_wrapper = config.rt_wrapper()
 
        if rt_wrapper is None:
 
            logger.error("unable to generate outgoing report: RT client is required")
 
        else:
 
            out_file = cliutil.text_output(args.output_file, stdout)
 
            report = OutgoingReport(rt_wrapper, out_file)
 
    else:
 
        out_file = cliutil.text_output(args.output_file, stdout)
 
        report = BalanceReport(out_file)
 

	
 
    if report is None:
 
        returncode |= ReturnFlag.REPORT_ERRORS
 
    else:
 
        report.run(groups)
 
    return 0 if returncode == 0 else 16 + returncode
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(main())
 
    exit(entry_point())
setup.py
Show inline comments
...
 
@@ -14,28 +14,28 @@ setup(
 
        'babel>=2.6',  # Debian:python3-babel
 
        'beancount>=2.2',  # Debian:beancount
 
        # 1.4.1 crashes when trying to save some documents.
 
        'odfpy>=1.4.0,!=1.4.1',  # Debian:python3-odf
 
        'PyYAML>=3.0',  # Debian:python3-yaml
 
        'regex',  # Debian:python3-regex
 
        'rt>=2.0',
 
    ],
 
    setup_requires=[
 
        'pytest-mypy',
 
        'pytest-runner',  # Debian:python3-pytest-runner
 
    ],
 
    tests_require=[
 
        'mypy>=0.770',  # Debian:python3-mypy
 
        'pytest',  # Debian:python3-pytest
 
    ],
 

	
 
    packages=[
 
        'conservancy_beancount',
 
        'conservancy_beancount.plugin',
 
        'conservancy_beancount.reports',
 
    ],
 
    entry_points={
 
        'console_scripts': [
 
            'accrual-report = conservancy_beancount.reports.accrual:main',
 
            'accrual-report = conservancy_beancount.reports.accrual:entry_point',
 
        ],
 
    },
 
)
tests/test_cliutil.py
Show inline comments
...
 
@@ -12,53 +12,48 @@
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import errno
 
import io
 
import inspect
 
import logging
 
import os
 
import re
 
import sys
 
import traceback
 

	
 
import pytest
 

	
 
from pathlib import Path
 

	
 
from conservancy_beancount import cliutil
 

	
 
FILE_NAMES = ['-foobar', '-foo.bin']
 
STREAM_PATHS = [None, Path('-')]
 

	
 
class AlwaysEqual:
 
    def __eq__(self, other):
 
        return True
 

	
 

	
 
class MockTraceback:
 
    def __init__(self, stack=None, index=0):
 
        if stack is None:
 
            stack = inspect.stack(context=False)
 
        self._stack = stack
 
        self._index = index
 
        frame_record = self._stack[self._index]
 
        self.tb_frame = frame_record.frame
 
        self.tb_lineno = frame_record.lineno
 

	
 
    @property
 
    def tb_next(self):
 
        try:
 
            return type(self)(self._stack, self._index + 1)
 
        except IndexError:
 
            return None
 

	
 

	
 
@pytest.fixture(scope='module')
 
def argparser():
 
    parser = argparse.ArgumentParser(prog='test_cliutil')
 
    cliutil.add_loglevel_argument(parser)
 
    cliutil.add_version_argument(parser)
 
    return parser
...
 
@@ -120,67 +115,59 @@ def test_excepthook_oserror(errnum, caplog):
 
@pytest.mark.parametrize('exc_type', [
 
    AttributeError,
 
    RuntimeError,
 
    ValueError,
 
])
 
def test_excepthook_bug(exc_type, caplog):
 
    error = exc_type("test message")
 
    with pytest.raises(SystemExit) as exc_check:
 
        cliutil.ExceptHook()(exc_type, error, None)
 
    assert exc_check.value.args[0] == 3
 
    assert caplog.records
 
    for log in caplog.records:
 
        assert log.levelname == 'CRITICAL'
 
        assert log.message == f"internal {exc_type.__name__}: {error.args[0]}"
 

	
 
def test_excepthook_traceback(caplog):
 
    error = KeyError('test')
 
    args = (type(error), error, MockTraceback())
 
    caplog.set_level(logging.DEBUG)
 
    with pytest.raises(SystemExit) as exc_check:
 
        cliutil.ExceptHook()(*args)
 
    assert caplog.records
 
    assert caplog.records[-1].message == ''.join(traceback.format_exception(*args))
 

	
 
@pytest.mark.parametrize('prog_name,expected', [
 
    ('', False),
 
    (AlwaysEqual(), True),
 
])
 
def test_is_main_script(prog_name, expected):
 
    assert cliutil.is_main_script(prog_name) == expected
 

	
 
@pytest.mark.parametrize('arg,expected', [
 
    ('debug', logging.DEBUG),
 
    ('info', logging.INFO),
 
    ('warning', logging.WARNING),
 
    ('warn', logging.WARNING),
 
    ('error', logging.ERROR),
 
    ('err', logging.ERROR),
 
    ('critical', logging.CRITICAL),
 
    ('crit', logging.CRITICAL),
 
])
 
def test_loglevel_argument(argparser, arg, expected):
 
    for method in ['lower', 'title', 'upper']:
 
        args = argparser.parse_args(['--loglevel', getattr(arg, method)()])
 
        assert args.loglevel is expected
 

	
 
def test_setup_logger():
 
    stream = io.StringIO()
 
    logger = cliutil.setup_logger(
 
        'test_cliutil', logging.INFO, stream, '%(name)s %(levelname)s: %(message)s',
 
        'test_cliutil', stream, '%(name)s %(levelname)s: %(message)s',
 
    )
 
    logger.debug("test debug")
 
    logger.info("test info")
 
    assert stream.getvalue() == "test_cliutil INFO: test info\n"
 
    logger.critical("test crit")
 
    assert stream.getvalue() == "test_cliutil CRITICAL: test crit\n"
 

	
 
@pytest.mark.parametrize('arg', [
 
    '--license',
 
    '--version',
 
    '--copyright',
 
])
 
def test_version_argument(argparser, capsys, arg):
 
    with pytest.raises(SystemExit) as exc_check:
 
        args = argparser.parse_args(['--version'])
 
    assert exc_check.value.args[0] == 0
 
    stdout, _ = capsys.readouterr()
 
    lines = iter(stdout.splitlines())
 
    assert re.match(r'^test_cliutil version \d+\.\d+\.\d+', next(lines, "<EOF>"))
tests/test_reports_accrual.py
Show inline comments
...
 
@@ -569,54 +569,55 @@ def test_aging_report_does_not_include_too_recent_postings(accrual_postings):
 
    # This date is after the Q3 posting, but too soon after for that to be
 
    # included in the aging report.
 
    date = datetime.date(2010, 10, 1)
 
    output = run_aging_report((
 
        post for post in accrual_postings
 
        if post.meta.get('rt-id') == 'rt:470'
 
    ), date)
 
    # Date+amount are both from the Q2 posting only.
 
    check_aging_ods(output, date, [
 
        AgingRow.make_simple('2010-06-15', 'GrantCo', 5500, 'rt:470/4700',
 
                             project='Development Grant'),
 
    ], [])
 

	
 
def run_main(arglist, config=None):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/accruals.beancount'),
 
            rt_client=RTClient(),
 
        )
 
    output = io.StringIO()
 
    errors = io.StringIO()
 
    retcode = accrual.main(arglist, output, errors, config)
 
    return retcode, output, errors
 

	
 
def check_main_fails(arglist, config, error_flags, error_patterns):
 
def check_main_fails(arglist, config, error_flags):
 
    retcode, output, errors = run_main(arglist, config)
 
    assert retcode > 16
 
    assert (retcode - 16) & error_flags
 
    check_output(errors, error_patterns)
 
    assert not output.getvalue()
 
    errors.seek(0)
 
    return errors
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['--report-type=balance', 'entity=EarlyBird'],
 
    ['--report-type=outgoing', 'entity=EarlyBird'],
 
])
 
def test_output_excludes_payments(arglist):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    output.seek(0)
 
    for line in output:
 
        assert not re.match(r'\brt:4\d\b', line)
 

	
 
@pytest.mark.parametrize('arglist,expect_invoice', [
 
    (['40'], 'rt:40/400'),
 
    (['44/440'], 'rt:44/440'),
 
])
 
def test_output_payments_when_only_match(arglist, expect_invoice):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_output(output, [
 
        rf'^{re.escape(expect_invoice)}:$',
 
        r' outstanding since ',
...
 
@@ -665,45 +666,48 @@ def test_main_balance_report_because_no_rt_id():
 
        r'^\s+-50\.00 USD outstanding since 2010-06-20$',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    [],
 
    ['-t', 'aging', 'entity=Lawyer'],
 
])
 
def test_main_aging_report(tmp_path, arglist):
 
    if arglist:
 
        recv_rows = [row for row in AGING_AR if 'Lawyer' in row.entity]
 
        pay_rows = [row for row in AGING_AP if 'Lawyer' in row.entity]
 
    else:
 
        recv_rows = AGING_AR
 
        pay_rows = AGING_AP
 
    output_path = tmp_path / 'AgingReport.ods'
 
    arglist.insert(0, f'--output-file={output_path}')
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    assert not output.getvalue()
 
    with output_path.open('rb') as ods_file:
 
        check_aging_ods(ods_file, None, recv_rows, pay_rows)
 

	
 
def test_main_no_books():
 
    check_main_fails([], testutil.TestConfig(), 1 | 8, [
 
    errors = check_main_fails([], testutil.TestConfig(), 1 | 8)
 
    testutil.check_lines_match(iter(errors), [
 
        r':[01]: +no books to load in configuration\b',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['499'],
 
    ['505/99999'],
 
    ['entity=NonExistent'],
 
])
 
def test_main_no_matches(arglist):
 
    check_main_fails(arglist, None, 8, [
 
        r': WARNING: no matching entries found to report$',
 
def test_main_no_matches(arglist, caplog):
 
    check_main_fails(arglist, None, 8)
 
    testutil.check_logs_match(caplog, [
 
        ('WARNING', 'no matching entries found to report'),
 
    ])
 

	
 
def test_main_no_rt():
 
def test_main_no_rt(caplog):
 
    config = testutil.TestConfig(
 
        books_path=testutil.test_path('books/accruals.beancount'),
 
    )
 
    check_main_fails(['-t', 'out'], config, 4, [
 
        r': ERROR: unable to generate outgoing report: RT client is required\b',
 
    check_main_fails(['-t', 'out'], config, 4)
 
    testutil.check_logs_match(caplog, [
 
        ('ERROR', 'unable to generate outgoing report: RT client is required'),
 
    ])
tests/testutil.py
Show inline comments
...
 
@@ -48,48 +48,56 @@ def _ods_cell_value(cell):
 
    if value_type == 'currency' or value_type == 'float':
 
        return Decimal(cell.getAttribute('value'))
 
    elif value_type == 'date':
 
        return datetime.datetime.strptime(
 
            cell.getAttribute('datevalue'), '%Y-%m-%d',
 
        ).date()
 
    else:
 
        return cell.getAttribute('value')
 

	
 
def _ods_elem_text(elem):
 
    if isinstance(elem, odf.element.Text):
 
        return elem.data
 
    else:
 
        return '\0'.join(_ods_elem_text(child) for child in elem.childNodes)
 

	
 
odf.element.Element.value_type = property(_ods_cell_value_type)
 
odf.element.Element.value = property(_ods_cell_value)
 
odf.element.Element.text = property(_ods_elem_text)
 

	
 
def check_lines_match(lines, expect_patterns, source='output'):
 
    for pattern in expect_patterns:
 
        assert any(re.search(pattern, line) for line in lines), \
 
            f"{pattern!r} not found in {source}"
 

	
 
def check_logs_match(caplog, expected):
 
    records = iter(caplog.records)
 
    for exp_level, exp_msg in expected:
 
        exp_level = exp_level.upper()
 
        assert any(
 
            log.levelname == exp_level and log.message == exp_msg for log in records
 
        ), f"{exp_level} log {exp_msg!r} not found"
 

	
 
def check_post_meta(txn, *expected_meta, default=None):
 
    assert len(txn.postings) == len(expected_meta)
 
    for post, expected in zip(txn.postings, expected_meta):
 
        if not expected:
 
            assert not post.meta
 
        else:
 
            actual = None if post.meta is None else {
 
                key: post.meta.get(key, default) for key in expected
 
            }
 
            assert actual == expected
 

	
 
def combine_values(*value_seqs):
 
    stop = 0
 
    for seq in value_seqs:
 
        try:
 
            stop = max(stop, len(seq))
 
        except TypeError:
 
            pass
 
    return itertools.islice(
 
        zip(*(itertools.cycle(seq) for seq in value_seqs)),
 
        stop,
 
    )
 

	
 
def date_seq(date=FY_MID_DATE, step=1):
0 comments (0 inline, 0 general)