Changeset - cd1b28ae3eb1
[Not reviewed]
0 3 1
Brett Smith - 4 years ago 2020-06-09 13:04:27
brettcsmith@brettcsmith.org
cliutil: Add generalized SearchTerm class.

This makes the same filtering easily available to other reporting tools for
consistency.
4 files changed with 261 insertions and 107 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/cliutil.py
Show inline comments
...
 
@@ -23,6 +23,7 @@ import logging
 
import operator
 
import os
 
import pkg_resources
 
import re
 
import signal
 
import sys
 
import traceback
...
 
@@ -30,9 +31,15 @@ import types
 

	
 
from pathlib import Path
 

	
 
from . import data
 
from . import filters
 
from . import rtutil
 

	
 
from typing import (
 
    Any,
 
    Callable,
 
    Iterable,
 
    NamedTuple,
 
    NoReturn,
 
    Optional,
 
    Sequence,
...
 
@@ -40,6 +47,9 @@ from typing import (
 
    Type,
 
    Union,
 
)
 
from .beancount_types import (
 
    MetaKey,
 
)
 

	
 
VERSION = pkg_resources.require(PKGNAME)[0].version
 

	
...
 
@@ -114,6 +124,85 @@ class LogLevel(enum.IntEnum):
 
        for level in sorted(cls, key=operator.attrgetter('value')):
 
            yield level.name.lower()
 

	
 

	
 
class SearchTerm(NamedTuple):
 
    """NamedTuple representing a user's metadata filter
 

	
 
    SearchTerm knows how to parse and store posting metadata filters provided
 
    by the user in `key=value` format. Reporting tools can use this to filter
 
    postings that match the user's criteria, to report on subsets of the books.
 

	
 
    Typical usage looks like::
 

	
 
      argument_parser.add_argument(
 
        'search_terms',
 
        type=SearchTerm.arg_parser(),
 
        …,
 
      )
 

	
 
      args = argument_parser.parse_args(…)
 
      for query in args.search_terms:
 
        postings = query.filter_postings(postings)
 
    """
 
    meta_key: MetaKey
 
    pattern: str
 

	
 
    @classmethod
 
    def arg_parser(cls,
 
                   default_key: Optional[str]=None,
 
                   ticket_default_key: Optional[str]=None,
 
    ) -> Callable[[str], 'SearchTerm']:
 
        """Build a SearchTerm parser
 

	
 
        This method returns a function that can parse strings in ``key=value``
 
        format and return a corresponding SearchTerm.
 

	
 
        If you specify a default key, then strings that just specify a ``value``
 
        will be parsed as if they said ``default_key=value``. Otherwise,
 
        parsing strings without a metadata key will raise a ValueError.
 

	
 
        If you specify a default key ticket links, then values in the format
 
        ``number``, ``rt:number``, or ``rt://ticket/number`` will be parsed as
 
        if they said ``ticket_default_key=value``.
 
        """
 
        if ticket_default_key is None:
 
            ticket_default_key = default_key
 
        def parse_search_term(arg: str) -> 'SearchTerm':
 
            key: Optional[str] = None
 
            if re.match(r'^[a-z][-\w]*=', arg):
 
                key, _, raw_link = arg.partition('=')
 
            else:
 
                raw_link = arg
 
            rt_ids = rtutil.RT.parse(raw_link)
 
            if rt_ids is None:
 
                rt_ids = rtutil.RT.parse('rt:' + raw_link)
 
            if rt_ids is None:
 
                if key is None:
 
                    key = default_key
 
                pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link))
 
            else:
 
                ticket_id, attachment_id = rt_ids
 
                if key is None:
 
                    if attachment_id is None:
 
                        key = ticket_default_key
 
                    else:
 
                        key = default_key
 
                pattern = rtutil.RT.metadata_regexp(
 
                    ticket_id,
 
                    attachment_id,
 
                    first_link_only=key == 'rt-id' and attachment_id is None,
 
                )
 
            if key is None:
 
                raise ValueError(f"invalid search term {arg!r}: no metadata key")
 
            return cls(key, pattern)
 
        return parse_search_term
 

	
 
    def filter_postings(self, postings: Iterable[data.Posting]) -> Iterable[data.Posting]:
 
        return filters.filter_meta_match(
 
            postings, self.meta_key, re.compile(self.pattern),
 
        )
 

	
 

	
 
def add_loglevel_argument(parser: argparse.ArgumentParser,
 
                          default: LogLevel=LogLevel.INFO) -> argparse.Action:
 
    return parser.add_argument(
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -622,46 +622,13 @@ class ReturnFlag(enum.IntFlag):
 
    REPORT_ERRORS = 4
 
    NOTHING_TO_REPORT = 8
 

	
 

	
 
class SearchTerm(NamedTuple):
 
    meta_key: MetaKey
 
    pattern: str
 

	
 
    @classmethod
 
    def parse(cls, s: str) -> 'SearchTerm':
 
        key_match = re.match(r'^[a-z][-\w]*=', s)
 
        key: Optional[str]
 
        if key_match:
 
            key, _, raw_link = s.partition('=')
 
        else:
 
            key = None
 
            raw_link = s
 
        rt_ids = rtutil.RT.parse(raw_link)
 
        if rt_ids is None:
 
            rt_ids = rtutil.RT.parse('rt:' + raw_link)
 
        if rt_ids is None:
 
            if key is None:
 
                key = 'invoice'
 
            pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link))
 
        else:
 
            ticket_id, attachment_id = rt_ids
 
            if key is None:
 
                key = 'rt-id' if attachment_id is None else 'invoice'
 
            pattern = rtutil.RT.metadata_regexp(
 
                ticket_id,
 
                attachment_id,
 
                first_link_only=key == 'rt-id' and attachment_id is None,
 
            )
 
        return cls(key, pattern)
 

	
 

	
 
def filter_search(postings: Iterable[data.Posting],
 
                  search_terms: Iterable[SearchTerm],
 
                  search_terms: Iterable[cliutil.SearchTerm],
 
) -> Iterable[data.Posting]:
 
    accounts = tuple(AccrualAccount.account_names())
 
    postings = (post for post in postings if post.account.is_under(*accounts))
 
    for meta_key, pattern in search_terms:
 
        postings = filters.filter_meta_match(postings, meta_key, re.compile(pattern))
 
    for query in search_terms:
 
        postings = query.filter_postings(postings)
 
    return postings
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
...
 
@@ -696,7 +663,9 @@ filename for other reports.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search',
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('invoice', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on accruals that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
...
 
@@ -705,7 +674,6 @@ metadata to match. A single ticket number is a shortcut for
 
`TIK/ATT` format, is a shortcut for `invoice=LINK`.
 
""")
 
    args = parser.parse_args(arglist)
 
    args.search_terms = [SearchTerm.parse(s) for s in args.search]
 
    if args.report_type is None and not args.search_terms:
 
        args.report_type = ReportType.AGING
 
    return args
tests/test_cliutil_searchterm.py
Show inline comments
 
new file 100644
 
"""test_cliutil_searchterm - Unit tests for cliutil.SearchTerm"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import re
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount import cliutil
 
from conservancy_beancount import data
 

	
 
TICKET_LINKS = [
 
    '123',
 
    'rt:123',
 
    'rt://ticket/123',
 
]
 

	
 
ATTACHMENT_LINKS = [
 
    '123/456',
 
    'rt:123/456',
 
    'rt://ticket/123/attachments/456',
 
]
 

	
 
REPOSITORY_LINKS = [
 
    '789.pdf',
 
    'Documents/789.pdf',
 
]
 

	
 
RT_LINKS = TICKET_LINKS + ATTACHMENT_LINKS
 
ALL_LINKS = RT_LINKS + REPOSITORY_LINKS
 

	
 
INVALID_METADATA_KEYS = [
 
    # Must start with a-z
 
    ';key',
 
    ' key',
 
    'ákey',
 
    'Key',
 
    # Must only contain alphanumerics, -, and _
 
    'key;',
 
    'key ',
 
    'a.key',
 
]
 

	
 
@pytest.fixture(scope='module')
 
def defaults_parser():
 
    return cliutil.SearchTerm.arg_parser('document', 'rt-id')
 

	
 
@pytest.fixture(scope='module')
 
def one_default_parser():
 
    return cliutil.SearchTerm.arg_parser('default')
 

	
 
@pytest.fixture(scope='module')
 
def no_default_parser():
 
    return cliutil.SearchTerm.arg_parser()
 

	
 
def check_link_regexp(regexp, match_s, first_link_only=False):
 
    assert regexp
 
    assert re.search(regexp, match_s)
 
    assert re.search(regexp, match_s + ' postlink')
 
    assert re.search(regexp, match_s + '0') is None
 
    assert re.search(regexp, '1' + match_s) is None
 
    end_match = re.search(regexp, 'prelink ' + match_s)
 
    if first_link_only:
 
        assert end_match is None
 
    else:
 
        assert end_match
 

	
 
@pytest.mark.parametrize('arg', TICKET_LINKS)
 
def test_search_term_parse_ticket(defaults_parser, arg):
 
    key, regexp = defaults_parser(arg)
 
    assert key == 'rt-id'
 
    check_link_regexp(regexp, 'rt:123', first_link_only=True)
 
    check_link_regexp(regexp, 'rt://ticket/123', first_link_only=True)
 

	
 
@pytest.mark.parametrize('arg', ATTACHMENT_LINKS)
 
def test_search_term_parse_attachment(defaults_parser, arg):
 
    key, regexp = defaults_parser(arg)
 
    assert key == 'document'
 
    check_link_regexp(regexp, 'rt:123/456')
 
    check_link_regexp(regexp, 'rt://ticket/123/attachments/456')
 

	
 
@pytest.mark.parametrize('key,query', testutil.combine_values(
 
    ['approval', 'contract', 'invoice'],
 
    RT_LINKS,
 
))
 
def test_search_term_parse_metadata_rt_shortcut(defaults_parser, key, query):
 
    actual_key, regexp = defaults_parser(f'{key}={query}')
 
    assert actual_key == key
 
    if query.endswith('/456'):
 
        check_link_regexp(regexp, 'rt:123/456')
 
        check_link_regexp(regexp, 'rt://ticket/123/attachments/456')
 
    else:
 
        check_link_regexp(regexp, 'rt:123')
 
        check_link_regexp(regexp, 'rt://ticket/123')
 

	
 
@pytest.mark.parametrize('search_prefix', [
 
    '',
 
    'approval=',
 
    'contract=',
 
    'invoice=',
 
])
 
def test_search_term_parse_repo_link(defaults_parser, search_prefix):
 
    document = '1234.pdf'
 
    actual_key, regexp = defaults_parser(f'{search_prefix}{document}')
 
    if search_prefix:
 
        expect_key = search_prefix.rstrip('=')
 
    else:
 
        expect_key = 'document'
 
    assert actual_key == expect_key
 
    check_link_regexp(regexp, document)
 

	
 
@pytest.mark.parametrize('search,unmatched', [
 
    ('1234.pdf', '1234_pdf'),
 
])
 
def test_search_term_parse_regexp_escaping(defaults_parser, search, unmatched):
 
    _, regexp = defaults_parser(search)
 
    assert re.search(regexp, unmatched) is None
 

	
 
@pytest.mark.parametrize('key', INVALID_METADATA_KEYS)
 
def test_non_metadata_key(one_default_parser, key):
 
    document = f'{key}=890'
 
    key, pattern = one_default_parser(document)
 
    assert key == 'default'
 
    check_link_regexp(pattern, document)
 

	
 
@pytest.mark.parametrize('arg', ALL_LINKS)
 
def test_default_parser(one_default_parser, arg):
 
    key, _ = one_default_parser(arg)
 
    assert key == 'default'
 

	
 
@pytest.mark.parametrize('arg', ALL_LINKS + [
 
    f'{nonkey}={link}' for nonkey, link in testutil.combine_values(
 
        INVALID_METADATA_KEYS, ALL_LINKS,
 
    )
 
])
 
def test_no_key(no_default_parser, arg):
 
    with pytest.raises(ValueError):
 
        key, pattern = no_default_parser(arg)
 

	
 
@pytest.mark.parametrize('key', ['zero', 'one', 'two'])
 
def test_filter_postings(key):
 
    txn = testutil.Transaction(postings=[
 
        ('Income:Other', 3, {'one': '1', 'two': '2'}),
 
        ('Income:Other', 2, {'two': '2'}),
 
        ('Income:Other', 1, {'one': '1'}),
 
    ])
 
    search = cliutil.SearchTerm(key, '.')
 
    actual = list(search.filter_postings(data.Posting.from_txn(txn)))
 
    assert len(actual) == 0 if key == 'zero' else 2
 
    assert all(post.meta.get(key) for post in actual)
tests/test_reports_accrual.py
Show inline comments
...
 
@@ -37,6 +37,7 @@ from typing import NamedTuple, Optional, Sequence
 

	
 
from beancount.core import data as bc_data
 
from beancount import loader as bc_loader
 
from conservancy_beancount import cliutil
 
from conservancy_beancount import data
 
from conservancy_beancount import rtutil
 
from conservancy_beancount.reports import accrual
...
 
@@ -144,18 +145,6 @@ class RTClient(testutil.RTClient):
 
def accrual_postings():
 
    return data.Posting.from_entries(copy.deepcopy(ACCRUAL_TXNS))
 

	
 
def check_link_regexp(regexp, match_s, first_link_only=False):
 
    assert regexp
 
    assert re.search(regexp, match_s)
 
    assert re.search(regexp, match_s + ' postlink')
 
    assert re.search(regexp, match_s + '0') is None
 
    assert re.search(regexp, '1' + match_s) is None
 
    end_match = re.search(regexp, 'prelink ' + match_s)
 
    if first_link_only:
 
        assert end_match is None
 
    else:
 
        assert end_match
 

	
 
def accruals_by_meta(postings, value, key='invoice', wrap_type=iter):
 
    return wrap_type(
 
        post for post in postings
...
 
@@ -223,63 +212,6 @@ def check_aging_ods(ods_file,
 
    check_aging_sheet(sheets[0], recv_rows, date, -60)
 
    check_aging_sheet(sheets[1], pay_rows, date, -30)
 

	
 
@pytest.mark.parametrize('link_fmt', [
 
    '{}',
 
    'rt:{}',
 
    'rt://ticket/{}',
 
])
 
def test_search_term_parse_rt_shortcuts(link_fmt):
 
    key, regexp = accrual.SearchTerm.parse(link_fmt.format(220))
 
    assert key == 'rt-id'
 
    check_link_regexp(regexp, 'rt:220', first_link_only=True)
 
    check_link_regexp(regexp, 'rt://ticket/220', first_link_only=True)
 

	
 
@pytest.mark.parametrize('link_fmt', [
 
    '{}/{}',
 
    'rt:{}/{}',
 
    'rt://ticket/{}/attachments/{}',
 
])
 
def test_search_term_parse_invoice_shortcuts(link_fmt):
 
    key, regexp = accrual.SearchTerm.parse(link_fmt.format(330, 660))
 
    assert key == 'invoice'
 
    check_link_regexp(regexp, 'rt:330/660')
 
    check_link_regexp(regexp, 'rt://ticket/330/attachments/660')
 

	
 
@pytest.mark.parametrize('key', [
 
    'approval',
 
    'contract',
 
    'invoice',
 
])
 
def test_search_term_parse_metadata_rt_shortcut(key):
 
    actual_key, regexp = accrual.SearchTerm.parse(f'{key}=440/420')
 
    assert actual_key == key
 
    check_link_regexp(regexp, 'rt:440/420')
 
    check_link_regexp(regexp, 'rt://ticket/440/attachments/420')
 

	
 
@pytest.mark.parametrize('key', [
 
    None,
 
    'approval',
 
    'contract',
 
    'invoice',
 
])
 
def test_search_term_parse_repo_link(key):
 
    document = '1234.pdf'
 
    if key is None:
 
        key = 'invoice'
 
        search = document
 
    else:
 
        search = f'{key}={document}'
 
    actual_key, regexp = accrual.SearchTerm.parse(search)
 
    assert actual_key == key
 
    check_link_regexp(regexp, document)
 

	
 
@pytest.mark.parametrize('search,unmatched', [
 
    ('1234.pdf', '1234_pdf'),
 
])
 
def test_search_term_parse_regexp_escaping(search, unmatched):
 
    _, regexp = accrual.SearchTerm.parse(search)
 
    assert re.search(regexp, unmatched) is None
 

	
 
@pytest.mark.parametrize('search_terms,expect_count,check_func', [
 
    ([], ACCRUALS_COUNT, lambda post: post.account.is_under(
 
        'Assets:Receivable:', 'Liabilities:Payable:',
...
 
@@ -292,6 +224,7 @@ def test_search_term_parse_regexp_escaping(search, unmatched):
 
    ([('rt-id', '^rt:510$'), ('approval', '.')], 0, lambda post: False),
 
])
 
def test_filter_search(accrual_postings, search_terms, expect_count, check_func):
 
    search_terms = [cliutil.SearchTerm._make(query) for query in search_terms]
 
    actual = list(accrual.filter_search(accrual_postings, search_terms))
 
    if expect_count < ACCRUALS_COUNT:
 
        assert ACCRUALS_COUNT > len(actual) >= expect_count
0 comments (0 inline, 0 general)