Files @ efe9bd88550d
Branch filter:

Location: NPO-Accounting/conservancy_beancount/tests/test_rtutil.py

Brett Smith
ledger: Change default report dates.

The old defaults were optimized for the audit report.
The new defaults provide more helpful ad hoc reports.
The latter will be run more often and more quickly, so it's
worth optimizing the defaults for them.
"""Test RT integration"""
# Copyright © 2020  Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import contextlib
import itertools
import logging
import re

import pytest

from . import testutil

from conservancy_beancount import rtutil

# Base URL the tests expect RT web links to start with: the test client's
# default URL minus its last nine characters (presumably the trailing
# 'REST/1.0/' API path -- confirm against testutil.RTClient.DEFAULT_URL).
DEFAULT_RT_URL = testutil.RTClient.DEFAULT_URL[:-9]

# Each entry is (ticket_id, attachment_id, expected URL path).
# An attachment_id of None asks about the ticket itself.
# A URL path of None means the tests expect no URL (and exists() to be False)
# for that ticket/attachment combination.
EXPECTED_URLS = [
    (1, None, 'Ticket/Display.html?id=1'),
    (1, 2, 'Ticket/Display.html?id=1#txn-1'),
    (1, 4, 'Ticket/Attachment/1/4/Forwarded%20Message.eml'),
    (1, 99, None),
    (2, 1, None),
    (2, 10, 'Ticket/Attachment/7/10/Company_invoice-2020030405_as-sent.pdf'),
    (2, 13, 'Ticket/Display.html?id=2#txn-11'),
    (2, 14, 'Ticket/Display.html?id=2#txn-11'),  # statement.txt
    (3, None, 'Ticket/Display.html?id=3'),
    (9, None, None),
]

# The same expectations keyed by (ticket_id, attachment_id) for direct lookup.
EXPECTED_URLS_MAP = {
    (ticket_id, attachment_id): url
    for ticket_id, attachment_id, url in EXPECTED_URLS
}

@pytest.fixture(scope='module')
def rt():
    """Return an ``rtutil.RT`` built on a default test client.

    The fixture is module-scoped, so every test requesting it shares the
    same instance.
    """
    return rtutil.RT(testutil.RTClient())

@pytest.fixture
def new_client():
    """Return a fresh test client whose ``TICKET_DATA`` is its own copy.

    Tests that add to or clear the ticket data work on this copy rather
    than on the attribute of ``testutil.RTClient`` itself.
    """
    base = testutil.RTClient

    class RTClient(base):
        # Copied via .copy() so mutations stay local to this subclass.
        TICKET_DATA = base.TICKET_DATA.copy()

    client = RTClient()
    return client

@contextlib.contextmanager
def nullcontext(thing):
    """Context manager that does nothing except hand back ``thing``.

    ``with nullcontext(x) as y`` binds ``y`` to ``x``; no setup or cleanup
    is performed on entry or exit.
    """
    yield thing

def new_cache(database=':memory:'):
    """Set up a link cache database and return it wrapped in a context manager.

    ``database`` is passed straight to ``rtutil.RTLinkCache.setup``. When
    setup returns a database, the result is ``contextlib.closing`` around it.
    When setup returns None, a note is printed and a do-nothing context
    manager yielding None is returned instead.
    """
    cache_db = rtutil.RTLinkCache.setup(database)
    if cache_db is not None:
        return contextlib.closing(cache_db)
    print("NOTE: did not set up database cache at {}".format(database))
    return nullcontext(cache_db)

@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_url(rt, ticket_id, attachment_id, expected):
    """``rt.url`` returns the full expected URL, or None when none is expected."""
    want = None if expected is None else DEFAULT_RT_URL + expected
    assert rt.url(ticket_id, attachment_id) == want

@pytest.mark.parametrize('attachment_id,first_link_only', itertools.product(
    [245, None],
    [True, False],
))
def test_metadata_regexp(rt, attachment_id, first_link_only):
    """Check what the pattern from ``rt.metadata_regexp`` does and doesn't match.

    Both the short and long link spellings must match on their own and when
    followed by another link. They must not match with an extra trailing
    digit or a leading character. A link in second position matches only
    when ``first_link_only`` is false.
    """
    if attachment_id is not None:
        links = [
            f'rt:220/{attachment_id}',
            f'rt://ticket/220/attachments/{attachment_id}',
        ]
    else:
        links = ['rt:220', 'rt://ticket/220']
    pattern = rt.metadata_regexp(220, attachment_id, first_link_only=first_link_only)
    for link in links:
        assert re.search(pattern, link)
        assert re.search(pattern, f'{link} link2')
        assert re.search(pattern, f'{link}0') is None
        assert re.search(pattern, f'a{link}') is None
        trailing = re.search(pattern, f'link0 {link}')
        if not first_link_only:
            assert trailing
        else:
            assert trailing is None

@pytest.mark.parametrize('attachment_id', [
    13,
    None,
])
def test_url_caches(new_client, attachment_id):
    """A URL looked up once is still returned after the client data is cleared."""
    fragment = '' if attachment_id is None else '#txn-11'
    expected = f'{DEFAULT_RT_URL}Ticket/Display.html?id=2{fragment}'
    wrapper = rtutil.RT(new_client)
    assert wrapper.url(2, attachment_id) == expected
    # With the ticket data gone, the same answer has to come from the wrapper.
    new_client.TICKET_DATA.clear()
    assert wrapper.url(2, attachment_id) == expected

@pytest.mark.parametrize('mimetype,extension', [
    ('application/pdf', 'pdf'),
    ('image/png', 'png'),
    ('message/rfc822', 'eml'),
    ('x-test/x-unknown', 'bin'),
])
def test_url_default_filename(new_client, mimetype, extension):
    """An '(Unnamed)' attachment gets a generated filename in its URL.

    The expected extension depends on the attachment's MIME type.
    """
    expected = f'{DEFAULT_RT_URL}Ticket/Attachment/9/9/RT1%20attachment%209.{extension}'
    new_client.TICKET_DATA['1'] = [('9', '(Unnamed)', mimetype, '50.5k')]
    wrapper = rtutil.RT(new_client)
    assert wrapper.url(1, 9) == expected

@pytest.mark.parametrize('rt_fmt,nonrt_fmt,missing_fmt', [
    ('{}', '{}', '{}',),
    ('<{}>', '[{}]', '({})'),
])
def test_iter_urls(rt, rt_fmt, nonrt_fmt, missing_fmt):
    """``rt.iter_urls`` applies the right format string to each kind of link.

    Found RT links become ``rt_fmt`` around the full URL. RT links with no
    expected URL use ``missing_fmt`` around the original text. Anything
    that is not an ``rt:`` link uses ``nonrt_fmt`` around the original text.
    """
    expected_map = {}
    for tid, aid, url in EXPECTED_URLS:
        suffix = '' if aid is None else f'/{aid}'
        expected_map[f'rt:{tid}{suffix}'] = url
    expected_map['https://example.com'] = None
    expected_map['invoice.pdf'] = None
    keys = list(expected_map)
    urls = rt.iter_urls(keys, rt_fmt, nonrt_fmt, missing_fmt)
    # zip_longest rather than zip so a length mismatch cannot go unnoticed.
    for key, actual in itertools.zip_longest(keys, urls):
        url_path = expected_map[key]
        if url_path is not None:
            expected = rt_fmt.format(DEFAULT_RT_URL + url_path)
        elif key.startswith('rt:'):
            expected = missing_fmt.format(key)
        else:
            expected = nonrt_fmt.format(key)
        assert actual == expected

@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_exists(rt, ticket_id, attachment_id, expected):
    """``rt.exists`` is True exactly when a URL is expected for the link."""
    should_exist = expected is not None
    assert rt.exists(ticket_id, attachment_id) is should_exist

def test_exists_caches(new_client):
    """``exists`` answers stay the same after the client data is cleared."""
    wrapper = rtutil.RT(new_client)
    # (arguments to exists(), whether the result should be truthy)
    checks = [
        ((1, 3), True),
        ((2,), True),
        ((1, 99), False),
        ((9,), False),
    ]

    def verify():
        for args, found in checks:
            assert bool(wrapper.exists(*args)) is found

    verify()
    new_client.TICKET_DATA.clear()
    verify()

@pytest.mark.parametrize('link,expected', [
    # Short form, with and without attachment or trailing slash
    ('rt:1/2', ('1', '2')),
    ('rt:123/456', ('123', '456')),
    ('rt:12345', ('12345', None)),
    ('rt:12346/', ('12346', None)),
    ('rt:12346/789', ('12346', '789')),
    ('rt:12346/780/', ('12346', '780')),
    # Long form
    ('rt://ticket/1', ('1', None)),
    ('rt://ticket/1/', ('1', None)),
    ('rt://ticket/1234/attachments/5678', ('1234', '5678')),
    ('rt://ticket/1234/attachments/5678/', ('1234', '5678')),
    ('rt://ticket/1234/attachment/5678', ('1234', '5678')),
    ('rt://ticket/1234/attachment/5678/', ('1234', '5678')),
    # Links expected to parse to None
    ('rt:', None),
    ('rt://', None),
    ('rt:example.org', None),
    ('rt:example.org/1', None),
    ('rt://example.org', None),
    ('rt://example.org/1', None),
    ('https://example.org/rt/Ticket/Display.html?id=123', None),
])
def test_parse(rt, link, expected):
    """``rt.parse`` returns (ticket, attachment) strings, or None as listed."""
    parsed = rt.parse(link)
    assert parsed == expected

@pytest.mark.parametrize('ticket_id,attachment_id,expected', [
    ('12', None, 'rt:12'),
    (34, None, 'rt:34'),
    ('56', '78', 'rt:56/78'),
    (90, 880, 'rt:90/880'),
])
def test_unparse(rt, ticket_id, attachment_id, expected):
    """``rt.unparse`` builds the short link from string or integer IDs."""
    link = rt.unparse(ticket_id, attachment_id)
    assert link == expected

def test_uncommon_server_url_parsing():
    """A base URL that itself ends in 'REST/1.0/' is kept in generated links.

    The client is given that base with one more 'REST/1.0/' appended, and
    the ticket URL must still start with the full base.
    """
    web_url = 'https://example.org/REST/1.0/'
    rest_url = web_url + 'REST/1.0/'
    wrapper = rtutil.RT(testutil.RTClient(rest_url))
    assert wrapper.url(1).startswith(web_url)

def test_shared_cache(new_client):
    """A second RT object on the same cache database sees the first's result.

    After the first lookup the client data is cleared, so the second
    object's matching answer cannot come from the client.
    """
    first_case = EXPECTED_URLS[0]
    ticket_id = first_case[0]
    expected = DEFAULT_RT_URL + first_case[2]
    with new_cache() as cachedb:
        writer = rtutil.RT(new_client, cachedb)
        assert writer.url(ticket_id) == expected
        new_client.TICKET_DATA.clear()
        reader = rtutil.RT(new_client, cachedb)
        assert reader.url(ticket_id) == expected
        assert not reader.exists(ticket_id + 1)
        assert writer is not reader

def test_no_shared_cache(new_client):
    """RT objects on separate cache databases do not see each other's results."""
    with new_cache() as cache1:
        with new_cache() as cache2:
            primed = rtutil.RT(new_client, cache1)
            unprimed = rtutil.RT(new_client, cache2)
            assert primed.exists(1)
            new_client.TICKET_DATA.clear()
            # Only the object that looked the ticket up earlier still finds it.
            assert not unprimed.exists(1)
            assert primed.exists(1)

def test_read_only_cache(new_client, tmp_path, caplog):
    """A cache file with mode 0o400 can still be used for lookups.

    The file is populated first, then the client data is cleared and the
    file's mode is changed. A new RT object on it must return the stored
    URL and None for a ticket that was never looked up.
    """
    caplog.set_level(logging.DEBUG, logger='conservancy_beancount.rtutil')
    db_path = tmp_path / 'test.db'
    first_case = EXPECTED_URLS[0]
    ticket_id = first_case[0]
    expected = DEFAULT_RT_URL + first_case[2]
    with new_cache(db_path) as writable_cache:
        writer = rtutil.RT(new_client, writable_cache)
        assert writer.url(ticket_id) == expected
    new_client.TICKET_DATA.clear()
    db_path.chmod(0o400)
    with new_cache(db_path) as readonly_cache:
        reader = rtutil.RT(new_client, readonly_cache)
        assert reader.url(ticket_id) == expected
        assert reader.url(ticket_id + 1) is None

def test_results_not_found_only_in_transient_cache(new_client):
    """A "not found" answer sticks to one RT object, not the shared database.

    Two RT objects share one cache database. The first checks ticket 9
    before it exists and keeps answering "not found" after the ticket is
    added; the second, checking only afterwards, finds it.
    """
    with new_cache() as cache:
        early = rtutil.RT(new_client, cache)
        late = rtutil.RT(new_client, cache)
        assert not early.exists(9)
        new_client.TICKET_DATA['9'] = [('99', '(Unnamed)', 'text/plain', '0b')]
        assert not early.exists(9)
        assert late.exists(9)

def test_txn_with_urls(rt):
    """``rt.txn_with_urls`` expands RT links in a copy of the transaction.

    RT links in transaction and posting metadata must appear as
    ``<full URL>``, surrounding non-RT text must survive, and the original
    transaction must be left unchanged.
    """
    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'doc1.txt rt:1/4 doc2.txt',
    }
    receipts = ['rt:2/13 donation.txt', 'cash.png rt:2/14']
    txn = testutil.Transaction(**txn_meta, postings=[
        ('Income:Donations', -10, {'receipt': receipts[0]}),
        ('Assets:Cash', 10, {'receipt': receipts[1]}),
    ])
    actual = rt.txn_with_urls(txn)

    def assert_linked(source, key, ticket_id, attachment_id=None):
        # The bracketed full URL must appear somewhere in the metadata value.
        url_path = EXPECTED_URLS_MAP[ticket_id, attachment_id]
        assert f'<{DEFAULT_RT_URL}{url_path}>' in source.meta[key]

    assert set(actual.meta) == {*txn_meta, 'filename', 'lineno'}
    assert_linked(actual, 'rt-id', 1)
    assert actual.meta['contract'] == txn_meta['contract']
    assert actual.meta['statement'].startswith('doc1.txt ')
    assert_linked(actual, 'statement', 1, 4)
    first_posting, second_posting = actual.postings[0], actual.postings[1]
    assert_linked(first_posting, 'receipt', 2, 13)
    assert first_posting.meta['receipt'].endswith(' donation.txt')
    assert_linked(second_posting, 'receipt', 2, 14)
    assert second_posting.meta['receipt'].startswith('cash.png ')
    # Check the original transaction is unchanged
    for key, expected in txn_meta.items():
        assert txn.meta[key] == expected
    for index, receipt in enumerate(receipts):
        assert txn.postings[index].meta['receipt'] == receipt

def test_txn_with_urls_with_fmts(rt):
    """``rt.txn_with_urls`` honours the three format strings it is given.

    Found RT links are wrapped in ``<>``, non-RT text in ``[]``, and an RT
    link with no expected URL in ``()``, each joined by single spaces.
    """
    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'rt:1/99 rt:1/4 stmt.txt',
    }

    def bracketed_url(ticket_id, attachment_id):
        url_path = EXPECTED_URLS_MAP[ticket_id, attachment_id]
        return f'<{DEFAULT_RT_URL}{url_path}>'

    txn = testutil.Transaction(**txn_meta)
    actual = rt.txn_with_urls(txn, '<{}>', '[{}]', '({})')
    assert actual.meta['rt-id'] == bracketed_url(1, None)
    assert actual.meta['contract'] == '[RepoLink.pdf]'
    statement_url = bracketed_url(1, 4)
    assert actual.meta['statement'] == f'(rt:1/99) {statement_url} [stmt.txt]'