Changeset - d8507a1a35a7
[Not reviewed]
0 4 0
Brett Smith - 4 years ago 2020-04-23 14:27:47
brettcsmith@brettcsmith.org
rtutil: Add RTUtil.metadata_regexp() classmethod.

The accruals check script wants to be able to search RT links in
all kinds of metadata, not just rt-id as the filter currently
handles.
4 files changed with 55 insertions and 3 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/filters.py
Show inline comments
 
"""filters.py - Common filters for postings"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import re
 

	
 
from . import data
 
from . import rtutil
 

	
 
from typing import (
 
    Iterable,
 
    Pattern,
 
    Union,
 
)
 
from .beancount_types import (
 
    MetaKey,
 
    MetaValue,
 
)
 

	
 
# Any iterable of postings; the filters in this module accept this type
# and are annotated as returning it.
Postings = Iterable[data.Posting]
# A regular expression, given either as a pattern string or as a compiled
# pattern object.
Regexp = Union[str, Pattern]
 

	
 
def filter_meta_equal(postings: Postings, key: MetaKey, value: MetaValue) -> Postings:
    """Yield postings whose metadata under ``key`` equals ``value``

    Postings whose metadata lookup raises KeyError are skipped.
    """
    for posting in postings:
        try:
            is_equal = posting.meta[key] == value
        except KeyError:
            continue
        if is_equal:
            yield posting
 

	
 
def filter_meta_match(postings: Postings, key: MetaKey, regexp: Regexp) -> Postings:
    """Yield postings whose metadata under ``key`` matches ``regexp``

    The pattern is applied with ``re.search``, so it may match anywhere in
    the value. Postings are skipped when the metadata lookup raises
    KeyError, or when the search raises TypeError (for example, because
    the metadata value is not a string).
    """
    for posting in postings:
        try:
            found = re.search(regexp, posting.meta[key])
        except (KeyError, TypeError):
            continue
        if found:
            yield posting
 

	
 
def filter_for_rt_id(postings: Postings, ticket_id: Union[int, str]) -> Postings:
    """Filter postings with a primary RT ticket

    This function yields postings where the *first* rt-id matches the given
    ticket number.

    ``ticket_id`` may be an integer or a string. Postings that have no
    ``rt-id`` metadata are skipped by ``filter_meta_match``.
    """
    # first_link_only=True anchors the pattern to the start of the metadata
    # value, so later links in the same rt-id string are never considered.
    regexp = rtutil.RT.metadata_regexp(ticket_id, first_link_only=True)
    return filter_meta_match(postings, 'rt-id', regexp)
conservancy_beancount/rtutil.py
Show inline comments
...
 
@@ -221,68 +221,97 @@ class RT:
 
    ) -> str:
 
        if fragment is None:
 
            fragment = self.url_base.fragment
 
        else:
 
            fragment = urlparse.quote(fragment)
 
        if query:
 
            query_s = urlparse.urlencode(query)
 
        else:
 
            query_s = self.url_base.query
 
        urlparts = self.url_base._replace(
 
            path=self.url_base.path + urlparse.quote(path_tail),
 
            query=query_s,
 
            fragment=fragment,
 
        )
 
        return urlparse.urlunparse(urlparts)
 

	
 
    def _ticket_url(self, ticket_id: RTId, txn_id: Optional[RTId]=None) -> str:
        """Build the Ticket/Display.html URL for a ticket

        When ``txn_id`` is given, the URL fragment is ``txn-<txn_id>``.
        Otherwise ``None`` is passed as the fragment to ``_extend_url``.
        """
        fragment = None if txn_id is None else 'txn-{}'.format(txn_id)
        return self._extend_url('Ticket/Display.html', fragment, id=str(ticket_id))
 

	
 
    @_cache_method
    def attachment_url(self, ticket_id: RTId, attachment_id: RTId) -> Optional[str]:
        """Return a URL for a ticket attachment, or None if it isn't found

        Returns None when ``self.rt.get_attachment`` returns None.
        Attachments whose content type starts with ``text/`` link to their
        transaction on the ticket display page. Any other attachment links
        to ``Ticket/Attachment/<Transaction>/<id>/<filename>``; if the
        attachment has no filename, one is generated from the ticket ID,
        attachment ID, and content type.
        """
        info = self.rt.get_attachment(ticket_id, attachment_id)
        if info is None:
            return None
        content_type = info.get('ContentType', '')
        if content_type.startswith('text/'):
            return self._ticket_url(ticket_id, info['Transaction'])
        # Generate a name only when RT did not supply one; '.bin' covers
        # content types that mimetypes cannot map to an extension.
        name = info.get('Filename', '') or 'RT{} attachment {}{}'.format(
            ticket_id,
            attachment_id,
            mimetypes.guess_extension(content_type) or '.bin',
        )
        tail = 'Ticket/Attachment/{0[Transaction]}/{0[id]}/{1}'.format(info, name)
        return self._extend_url(tail)
 

	
 
    def exists(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> bool:
        """Return True if ``url()`` finds a URL for the ticket or attachment"""
        found_url = self.url(ticket_id, attachment_id)
        return found_url is not None
 

	
 
    @classmethod
    def metadata_regexp(cls,
                        ticket_id: RTId,
                        attachment_id: Optional[RTId]=None,
                        *,
                        first_link_only: bool=False
    ) -> str:
        """Return a pattern to find RT links in metadata

        Given a ticket ID and optional attachment ID, this method returns a
        regular expression pattern that will find matching RT links in a
        metadata value string, written in any format.

        If the keyword-only argument first_link_only is true, the pattern will
        only match the first link in a metadata string. Otherwise the pattern
        matches any link in the string (the default).

        Both IDs are converted to strings and escaped, so any regular
        expression metacharacters in them are matched literally. The
        returned pattern is meant to be used with ``re.search``.
        """
        # Imported here so the method does not rely on the module's import
        # block already providing ``re``.
        import re
        if first_link_only:
            # Only leading whitespace may come before the link.
            prolog = r'^\s*'
        else:
            # The link must start the string or follow whitespace, so that
            # e.g. "art:220" is not mistaken for a link.
            prolog = r'(?:^|\s)'
        ticket = r'rt:(?://ticket/)?{}'.format(re.escape(str(ticket_id)))
        if attachment_id is None:
            attachment = ''
        else:
            attachment = r'/(?:attachments?/)?{}'.format(
                re.escape(str(attachment_id)),
            )
        # An optional trailing slash, then the end of the string or
        # whitespace, so that ticket 220 does not match "rt:2200".
        epilog = r'/?(?:$|\s)'
        return f'{prolog}{ticket}{attachment}{epilog}'
 

	
 
    @classmethod
    def parse(cls, s: str) -> Optional[Tuple[str, Optional[str]]]:
        """Parse an RT link string into ``(ticket_id, attachment_id)``

        Each pattern in ``cls.PARSE_REGEXPS`` is tried in order against the
        start of ``s``, and the groups of the first one that matches are
        returned. Returns None when no pattern matches.
        """
        for pattern in cls.PARSE_REGEXPS:
            found = pattern.match(s)
            if found is None:
                continue
            ticket_id, attachment_id = found.groups()
            return (ticket_id, attachment_id)
        return None
 

	
 
    @_cache_method
    def ticket_url(self, ticket_id: RTId) -> Optional[str]:
        """Return the display URL for a ticket

        Returns None when ``self.rt.get_ticket`` returns None for the ID.
        """
        ticket = self.rt.get_ticket(ticket_id)
        return None if ticket is None else self._ticket_url(ticket_id)
 

	
 
    def url(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> Optional[str]:
        """Return the URL for a ticket, or for one of its attachments

        Delegates to ``attachment_url`` when ``attachment_id`` is given and
        to ``ticket_url`` otherwise.
        """
        if attachment_id is not None:
            return self.attachment_url(ticket_id, attachment_id)
        return self.ticket_url(ticket_id)
tests/test_filters.py
Show inline comments
...
 
@@ -66,73 +66,71 @@ def check_filter(actual, entries, expected_indexes):
 
    ):
 
        assert actual_post[:-1] == expected_post[:-1]
 

	
 
@pytest.mark.parametrize('key,value,expected_indexes', [
    ('entity', 'Smith-Dakota', range(5)),
    ('receipt', 'CCReceipt.pdf', range(3)),
    ('receipt', 'CCPayment.pdf', range(3, 5)),
    ('receipt', 'CC', ()),
    ('statement', 'CheckingStatement.pdf', [4]),
    ('metadate', date(2020, 9, 2), range(3)),
    ('metadate', date(2020, 9, 4), range(3, 5)),
    ('BadKey', '', ()),
    ('emptykey', '', ()),
])
def test_filter_meta_equal(cc_txn_pair, key, value, expected_indexes):
    """filter_meta_equal() yields exactly the expected postings"""
    all_posts = data.Posting.from_entries(cc_txn_pair)
    matched = filters.filter_meta_equal(all_posts, key, value)
    check_filter(matched, cc_txn_pair, expected_indexes)
 

	
 
@pytest.mark.parametrize('key,regexp,expected_indexes', [
    ('entity', '^Smith-', range(5)),
    ('receipt', r'\.pdf$', range(5)),
    ('receipt', 'Receipt', range(3)),
    ('statement', '.', [4]),
    ('metadate', 'foo', ()),
    ('BadKey', '.', ()),
    ('emptykey', '.', ()),
])
def test_filter_meta_match(cc_txn_pair, key, regexp, expected_indexes):
    """filter_meta_match() yields exactly the expected postings"""
    all_posts = data.Posting.from_entries(cc_txn_pair)
    matched = filters.filter_meta_match(all_posts, key, regexp)
    check_filter(matched, cc_txn_pair, expected_indexes)
 

	
 
@pytest.mark.parametrize('ticket_id,expected_indexes', [
    (550, range(5)),
    ('550', range(5)),
    (55, ()),
    ('55', ()),
    (50, ()),
    ('.', ()),
])
def test_filter_for_rt_id(cc_txn_pair, ticket_id, expected_indexes):
    """filter_for_rt_id() accepts int or str IDs and matches whole IDs only"""
    all_posts = data.Posting.from_entries(cc_txn_pair)
    matched = filters.filter_for_rt_id(all_posts, ticket_id)
    check_filter(matched, cc_txn_pair, expected_indexes)
 

	
 
# Links that also name an attachment (e.g. 'rt:450/678') are deliberately not
# listed here: the ticket-only pattern built by RT.metadata_regexp() requires
# the link to end (optionally with a slash) right after the ticket ID.
@pytest.mark.parametrize('rt_id', [
    'rt:450/',
    ' rt:450 rt:540',
    'rt://ticket/450',
    'rt://ticket/450/',
    ' rt://ticket/450',
    'rt://ticket/450 rt://ticket/540',
])
def test_filter_for_rt_id_syntax_variations(rt_id):
    """Every supported way of writing a link to ticket 450 is matched

    The rt-id is set on the transaction, and both of its postings are
    expected to be yielded.
    """
    entries = [testutil.Transaction(**{'rt-id': rt_id}, postings=[
        ('Income:Donations', -10),
        ('Assets:Cash', 10),
    ])]
    postings = data.Posting.from_entries(entries)
    actual = filters.filter_for_rt_id(postings, 450)
    check_filter(actual, entries, range(2))
 

	
 
def test_filter_for_rt_id_uses_first_link_only():
    """A ticket that is only the *second* link in rt-id is not matched"""
    entries = [testutil.Transaction(postings=[
        ('Income:Donations', -10, {'rt-id': 'rt:1 rt:350'}),
        ('Assets:Cash', 10, {'rt-id': 'rt://ticket/2 rt://ticket/350'}),
    ])]
    postings = data.Posting.from_entries(entries)
    actual = filters.filter_for_rt_id(postings, 350)
    # Ticket 350 appears in both postings, but never first, so nothing
    # should be yielded.
    check_filter(actual, entries, ())
tests/test_rtutil.py
Show inline comments
 
"""Test RT integration"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import contextlib
 
import itertools
 
import re
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount import rtutil
 

	
 
# The test client's default URL with its last nine characters removed.
# NOTE(review): presumably this strips a trailing API path to leave the web
# base URL - confirm against testutil.RTClient.DEFAULT_URL.
DEFAULT_RT_URL = testutil.RTClient.DEFAULT_URL[:-9]

# Rows of (ticket_id, attachment_id, expected). `expected` is the part of the
# URL that follows DEFAULT_RT_URL, or None when no URL should be found.
# An attachment_id of None asks for the ticket itself.
EXPECTED_URLS = [
    (1, None, 'Ticket/Display.html?id=1'),
    (1, 2, 'Ticket/Display.html?id=1#txn-1'),
    (1, 4, 'Ticket/Attachment/1/4/Forwarded%20Message.eml'),
    (1, 99, None),
    (2, 1, None),
    (2, 10, 'Ticket/Attachment/7/10/Company_invoice-2020030405_as-sent.pdf'),
    (2, 13, 'Ticket/Display.html?id=2#txn-11'),
    (2, 14, 'Ticket/Display.html?id=2#txn-11'),  # statement.txt
    (3, None, 'Ticket/Display.html?id=3'),
    (9, None, None),
]
 

	
 
@pytest.fixture(scope='module')
def rt():
    """Module-scoped rtutil.RT built around a new test client"""
    return rtutil.RT(testutil.RTClient())
 

	
 
@pytest.fixture
def new_client():
    """Return a test client whose TICKET_DATA is a private copy

    Tests can change or clear the copy without touching
    testutil.RTClient.TICKET_DATA.
    """
    class RTClient(testutil.RTClient):
        # Shallow copy, taken each time the fixture runs.
        TICKET_DATA = testutil.RTClient.TICKET_DATA.copy()
    client = RTClient()
    return client
 

	
 
def new_cache(database=':memory:'):
    """Return a context manager wrapping a new RTLinkCache database

    If RTLinkCache.setup() returns a database, it is wrapped in
    contextlib.closing(). If setup returns None, a note is printed and a
    null context yielding None is returned instead.
    """
    cache_db = rtutil.RTLinkCache.setup(database)
    if cache_db is not None:
        return contextlib.closing(cache_db)
    print("NOTE: did not set up database cache at {}".format(database))
    return contextlib.nullcontext(cache_db)
 

	
 
@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_url(rt, ticket_id, attachment_id, expected):
    """RT.url() returns the expected full URL, or None"""
    want = None if expected is None else DEFAULT_RT_URL + expected
    assert rt.url(ticket_id, attachment_id) == want
 

	
 
@pytest.mark.parametrize('attachment_id,first_link_only', itertools.product(
    [245, None],
    [True, False],
))
def test_metadata_regexp(rt, attachment_id, first_link_only):
    """RT.metadata_regexp() patterns match whole links in the right position"""
    if attachment_id is None:
        match_links = ['rt:220', 'rt://ticket/220']
    else:
        match_links = [
            f'rt:220/{attachment_id}',
            f'rt://ticket/220/attachments/{attachment_id}',
        ]
    pattern = re.compile(
        rt.metadata_regexp(220, attachment_id, first_link_only=first_link_only),
    )
    for link in match_links:
        # The link matches alone and when followed by another word.
        assert pattern.search(link)
        assert pattern.search(link + ' link2')
        # Extra characters glued onto either end must prevent a match.
        assert pattern.search(link + '0') is None
        assert pattern.search('a' + link) is None
        # A link in second position matches only when any link is allowed.
        trailing = pattern.search('link0 ' + link)
        assert (trailing is None) == first_link_only
 

	
 
@pytest.mark.parametrize('attachment_id', [
    13,
    None,
])
def test_url_caches(new_client, attachment_id):
    """RT.url() returns the same URL after the client's data is cleared"""
    fragment = '' if attachment_id is None else '#txn-11'
    expected = '{}Ticket/Display.html?id=2{}'.format(DEFAULT_RT_URL, fragment)
    rt = rtutil.RT(new_client)
    assert rt.url(2, attachment_id) == expected
    # Empty the client's ticket data, then repeat the identical lookup.
    new_client.TICKET_DATA.clear()
    assert rt.url(2, attachment_id) == expected
 

	
 
@pytest.mark.parametrize('mimetype,extension', [
    ('application/pdf', 'pdf'),
    ('image/png', 'png'),
    ('message/rfc822', 'eml'),
    ('x-test/x-unknown', 'bin'),
])
def test_url_default_filename(new_client, mimetype, extension):
    """Attachment URLs get a generated filename with the right extension"""
    new_client.TICKET_DATA['1'] = [('9', '(Unnamed)', mimetype, '50.5k')]
    rt = rtutil.RT(new_client)
    want_url = '{}Ticket/Attachment/9/9/RT1%20attachment%209.{}'.format(
        DEFAULT_RT_URL,
        extension,
    )
    assert rt.url(1, 9) == want_url
 

	
 
@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_exists(rt, ticket_id, attachment_id, expected):
    """RT.exists() is True exactly when a URL is expected"""
    should_exist = expected is not None
    assert rt.exists(ticket_id, attachment_id) is should_exist
 

	
 
def test_exists_caches(new_client):
    """RT.exists() gives the same answers after the client's data is cleared"""
    rt = rtutil.RT(new_client)

    def check_answers():
        assert rt.exists(1, 3)
        assert rt.exists(2)
        assert not rt.exists(1, 99)
        assert not rt.exists(9)

    check_answers()
    # Empty the client's ticket data, then repeat the identical lookups.
    new_client.TICKET_DATA.clear()
    check_answers()
 

	
 
@pytest.mark.parametrize('link,expected', [
 
    ('rt:1/2', ('1', '2')),
 
    ('rt:123/456', ('123', '456')),
 
    ('rt:12345', ('12345', None)),
0 comments (0 inline, 0 general)