Changeset - cd1b28ae3eb1
[Not reviewed]
0 3 1
Brett Smith - 4 years ago 2020-06-09 13:04:27
brettcsmith@brettcsmith.org
cliutil: Add generalized SearchTerm class.

This makes the same filtering easily available to other reporting tools for
consistency.
4 files changed with 261 insertions and 107 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/cliutil.py
Show inline comments
 
"""cliutil - Utilities for CLI tools"""
 
PKGNAME = 'conservancy_beancount'
 
LICENSE = """
 
Copyright © 2020  Brett Smith
 

	
 
This program is free software: you can redistribute it and/or modify
 
it under the terms of the GNU Affero General Public License as published by
 
the Free Software Foundation, either version 3 of the License, or
 
(at your option) any later version.
 

	
 
This program is distributed in the hope that it will be useful,
 
but WITHOUT ANY WARRANTY; without even the implied warranty of
 
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
GNU Affero General Public License for more details.
 

	
 
You should have received a copy of the GNU Affero General Public License
 
along with this program.  If not, see <https://www.gnu.org/licenses/>."""
 

	
 
import argparse
 
import enum
 
import inspect
 
import logging
 
import operator
 
import os
 
import pkg_resources
 
import re
 
import signal
 
import sys
 
import traceback
 
import types
 

	
 
from pathlib import Path
 

	
 
from . import data
 
from . import filters
 
from . import rtutil
 

	
 
from typing import (
 
    Any,
 
    Callable,
 
    Iterable,
 
    NamedTuple,
 
    NoReturn,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Type,
 
    Union,
 
)
 
from .beancount_types import (
 
    MetaKey,
 
)
 

	
 
VERSION = pkg_resources.require(PKGNAME)[0].version
 

	
 
class ExceptHook:
 
    def __init__(self,
 
                 logger: Optional[logging.Logger]=None,
 
                 default_exitcode: int=3,
 
    ) -> None:
 
        if logger is None:
 
            logger = logging.getLogger()
 
        self.logger = logger
 
        self.default_exitcode = default_exitcode
 

	
 
    def __call__(self,
 
                 exc_type: Type[BaseException],
 
                 exc_value: BaseException,
 
                 exc_tb: types.TracebackType,
 
    ) -> NoReturn:
 
        exitcode = self.default_exitcode
 
        if isinstance(exc_value, KeyboardInterrupt):
 
            signal.signal(signal.SIGINT, signal.SIG_DFL)
 
            os.kill(0, signal.SIGINT)
 
            signal.pause()
 
        elif isinstance(exc_value, OSError):
 
            exitcode += 1
 
            msg = "I/O error: {e.filename}: {e.strerror}".format(e=exc_value)
 
        else:
 
            parts = [type(exc_value).__name__, *exc_value.args]
 
            msg = "internal " + ": ".join(parts)
 
        self.logger.critical(msg)
 
        self.logger.debug(
 
            ''.join(traceback.format_exception(exc_type, exc_value, exc_tb)),
 
        )
 
        raise SystemExit(exitcode)
 

	
 

	
 
class InfoAction(argparse.Action):
 
    def __call__(self,
 
                 parser: argparse.ArgumentParser,
 
                 namespace: argparse.Namespace,
 
                 values: Union[Sequence[Any], str, None]=None,
 
                 option_string: Optional[str]=None,
 
    ) -> NoReturn:
 
        if isinstance(self.const, str):
 
            info = self.const
 
            exitcode = 0
 
        else:
 
            info, exitcode = self.const
 
        print(info)
 
        raise SystemExit(exitcode)
 

	
 

	
 
class LogLevel(enum.IntEnum):
 
    DEBUG = logging.DEBUG
 
    INFO = logging.INFO
 
    WARNING = logging.WARNING
 
    ERROR = logging.ERROR
 
    CRITICAL = logging.CRITICAL
 
    WARN = WARNING
 
    ERR = ERROR
 
    CRIT = CRITICAL
 

	
 
    @classmethod
 
    def from_arg(cls, arg: str) -> int:
 
        try:
 
            return cls[arg.upper()].value
 
        except KeyError:
 
            raise ValueError(f"unknown loglevel {arg!r}") from None
 

	
 
    @classmethod
 
    def choices(cls) -> Iterable[str]:
 
        for level in sorted(cls, key=operator.attrgetter('value')):
 
            yield level.name.lower()
 

	
 

	
 
class SearchTerm(NamedTuple):
 
    """NamedTuple representing a user's metadata filter
 

	
 
    SearchTerm knows how to parse and store posting metadata filters provided
 
    by the user in `key=value` format. Reporting tools can use this to filter
 
    postings that match the user's criteria, to report on subsets of the books.
 

	
 
    Typical usage looks like::
 

	
 
      argument_parser.add_argument(
 
        'search_terms',
 
        type=SearchTerm.arg_parser(),
 
        …,
 
      )
 

	
 
      args = argument_parser.parse_args(…)
 
      for query in args.search_terms:
 
        postings = query.filter_postings(postings)
 
    """
 
    meta_key: MetaKey
 
    pattern: str
 

	
 
    @classmethod
 
    def arg_parser(cls,
 
                   default_key: Optional[str]=None,
 
                   ticket_default_key: Optional[str]=None,
 
    ) -> Callable[[str], 'SearchTerm']:
 
        """Build a SearchTerm parser
 

	
 
        This method returns a function that can parse strings in ``key=value``
 
        format and return a corresponding SearchTerm.
 

	
 
        If you specify a default key, then strings that just specify a ``value``
 
        will be parsed as if they said ``default_key=value``. Otherwise,
 
        parsing strings without a metadata key will raise a ValueError.
 

	
 
        If you specify a default key ticket links, then values in the format
 
        ``number``, ``rt:number``, or ``rt://ticket/number`` will be parsed as
 
        if they said ``ticket_default_key=value``.
 
        """
 
        if ticket_default_key is None:
 
            ticket_default_key = default_key
 
        def parse_search_term(arg: str) -> 'SearchTerm':
 
            key: Optional[str] = None
 
            if re.match(r'^[a-z][-\w]*=', arg):
 
                key, _, raw_link = arg.partition('=')
 
            else:
 
                raw_link = arg
 
            rt_ids = rtutil.RT.parse(raw_link)
 
            if rt_ids is None:
 
                rt_ids = rtutil.RT.parse('rt:' + raw_link)
 
            if rt_ids is None:
 
                if key is None:
 
                    key = default_key
 
                pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link))
 
            else:
 
                ticket_id, attachment_id = rt_ids
 
                if key is None:
 
                    if attachment_id is None:
 
                        key = ticket_default_key
 
                    else:
 
                        key = default_key
 
                pattern = rtutil.RT.metadata_regexp(
 
                    ticket_id,
 
                    attachment_id,
 
                    first_link_only=key == 'rt-id' and attachment_id is None,
 
                )
 
            if key is None:
 
                raise ValueError(f"invalid search term {arg!r}: no metadata key")
 
            return cls(key, pattern)
 
        return parse_search_term
 

	
 
    def filter_postings(self, postings: Iterable[data.Posting]) -> Iterable[data.Posting]:
 
        return filters.filter_meta_match(
 
            postings, self.meta_key, re.compile(self.pattern),
 
        )
 

	
 

	
 
def add_loglevel_argument(parser: argparse.ArgumentParser,
 
                          default: LogLevel=LogLevel.INFO) -> argparse.Action:
 
    return parser.add_argument(
 
        '--loglevel',
 
        metavar='LEVEL',
 
        default=default.value,
 
        type=LogLevel.from_arg,
 
        help="Show logs at this level and above."
 
        f" Specify one of {', '.join(LogLevel.choices())}."
 
        f" Default {default.name.lower()}.",
 
    )
 

	
 
def add_version_argument(parser: argparse.ArgumentParser) -> argparse.Action:
 
    progname = parser.prog or sys.argv[0]
 
    return parser.add_argument(
 
        '--version', '--copyright', '--license',
 
        action=InfoAction,
 
        nargs=0,
 
        const=f"{progname} version {VERSION}\n{LICENSE}",
 
        help="Show program version and license information",
 
    )
 

	
 
def is_main_script(prog_name: str) -> bool:
 
    """Return true if the caller is the "main" program."""
 
    stack = iter(inspect.stack(context=False))
 
    next(stack)  # Discard the frame for calling this function
 
    caller_filename = next(stack).filename
 
    return all(frame.filename == caller_filename
 
               or Path(frame.filename).stem == prog_name
 
               for frame in stack)
 

	
 
def setup_logger(logger: Union[str, logging.Logger]='',
 
                 loglevel: int=logging.INFO,
 
                 stream: TextIO=sys.stderr,
 
                 fmt: str='%(name)s: %(levelname)s: %(message)s',
 
) -> logging.Logger:
 
    if isinstance(logger, str):
 
        logger = logging.getLogger(logger)
 
    formatter = logging.Formatter(fmt)
 
    handler = logging.StreamHandler(stream)
 
    handler.setFormatter(formatter)
 
    logger.addHandler(handler)
 
    logger.setLevel(loglevel)
 
    return logger
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -529,276 +529,244 @@ class OutgoingReport(BaseReport):
 
            ticket_id, _ = self._primary_rt_id(posts)
 
            ticket = self.rt_client.get_ticket(ticket_id)
 
            # Note we only use this when ticket is None.
 
            errmsg = f"ticket {ticket_id} not found"
 
        except (ValueError, rt.RtError) as error:
 
            ticket = None
 
            errmsg = error.args[0]
 
        if ticket is None:
 
            self.logger.error(
 
                "can't generate outgoings report for %s because no RT ticket available: %s",
 
                posts.invoice, errmsg,
 
            )
 
            return
 

	
 
        try:
 
            rt_requestor = self.rt_client.get_user(ticket['Requestors'][0])
 
        except (IndexError, rt.RtError):
 
            rt_requestor = None
 
        if rt_requestor is None:
 
            requestor = ''
 
            requestor_name = ''
 
        else:
 
            requestor_name = (
 
                rt_requestor.get('RealName')
 
                or ticket.get('CF.{payment-to}')
 
                or ''
 
            )
 
            requestor = f'{requestor_name} <{rt_requestor["EmailAddress"]}>'.strip()
 

	
 
        balance_s = posts.end_balance.format(None)
 
        raw_balance = -posts.balance()
 
        if raw_balance != posts.end_balance:
 
            balance_s = f'{raw_balance} ({balance_s})'
 

	
 
        contract_links = posts.all_meta_links('contract')
 
        if contract_links:
 
            contract_s = ' , '.join(self.rt_wrapper.iter_urls(
 
                contract_links, missing_fmt='<BROKEN RT LINK: {}>',
 
            ))
 
        else:
 
            contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
 
        projects = [v for v in posts.meta_values('project')
 
                    if isinstance(v, str)]
 

	
 
        yield "PAYMENT FOR APPROVAL:"
 
        yield f"REQUESTOR: {requestor}"
 
        yield f"TOTAL TO PAY: {balance_s}"
 
        yield f"AGREEMENT: {contract_s}"
 
        yield f"PAYMENT TO: {ticket.get('CF.{payment-to}') or requestor_name}"
 
        yield f"PAYMENT METHOD: {ticket.get('CF.{payment-method}', '')}"
 
        yield f"PROJECT: {', '.join(projects)}"
 
        yield "\nBEANCOUNT ENTRIES:\n"
 

	
 
        last_txn: Optional[Transaction] = None
 
        for post in posts:
 
            txn = post.meta.txn
 
            if txn is not last_txn:
 
                last_txn = txn
 
                txn = self.rt_wrapper.txn_with_urls(txn, '{}')
 
                yield bc_printer.format_entry(txn)
 

	
 

	
 
class ReportType(enum.Enum):
 
    AGING = AgingReport
 
    BALANCE = BalanceReport
 
    OUTGOING = OutgoingReport
 
    AGE = AGING
 
    BAL = BALANCE
 
    OUT = OUTGOING
 
    OUTGOINGS = OUTGOING
 

	
 
    @classmethod
 
    def by_name(cls, name: str) -> 'ReportType':
 
        try:
 
            return cls[name.upper()]
 
        except KeyError:
 
            raise ValueError(f"unknown report type {name!r}") from None
 

	
 
    @classmethod
 
    def default_for(cls, groups: PostGroups) -> 'ReportType':
 
        if len(groups) == 1 and all(
 
                group.accrual_type is AccrualAccount.PAYABLE
 
                and not group.is_paid()
 
                for group in groups.values()
 
        ):
 
            return cls.OUTGOING
 
        else:
 
            return cls.BALANCE
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    LOAD_ERRORS = 1
 
    CONSISTENCY_ERRORS = 2
 
    REPORT_ERRORS = 4
 
    NOTHING_TO_REPORT = 8
 

	
 

	
 
class SearchTerm(NamedTuple):
 
    meta_key: MetaKey
 
    pattern: str
 

	
 
    @classmethod
 
    def parse(cls, s: str) -> 'SearchTerm':
 
        key_match = re.match(r'^[a-z][-\w]*=', s)
 
        key: Optional[str]
 
        if key_match:
 
            key, _, raw_link = s.partition('=')
 
        else:
 
            key = None
 
            raw_link = s
 
        rt_ids = rtutil.RT.parse(raw_link)
 
        if rt_ids is None:
 
            rt_ids = rtutil.RT.parse('rt:' + raw_link)
 
        if rt_ids is None:
 
            if key is None:
 
                key = 'invoice'
 
            pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link))
 
        else:
 
            ticket_id, attachment_id = rt_ids
 
            if key is None:
 
                key = 'rt-id' if attachment_id is None else 'invoice'
 
            pattern = rtutil.RT.metadata_regexp(
 
                ticket_id,
 
                attachment_id,
 
                first_link_only=key == 'rt-id' and attachment_id is None,
 
            )
 
        return cls(key, pattern)
 

	
 

	
 
def filter_search(postings: Iterable[data.Posting],
 
                  search_terms: Iterable[SearchTerm],
 
                  search_terms: Iterable[cliutil.SearchTerm],
 
) -> Iterable[data.Posting]:
 
    accounts = tuple(AccrualAccount.account_names())
 
    postings = (post for post in postings if post.account.is_under(*accounts))
 
    for meta_key, pattern in search_terms:
 
        postings = filters.filter_meta_match(postings, meta_key, re.compile(pattern))
 
    for query in search_terms:
 
        postings = query.filter_postings(postings)
 
    return postings
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='NAME',
 
        type=ReportType.by_name,
 
        help="""The type of report to generate, one of `aging`, `balance`, or
 
`outgoing`. If not specified, the default is `aging` when no search terms are
 
given, `outgoing` for search terms that return a single outstanding payable,
 
and `balance` any other time.
 
""")
 
    parser.add_argument(
 
        '--since',
 
        metavar='YEAR',
 
        type=int,
 
        default=-1,
 
        help="""How far back to search the books for related transactions.
 
You can either specify a fiscal year, or a negative offset from the current
 
fiscal year, to start loading entries from. The default is -1 (start from the
 
previous fiscal year).
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is stdout for the balance and outgoing reports, and a generated
 
filename for other reports.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search',
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('invoice', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on accruals that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other link, including an RT attachment link in
 
`TIK/ATT` format, is a shortcut for `invoice=LINK`.
 
""")
 
    args = parser.parse_args(arglist)
 
    args.search_terms = [SearchTerm.parse(s) for s in args.search]
 
    if args.report_type is None and not args.search_terms:
 
        args.report_type = ReportType.AGING
 
    return args
 

	
 
def get_output_path(output_path: Optional[Path],
 
                    default_path: Path=STANDARD_PATH,
 
) -> Optional[Path]:
 
    if output_path is None:
 
        output_path = default_path
 
    if output_path == STANDARD_PATH:
 
        return None
 
    else:
 
        return output_path
 

	
 
def get_output_bin(path: Optional[Path], stdout: TextIO) -> BinaryIO:
 
    if path is None:
 
        return open(stdout.fileno(), 'wb')
 
    else:
 
        return path.open('wb')
 

	
 
def get_output_text(path: Optional[Path], stdout: TextIO) -> TextIO:
 
    if path is None:
 
        return stdout
 
    else:
 
        return path.open('w')
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    if cliutil.is_main_script(PROGNAME):
 
        global logger
 
        logger = logging.getLogger(PROGNAME)
 
        sys.excepthook = cliutil.ExceptHook(logger)
 
    args = parse_arguments(arglist)
 
    cliutil.setup_logger(logger, args.loglevel, stderr)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries: Entries = []
 
        source = {
 
            'filename': str(config.config_file_path()),
 
            'lineno': 1,
 
        }
 
        load_errors: Errors = [Error(source, "no books to load in configuration", None)]
 
    elif args.report_type is ReportType.AGING:
 
        entries, load_errors, _ = books_loader.load_all()
 
    else:
 
        entries, load_errors, _ = books_loader.load_all(args.since)
 
    filters.remove_opening_balance_txn(entries)
 

	
 
    returncode = 0
 
    postings = filter_search(data.Posting.from_entries(entries), args.search_terms)
 
    groups: PostGroups = dict(AccrualPostings.group_by_meta(postings, 'invoice'))
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
        returncode |= ReturnFlag.LOAD_ERRORS
 
    for related in groups.values():
 
        for error in related.report_inconsistencies():
 
            bc_printer.print_error(error, file=stderr)
 
            returncode |= ReturnFlag.CONSISTENCY_ERRORS
 
    if not groups:
 
        logger.warning("no matching entries found to report")
 
        returncode |= ReturnFlag.NOTHING_TO_REPORT
 

	
 
    groups = {
 
        key: posts
 
        for source_posts in groups.values()
 
        for key, posts in source_posts.make_consistent()
 
    }
 
    if args.report_type is not ReportType.AGING:
 
        groups = {
 
            key: posts for key, posts in groups.items() if not posts.is_paid()
 
        } or groups
 

	
 
    if args.report_type is None:
 
        args.report_type = ReportType.default_for(groups)
 
    report: Optional[BaseReport] = None
 
    output_path: Optional[Path] = None
 
    if args.report_type is ReportType.AGING:
 
        rt_client = config.rt_client()
 
        if rt_client is None:
 
            logger.error("unable to generate aging report: RT client is required")
 
        else:
 
            now = datetime.datetime.now()
 
            default_path = Path(now.strftime('AgingReport_%Y-%m-%d_%H:%M.ods'))
 
            output_path = get_output_path(args.output_file, default_path)
 
            out_bin = get_output_bin(output_path, stdout)
 
            report = AgingReport(rt_client, out_bin)
 
    elif args.report_type is ReportType.OUTGOING:
 
        rt_client = config.rt_client()
 
        if rt_client is None:
tests/test_cliutil_searchterm.py
Show inline comments
 
new file 100644
 
"""test_cliutil_searchterm - Unit tests for cliutil.SearchTerm"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import re
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount import cliutil
 
from conservancy_beancount import data
 

	
 
TICKET_LINKS = [
 
    '123',
 
    'rt:123',
 
    'rt://ticket/123',
 
]
 

	
 
ATTACHMENT_LINKS = [
 
    '123/456',
 
    'rt:123/456',
 
    'rt://ticket/123/attachments/456',
 
]
 

	
 
REPOSITORY_LINKS = [
 
    '789.pdf',
 
    'Documents/789.pdf',
 
]
 

	
 
RT_LINKS = TICKET_LINKS + ATTACHMENT_LINKS
 
ALL_LINKS = RT_LINKS + REPOSITORY_LINKS
 

	
 
INVALID_METADATA_KEYS = [
 
    # Must start with a-z
 
    ';key',
 
    ' key',
 
    'ákey',
 
    'Key',
 
    # Must only contain alphanumerics, -, and _
 
    'key;',
 
    'key ',
 
    'a.key',
 
]
 

	
 
@pytest.fixture(scope='module')
 
def defaults_parser():
 
    return cliutil.SearchTerm.arg_parser('document', 'rt-id')
 

	
 
@pytest.fixture(scope='module')
 
def one_default_parser():
 
    return cliutil.SearchTerm.arg_parser('default')
 

	
 
@pytest.fixture(scope='module')
 
def no_default_parser():
 
    return cliutil.SearchTerm.arg_parser()
 

	
 
def check_link_regexp(regexp, match_s, first_link_only=False):
 
    assert regexp
 
    assert re.search(regexp, match_s)
 
    assert re.search(regexp, match_s + ' postlink')
 
    assert re.search(regexp, match_s + '0') is None
 
    assert re.search(regexp, '1' + match_s) is None
 
    end_match = re.search(regexp, 'prelink ' + match_s)
 
    if first_link_only:
 
        assert end_match is None
 
    else:
 
        assert end_match
 

	
 
@pytest.mark.parametrize('arg', TICKET_LINKS)
 
def test_search_term_parse_ticket(defaults_parser, arg):
 
    key, regexp = defaults_parser(arg)
 
    assert key == 'rt-id'
 
    check_link_regexp(regexp, 'rt:123', first_link_only=True)
 
    check_link_regexp(regexp, 'rt://ticket/123', first_link_only=True)
 

	
 
@pytest.mark.parametrize('arg', ATTACHMENT_LINKS)
 
def test_search_term_parse_attachment(defaults_parser, arg):
 
    key, regexp = defaults_parser(arg)
 
    assert key == 'document'
 
    check_link_regexp(regexp, 'rt:123/456')
 
    check_link_regexp(regexp, 'rt://ticket/123/attachments/456')
 

	
 
@pytest.mark.parametrize('key,query', testutil.combine_values(
 
    ['approval', 'contract', 'invoice'],
 
    RT_LINKS,
 
))
 
def test_search_term_parse_metadata_rt_shortcut(defaults_parser, key, query):
 
    actual_key, regexp = defaults_parser(f'{key}={query}')
 
    assert actual_key == key
 
    if query.endswith('/456'):
 
        check_link_regexp(regexp, 'rt:123/456')
 
        check_link_regexp(regexp, 'rt://ticket/123/attachments/456')
 
    else:
 
        check_link_regexp(regexp, 'rt:123')
 
        check_link_regexp(regexp, 'rt://ticket/123')
 

	
 
@pytest.mark.parametrize('search_prefix', [
 
    '',
 
    'approval=',
 
    'contract=',
 
    'invoice=',
 
])
 
def test_search_term_parse_repo_link(defaults_parser, search_prefix):
 
    document = '1234.pdf'
 
    actual_key, regexp = defaults_parser(f'{search_prefix}{document}')
 
    if search_prefix:
 
        expect_key = search_prefix.rstrip('=')
 
    else:
 
        expect_key = 'document'
 
    assert actual_key == expect_key
 
    check_link_regexp(regexp, document)
 

	
 
@pytest.mark.parametrize('search,unmatched', [
 
    ('1234.pdf', '1234_pdf'),
 
])
 
def test_search_term_parse_regexp_escaping(defaults_parser, search, unmatched):
 
    _, regexp = defaults_parser(search)
 
    assert re.search(regexp, unmatched) is None
 

	
 
@pytest.mark.parametrize('key', INVALID_METADATA_KEYS)
 
def test_non_metadata_key(one_default_parser, key):
 
    document = f'{key}=890'
 
    key, pattern = one_default_parser(document)
 
    assert key == 'default'
 
    check_link_regexp(pattern, document)
 

	
 
@pytest.mark.parametrize('arg', ALL_LINKS)
 
def test_default_parser(one_default_parser, arg):
 
    key, _ = one_default_parser(arg)
 
    assert key == 'default'
 

	
 
@pytest.mark.parametrize('arg', ALL_LINKS + [
 
    f'{nonkey}={link}' for nonkey, link in testutil.combine_values(
 
        INVALID_METADATA_KEYS, ALL_LINKS,
 
    )
 
])
 
def test_no_key(no_default_parser, arg):
 
    with pytest.raises(ValueError):
 
        key, pattern = no_default_parser(arg)
 

	
 
@pytest.mark.parametrize('key', ['zero', 'one', 'two'])
 
def test_filter_postings(key):
 
    txn = testutil.Transaction(postings=[
 
        ('Income:Other', 3, {'one': '1', 'two': '2'}),
 
        ('Income:Other', 2, {'two': '2'}),
 
        ('Income:Other', 1, {'one': '1'}),
 
    ])
 
    search = cliutil.SearchTerm(key, '.')
 
    actual = list(search.filter_postings(data.Posting.from_txn(txn)))
 
    assert len(actual) == 0 if key == 'zero' else 2
 
    assert all(post.meta.get(key) for post in actual)
tests/test_reports_accrual.py
Show inline comments
 
"""test_reports_accrual - Unit tests for accrual report"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import collections
 
import copy
 
import datetime
 
import io
 
import itertools
 
import logging
 
import operator
 
import re
 

	
 
import babel.numbers
 
import odf.opendocument
 
import odf.table
 
import odf.text
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from decimal import Decimal
 
from typing import NamedTuple, Optional, Sequence
 

	
 
from beancount.core import data as bc_data
 
from beancount import loader as bc_loader
 
from conservancy_beancount import cliutil
 
from conservancy_beancount import data
 
from conservancy_beancount import rtutil
 
from conservancy_beancount.reports import accrual
 
from conservancy_beancount.reports import core
 

	
 
_accruals_load = bc_loader.load_file(testutil.test_path('books/accruals.beancount'))
 
ACCRUAL_TXNS = [
 
    entry for entry in _accruals_load[0]
 
    if hasattr(entry, 'narration')
 
    and entry.narration != 'Opening balances'
 
]
 
ACCRUALS_COUNT = sum(
 
    1
 
    for txn in ACCRUAL_TXNS
 
    for post in txn.postings
 
    if post.account.startswith(('Assets:Receivable:', 'Liabilities:Payable:'))
 
)
 

	
 
ACCOUNTS = [
 
    'Assets:Receivable:Accounts',
 
    'Assets:Receivable:Loans',
 
    'Liabilities:Payable:Accounts',
 
    'Liabilities:Payable:Vacation',
 
]
 

	
 
CONSISTENT_METADATA = [
 
    'contract',
 
    'purchase-order',
 
]
 

	
 
class AgingRow(NamedTuple):
 
    date: datetime.date
 
    entity: Sequence[str]
 
    amount: Optional[Sequence[bc_data.Amount]]
 
    at_cost: bc_data.Amount
 
    rt_id: Sequence[str]
 
    invoice: Sequence[str]
 

	
 
    @classmethod
 
    def make_simple(cls, date, entity, at_cost, invoice, rt_id=None, orig_amount=None):
 
        if isinstance(date, str):
 
            date = datetime.datetime.strptime(date, '%Y-%m-%d').date()
 
        if not isinstance(at_cost, tuple):
 
            at_cost = testutil.Amount(at_cost)
 
        if rt_id is None:
 
            rt_id, _, _ = invoice.partition('/')
 
        return cls(date, [entity], orig_amount, at_cost, [rt_id], [invoice])
 

	
 
    def check_row_match(self, sheet_row):
 
        cells = testutil.ODSCell.from_row(sheet_row)
 
        assert len(cells) == len(self)
 
        cells = iter(cells)
 
        assert next(cells).value == self.date
 
        assert next(cells).text == '\0'.join(self.entity)
 
        assert next(cells).text == '\0'.join(
 
            babel.numbers.format_currency(number, currency, format_type='accounting')
 
            for number, currency in self.amount or ()
 
        )
 
        usd_cell = next(cells)
 
        assert usd_cell.value_type == 'currency'
 
        assert usd_cell.value == self.at_cost.number
 
        for index, cell in enumerate(cells):
 
            links = cell.getElementsByType(odf.text.A)
 
            assert len(links) == len(cell.childNodes)
 
        assert index >= 1
 

	
 

	
 
AGING_AP = [
 
    AgingRow.make_simple('2010-03-06', 'EarlyBird', -125, 'rt:44/440'),
 
    AgingRow.make_simple('2010-03-30', 'EarlyBird', 75, 'rt:490/4900'),
 
    AgingRow.make_simple('2010-04-30', 'Vendor', 200, 'FIXME'),
 
    AgingRow.make_simple('2010-06-10', 'Lawyer', 280, 'rt:510/6100'),
 
    AgingRow.make_simple('2010-06-18', 'EuroGov', 1100, 'rt:520/5200',
 
                         orig_amount=[testutil.Amount(1000, 'EUR')]),
 
]
 

	
 
AGING_AR = [
 
    AgingRow.make_simple('2010-03-05', 'EarlyBird', -500, 'rt:40/400'),
 
    AgingRow.make_simple('2010-05-15', 'MatchingProgram', 1500,
 
                         'rt://ticket/515/attachments/5150'),
 
]
 

	
 
class RTClient(testutil.RTClient):
 
    TICKET_DATA = {
 
        '40': [
 
            ('400', 'invoice feb.csv', 'text/csv', '40.4k'),
 
        ],
 
        '44': [
 
            ('440', 'invoice feb.csv', 'text/csv', '40.4k'),
 
        ],
 
        '490': [],
 
        '505': [],
 
        '510': [
 
            ('4000', 'contract.pdf', 'application/pdf', '1.4m'),
 
            ('5100', 'invoice april.pdf', 'application/pdf', '1.5m'),
 
            ('5105', 'payment.png', 'image/png', '51.5k'),
 
            ('6100', 'invoice may.pdf', 'application/pdf', '1.6m'),
 
        ],
 
        '515': [],
 
        '520': [],
 
    }
 

	
 

	
 
@pytest.fixture
 
def accrual_postings():
 
    return data.Posting.from_entries(copy.deepcopy(ACCRUAL_TXNS))
 

	
 
def check_link_regexp(regexp, match_s, first_link_only=False):
 
    assert regexp
 
    assert re.search(regexp, match_s)
 
    assert re.search(regexp, match_s + ' postlink')
 
    assert re.search(regexp, match_s + '0') is None
 
    assert re.search(regexp, '1' + match_s) is None
 
    end_match = re.search(regexp, 'prelink ' + match_s)
 
    if first_link_only:
 
        assert end_match is None
 
    else:
 
        assert end_match
 

	
 
def accruals_by_meta(postings, value, key='invoice', wrap_type=iter):
 
    return wrap_type(
 
        post for post in postings
 
        if post.meta.get(key) == value
 
        and post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
 
    )
 

	
 
def find_row_by_text(row_source, want_text):
 
    for row in row_source:
 
        try:
 
            found_row = row.childNodes[0].text == want_text
 
        except IndexError:
 
            found_row = False
 
        if found_row:
 
            return row
 
    return None
 

	
 
def check_aging_sheet(sheet, aging_rows, date, accrue_date):
 
    if not aging_rows:
 
        return
 
    if isinstance(accrue_date, int):
 
        accrue_date = date + datetime.timedelta(days=accrue_date)
 
    rows = iter(sheet.getElementsByType(odf.table.TableRow))
 
    for row in rows:
 
        if "Aging Report" in row.text:
 
            break
 
    else:
 
        assert None, "Header row not found"
 
    assert f"Accrued by {accrue_date.isoformat()}" in row.text
 
    assert f"Unpaid by {date.isoformat()}" in row.text
 
    expect_rows = iter(aging_rows)
 
    row0 = find_row_by_text(rows, aging_rows[0].date.isoformat())
 
    next(expect_rows).check_row_match(row0)
 
    for actual, expected in zip(rows, expect_rows):
 
        expected.check_row_match(actual)
 
    for row in rows:
 
        if row.text.startswith("Total Aged Over "):
 
            break
 
    else:
 
        assert None, "Totals rows not found"
 
    actual_sum = Decimal(row.childNodes[-1].value)
 
    for row in rows:
 
        if row.text.startswith("Total Aged Over "):
 
            actual_sum += Decimal(row.childNodes[-1].value)
 
        else:
 
            break
 
    assert actual_sum == sum(
 
        row.at_cost.number
 
        for row in aging_rows
 
        if row.date <= accrue_date
 
        and row.at_cost.number > 0
 
    )
 

	
 
def check_aging_ods(ods_file,
 
                    date=None,
 
                    recv_rows=AGING_AR,
 
                    pay_rows=AGING_AP,
 
):
 
    if date is None:
 
        date = datetime.date.today()
 
    ods_file.seek(0)
 
    ods = odf.opendocument.load(ods_file)
 
    sheets = ods.spreadsheet.getElementsByType(odf.table.Table)
 
    assert len(sheets) == 2
 
    check_aging_sheet(sheets[0], recv_rows, date, -60)
 
    check_aging_sheet(sheets[1], pay_rows, date, -30)
 

	
 
@pytest.mark.parametrize('link_fmt', [
 
    '{}',
 
    'rt:{}',
 
    'rt://ticket/{}',
 
])
 
def test_search_term_parse_rt_shortcuts(link_fmt):
 
    key, regexp = accrual.SearchTerm.parse(link_fmt.format(220))
 
    assert key == 'rt-id'
 
    check_link_regexp(regexp, 'rt:220', first_link_only=True)
 
    check_link_regexp(regexp, 'rt://ticket/220', first_link_only=True)
 

	
 
@pytest.mark.parametrize('link_fmt', [
 
    '{}/{}',
 
    'rt:{}/{}',
 
    'rt://ticket/{}/attachments/{}',
 
])
 
def test_search_term_parse_invoice_shortcuts(link_fmt):
 
    key, regexp = accrual.SearchTerm.parse(link_fmt.format(330, 660))
 
    assert key == 'invoice'
 
    check_link_regexp(regexp, 'rt:330/660')
 
    check_link_regexp(regexp, 'rt://ticket/330/attachments/660')
 

	
 
@pytest.mark.parametrize('key', [
 
    'approval',
 
    'contract',
 
    'invoice',
 
])
 
def test_search_term_parse_metadata_rt_shortcut(key):
 
    actual_key, regexp = accrual.SearchTerm.parse(f'{key}=440/420')
 
    assert actual_key == key
 
    check_link_regexp(regexp, 'rt:440/420')
 
    check_link_regexp(regexp, 'rt://ticket/440/attachments/420')
 

	
 
@pytest.mark.parametrize('key', [
 
    None,
 
    'approval',
 
    'contract',
 
    'invoice',
 
])
 
def test_search_term_parse_repo_link(key):
 
    document = '1234.pdf'
 
    if key is None:
 
        key = 'invoice'
 
        search = document
 
    else:
 
        search = f'{key}={document}'
 
    actual_key, regexp = accrual.SearchTerm.parse(search)
 
    assert actual_key == key
 
    check_link_regexp(regexp, document)
 

	
 
@pytest.mark.parametrize('search,unmatched', [
 
    ('1234.pdf', '1234_pdf'),
 
])
 
def test_search_term_parse_regexp_escaping(search, unmatched):
 
    _, regexp = accrual.SearchTerm.parse(search)
 
    assert re.search(regexp, unmatched) is None
 

	
 
@pytest.mark.parametrize('search_terms,expect_count,check_func', [
 
    ([], ACCRUALS_COUNT, lambda post: post.account.is_under(
 
        'Assets:Receivable:', 'Liabilities:Payable:',
 
    )),
 
    ([('rt-id', '^rt:505$')], 2, lambda post: post.meta['entity'] == 'DonorA'),
 
    ([('invoice', r'^rt:\D+515/')], 1, lambda post: post.meta['entity'] == 'MatchingProgram'),
 
    ([('entity', '^Lawyer$')], 3, lambda post: post.meta['rt-id'] == 'rt:510'),
 
    ([('entity', '^Lawyer$'), ('contract', '^rt:510/')], 2,
 
     lambda post: post.meta['invoice'].startswith('rt:510/')),
 
    ([('rt-id', '^rt:510$'), ('approval', '.')], 0, lambda post: False),
 
])
 
def test_filter_search(accrual_postings, search_terms, expect_count, check_func):
 
    search_terms = [cliutil.SearchTerm._make(query) for query in search_terms]
 
    actual = list(accrual.filter_search(accrual_postings, search_terms))
 
    if expect_count < ACCRUALS_COUNT:
 
        assert ACCRUALS_COUNT > len(actual) >= expect_count
 
    else:
 
        assert len(actual) == ACCRUALS_COUNT
 
    for post in actual:
 
        assert check_func(post)
 

	
 
@pytest.mark.parametrize('arg,expected', [
 
    ('aging', accrual.AgingReport),
 
    ('balance', accrual.BalanceReport),
 
    ('outgoing', accrual.OutgoingReport),
 
    ('age', accrual.AgingReport),
 
    ('bal', accrual.BalanceReport),
 
    ('out', accrual.OutgoingReport),
 
    ('outgoings', accrual.OutgoingReport),
 
])
 
def test_report_type_by_name(arg, expected):
 
    assert accrual.ReportType.by_name(arg.lower()).value is expected
 
    assert accrual.ReportType.by_name(arg.title()).value is expected
 
    assert accrual.ReportType.by_name(arg.upper()).value is expected
 

	
 
@pytest.mark.parametrize('arg', [
 
    'unknown',
 
    'blance',
 
    'outgong',
 
])
 
def test_report_type_by_unknown_name(arg):
 
    # Raising ValueError helps argparse generate good messages.
 
    with pytest.raises(ValueError):
 
        accrual.ReportType.by_name(arg)
 

	
 
@pytest.mark.parametrize('acct_name', ACCOUNTS)
 
def test_accrual_postings_consistent_account(acct_name):
 
    meta = {'invoice': '{acct_name} invoice.pdf'}
 
    txn = testutil.Transaction(postings=[
 
        (acct_name, 50, meta),
 
        (acct_name, 25, meta),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    assert related.account == acct_name
 
    assert related.accounts == {acct_name}
 

	
 
@pytest.mark.parametrize('meta_key,acct_name', testutil.combine_values(
 
    CONSISTENT_METADATA,
 
    ACCOUNTS,
 
))
 
def test_accrual_postings_consistent_metadata(meta_key, acct_name):
 
    meta_value = f'{meta_key}.pdf'
 
    meta = {
 
        meta_key: meta_value,
 
        'invoice': f'invoice with {meta_key}.pdf',
 
    }
 
    txn = testutil.Transaction(postings=[
 
        (acct_name, 70, meta),
 
        (acct_name, 35, meta),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    attr_name = meta_key.replace('-', '_')
 
    assert getattr(related, attr_name) == meta_value
 
    assert getattr(related, f'{attr_name}s') == {meta_value}
 

	
 
def test_accrual_postings_entity():
 
    txn = testutil.Transaction(postings=[
 
        (ACCOUNTS[0], 25, {'entity': 'Accruee'}),
 
        (ACCOUNTS[0], -15, {'entity': 'Payee15'}),
 
        (ACCOUNTS[0], -10, {'entity': 'Payee10'}),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    assert related.accrued_entities == {'Accruee'}
 
    assert related.paid_entities == {'Payee10', 'Payee15'}
 

	
 
def test_accrual_postings_entities():
 
    txn = testutil.Transaction(postings=[
 
        (ACCOUNTS[0], 25, {'entity': 'Accruee'}),
 
        (ACCOUNTS[0], -15, {'entity': 'Payee15'}),
 
        (ACCOUNTS[0], -10, {'entity': 'Payee10'}),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    actual = related.entities()
 
    assert next(actual, None) == 'Accruee'
 
    assert set(actual) == {'Payee10', 'Payee15'}
 

	
 
def test_accrual_postings_entities_no_duplicates():
 
    txn = testutil.Transaction(postings=[
 
        (ACCOUNTS[0], 25, {'entity': 'Accruee'}),
 
        (ACCOUNTS[0], -15, {'entity': 'Accruee'}),
 
        (ACCOUNTS[0], -10, {'entity': 'Other'}),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    actual = related.entities()
 
    assert next(actual, None) == 'Accruee'
 
    assert next(actual, None) == 'Other'
 
    assert next(actual, None) is None
 

	
 
def test_accrual_postings_inconsistent_account():
0 comments (0 inline, 0 general)