Files @ 6a3d64ff2250
Branch filter:

Location: NPO-Accounting/conservancy_beancount/tests/test_rtutil.py

Brett Smith
fields: Change FieldType capitalization.

This is friendlier to the YAML input and consistent with FieldFlags.
Less consistent with the rest of the codebase, but local consistency matters
more IMO.
"""Test RT integration"""
# Copyright © 2020  Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.

import contextlib
import itertools
import logging
import re

import pytest

from . import testutil

from conservancy_beancount import rtutil

# Base URL that the expected paths below are appended to. The slice drops the
# last nine characters of the test client's DEFAULT_URL — presumably the
# 'REST/1.0/' API suffix; confirm against testutil.RTClient.DEFAULT_URL.
DEFAULT_RT_URL = testutil.RTClient.DEFAULT_URL[:-9]

# Each row is (ticket_id, attachment_id, url_path). A url_path of None marks a
# ticket or attachment the tests expect RT not to resolve; any other value is
# the path expected after DEFAULT_RT_URL.
EXPECTED_URLS = [
    (1, None, 'Ticket/Display.html?id=1'),
    (1, 2, 'Ticket/Display.html?id=1#txn-1'),
    (1, 4, 'Ticket/Attachment/1/4/Forwarded%20Message.eml'),
    (1, 99, None),
    (2, 1, None),
    (2, 10, 'Ticket/Attachment/7/10/Company_invoice-2020030405_as-sent.pdf'),
    (2, 13, 'Ticket/Display.html?id=2#txn-11'),
    (2, 14, 'Ticket/Display.html?id=2#txn-11'),  # statement.txt
    (3, None, 'Ticket/Display.html?id=3'),
    (9, None, None),
]

# The same rows indexed by their (ticket_id, attachment_id) pair.
EXPECTED_URLS_MAP = {row[:2]: row[2] for row in EXPECTED_URLS}

@pytest.fixture(scope='module')
def rt():
    """Provide one ``rtutil.RT`` wrapper around a stock test client per module."""
    return rtutil.RT(testutil.RTClient())

@pytest.fixture
def new_client():
    """Return a test RT client whose class holds its own copy of TICKET_DATA.

    Tests that add to or clear ``TICKET_DATA`` on this client therefore do
    not modify the dictionary stored on ``testutil.RTClient`` itself.
    """
    # Shallow copy: tests only add, replace, or clear top-level keys.
    private_tickets = testutil.RTClient.TICKET_DATA.copy()

    class RTClient(testutil.RTClient):
        TICKET_DATA = private_tickets

    return RTClient()

@contextlib.contextmanager
def nullcontext(thing):
    """Context manager that yields ``thing`` unchanged and does nothing on exit.

    ``new_cache`` returns this when there is no database object to close.
    """
    yield thing

def new_cache(database=':memory:'):
    """Return a context manager wrapping a link cache set up at ``database``.

    When ``rtutil.RTLinkCache.setup`` returns a database object, the context
    manager closes it on exit. When setup returns None, a note is printed
    and the context manager simply yields None.
    """
    cache = rtutil.RTLinkCache.setup(database)
    if cache is not None:
        return contextlib.closing(cache)
    print("NOTE: did not set up database cache at {}".format(database))
    return nullcontext(cache)

@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_url(rt, ticket_id, attachment_id, expected):
    """``rt.url()`` returns the full expected URL, or None for unknown links."""
    want = None if expected is None else DEFAULT_RT_URL + expected
    actual = rt.url(ticket_id, attachment_id)
    assert actual == want

@pytest.mark.parametrize(
    'attachment_id,first_link_only',
    itertools.product([245, None], [True, False]),
)
def test_metadata_regexp(rt, attachment_id, first_link_only):
    """The generated regexp matches both link spellings as whole words.

    With ``first_link_only`` set, a link that is not first in the metadata
    value must not match; otherwise it must.
    """
    if attachment_id is not None:
        links = [
            f'rt:220/{attachment_id}',
            f'rt://ticket/220/attachments/{attachment_id}',
        ]
    else:
        links = ['rt:220', 'rt://ticket/220']
    pattern = rt.metadata_regexp(220, attachment_id, first_link_only=first_link_only)
    for link in links:
        # The link matches on its own and when another link follows it...
        assert re.search(pattern, link)
        assert re.search(pattern, f'{link} link2')
        # ...but not when extra characters are glued onto either end.
        assert re.search(pattern, f'{link}0') is None
        assert re.search(pattern, f'a{link}') is None
        trailing = re.search(pattern, f'link0 {link}')
        if not first_link_only:
            assert trailing
        else:
            assert trailing is None

@pytest.mark.parametrize('attachment_id', [13, None])
def test_url_caches(new_client, attachment_id):
    """A URL already looked up is still returned after the client's data is cleared."""
    fragment = '' if attachment_id is None else '#txn-11'
    expected = '{}Ticket/Display.html?id=2{}'.format(DEFAULT_RT_URL, fragment)
    rt = rtutil.RT(new_client)
    first_lookup = rt.url(2, attachment_id)
    assert first_lookup == expected
    # With the backing data gone, only a cached answer can still be correct.
    new_client.TICKET_DATA.clear()
    second_lookup = rt.url(2, attachment_id)
    assert second_lookup == expected

@pytest.mark.parametrize(
    'mimetype,extension',
    [
        ('application/pdf', 'pdf'),
        ('image/png', 'png'),
        ('message/rfc822', 'eml'),
        ('x-test/x-unknown', 'bin'),
    ],
)
def test_url_default_filename(new_client, mimetype, extension):
    """An '(Unnamed)' attachment gets a generated filename with an extension per MIME type."""
    new_client.TICKET_DATA['1'] = [('9', '(Unnamed)', mimetype, '50.5k')]
    expected = '{}Ticket/Attachment/9/9/RT1%20attachment%209.{}'.format(DEFAULT_RT_URL, extension)
    actual = rtutil.RT(new_client).url(1, 9)
    assert actual == expected

@pytest.mark.parametrize(
    'rt_fmt,nonrt_fmt,missing_fmt',
    [
        ('{}', '{}', '{}',),
        ('<{}>', '[{}]', '({})'),
    ],
)
def test_iter_urls(rt, rt_fmt, nonrt_fmt, missing_fmt):
    """``iter_urls`` formats each input with the template for its category.

    Resolved RT links use ``rt_fmt`` around the full URL, RT links that do
    not resolve use ``missing_fmt``, and everything else uses ``nonrt_fmt``.
    """
    expected_map = {}
    for tid, aid, url in EXPECTED_URLS:
        link = f'rt:{tid}' if aid is None else f'rt:{tid}/{aid}'
        expected_map[link] = url
    # Two inputs that are not RT links at all.
    expected_map.update({'https://example.com': None, 'invoice.pdf': None})
    keys = list(expected_map)
    urls = rt.iter_urls(keys, rt_fmt, nonrt_fmt, missing_fmt)
    # zip_longest makes a length mismatch between inputs and outputs fail.
    for key, actual in itertools.zip_longest(keys, urls):
        url_path = expected_map[key]
        if url_path is not None:
            expected = rt_fmt.format(DEFAULT_RT_URL + url_path)
        elif key.startswith('rt:'):
            expected = missing_fmt.format(key)
        else:
            expected = nonrt_fmt.format(key)
        assert actual == expected

@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_exists(rt, ticket_id, attachment_id, expected):
    """``rt.exists()`` is True exactly for the rows that have an expected URL."""
    should_exist = expected is not None
    assert rt.exists(ticket_id, attachment_id) is should_exist

def test_exists_caches(new_client):
    """``exists()`` gives the same answers after the client's data is cleared."""
    rt = rtutil.RT(new_client)

    def check_answers():
        assert rt.exists(1, 3)
        assert rt.exists(2)
        assert not rt.exists(1, 99)
        assert not rt.exists(9)

    check_answers()
    # The second pass can only succeed if the first pass's answers were kept.
    new_client.TICKET_DATA.clear()
    check_answers()

@pytest.mark.parametrize(
    'link,expected',
    [
        # Short form: rt:TICKET[/ATTACHMENT][/]
        ('rt:1/2', ('1', '2')),
        ('rt:123/456', ('123', '456')),
        ('rt:12345', ('12345', None)),
        ('rt:12346/', ('12346', None)),
        ('rt:12346/789', ('12346', '789')),
        ('rt:12346/780/', ('12346', '780')),
        # Long form: rt://ticket/TICKET[/attachment(s)/ATTACHMENT][/]
        ('rt://ticket/1', ('1', None)),
        ('rt://ticket/1/', ('1', None)),
        ('rt://ticket/1234/attachments/5678', ('1234', '5678')),
        ('rt://ticket/1234/attachments/5678/', ('1234', '5678')),
        ('rt://ticket/1234/attachment/5678', ('1234', '5678')),
        ('rt://ticket/1234/attachment/5678/', ('1234', '5678')),
        # Inputs that must not parse as RT links
        ('rt:', None),
        ('rt://', None),
        ('rt:example.org', None),
        ('rt:example.org/1', None),
        ('rt://example.org', None),
        ('rt://example.org/1', None),
        ('https://example.org/rt/Ticket/Display.html?id=123', None),
    ],
)
def test_parse(rt, link, expected):
    """``rt.parse()`` returns (ticket, attachment) as strings, or None if not an RT link."""
    parsed = rt.parse(link)
    assert parsed == expected

@pytest.mark.parametrize(
    'ticket_id,attachment_id,expected',
    [
        ('12', None, 'rt:12'),
        (34, None, 'rt:34'),
        ('56', '78', 'rt:56/78'),
        (90, 880, 'rt:90/880'),
    ],
)
def test_unparse(rt, ticket_id, attachment_id, expected):
    """``rt.unparse()`` builds the short link form from str or int ids."""
    link = rt.unparse(ticket_id, attachment_id)
    assert link == expected

def test_uncommon_server_url_parsing():
    """A server whose own path ends in REST/1.0/ still yields URLs under that path."""
    url = 'https://example.org/REST/1.0/'
    # The client URL repeats the suffix: once as server path, once as API path.
    rt = rtutil.RT(testutil.RTClient(url + 'REST/1.0/'))
    ticket_url = rt.url(1)
    assert ticket_url.startswith(url)

def test_shared_cache(new_client):
    """A second RT instance on the same cache sees URLs the first one looked up."""
    first_row = EXPECTED_URLS[0]
    ticket_id = first_row[0]
    expected = DEFAULT_RT_URL + first_row[2]
    with new_cache() as cachedb:
        writer = rtutil.RT(new_client, cachedb)
        assert writer.url(ticket_id) == expected
        # After this, the reader can only answer from the shared cache.
        new_client.TICKET_DATA.clear()
        reader = rtutil.RT(new_client, cachedb)
        assert reader.url(ticket_id) == expected
        assert not reader.exists(ticket_id + 1)
        assert writer is not reader

def test_no_shared_cache(new_client):
    """RT instances on separate caches do not see each other's results."""
    with new_cache() as first_db, new_cache() as second_db:
        primed = rtutil.RT(new_client, first_db)
        unprimed = rtutil.RT(new_client, second_db)
        assert primed.exists(1)
        new_client.TICKET_DATA.clear()
        # Only the instance that looked the ticket up beforehand still finds it.
        assert not unprimed.exists(1)
        assert primed.exists(1)

def test_read_only_cache(new_client, tmp_path, caplog):
    """A cache file made read-only still serves stored URLs.

    The first RT instance populates the on-disk cache. The file is then set
    to mode 0o400 and the client's data is cleared, so the second instance
    must answer from the cache and report an unknown ticket as None.
    """
    caplog.set_level(logging.DEBUG, logger='conservancy_beancount.rtutil')
    first_row = EXPECTED_URLS[0]
    ticket_id = first_row[0]
    expected = DEFAULT_RT_URL + first_row[2]
    db_path = tmp_path / 'test.db'
    with new_cache(db_path) as writable_db:
        populating_rt = rtutil.RT(new_client, writable_db)
        assert populating_rt.url(ticket_id) == expected
    new_client.TICKET_DATA.clear()
    db_path.chmod(0o400)
    with new_cache(db_path) as readonly_db:
        reading_rt = rtutil.RT(new_client, readonly_db)
        assert reading_rt.url(ticket_id) == expected
        assert reading_rt.url(ticket_id + 1) is None

def test_results_not_found_only_in_transient_cache(new_client):
    """A "not found" result is remembered per instance, not in the shared cache.

    The instance that already saw ticket 9 missing keeps reporting it
    missing, while another instance on the same cache finds it once the
    client has data for it.
    """
    with new_cache() as cache:
        asked_early = rtutil.RT(new_client, cache)
        asked_late = rtutil.RT(new_client, cache)
        assert not asked_early.exists(9)
        new_client.TICKET_DATA['9'] = [('99', '(Unnamed)', 'text/plain', '0b')]
        assert not asked_early.exists(9)
        assert asked_late.exists(9)

def test_txn_with_urls(rt):
    """``txn_with_urls`` returns a copy whose RT links are replaced by URLs.

    Non-RT words in the metadata stay in place, posting metadata is
    converted too, and the original transaction is left unmodified.
    """
    def check(source, key, ticket_id, attachment_id=None):
        # The angle-bracketed full URL must appear somewhere in the value.
        wanted = '<{}{}>'.format(
            DEFAULT_RT_URL, EXPECTED_URLS_MAP[(ticket_id, attachment_id)],
        )
        assert wanted in source.meta[key]

    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'doc1.txt rt:1/4 doc2.txt',
    }
    original_receipts = ['rt:2/13 donation.txt', 'cash.png rt:2/14']
    txn = testutil.Transaction(**txn_meta, postings=[
        ('Income:Donations', -10, {'receipt': original_receipts[0]}),
        ('Assets:Cash', 10, {'receipt': original_receipts[1]}),
    ])
    actual = rt.txn_with_urls(txn)

    # Transaction-level metadata
    assert set(actual.meta) == set(txn_meta) | {'filename', 'lineno'}
    check(actual, 'rt-id', 1)
    assert actual.meta['contract'] == txn_meta['contract']
    assert actual.meta['statement'].startswith('doc1.txt ')
    check(actual, 'statement', 1, 4)

    # Posting-level metadata
    income_post = actual.postings[0]
    check(income_post, 'receipt', 2, 13)
    assert income_post.meta['receipt'].endswith(' donation.txt')
    cash_post = actual.postings[1]
    check(cash_post, 'receipt', 2, 14)
    assert cash_post.meta['receipt'].startswith('cash.png ')

    # Check the original transaction is unchanged
    for key in txn_meta:
        assert txn.meta[key] == txn_meta[key]
    for index, receipt in enumerate(original_receipts):
        assert txn.postings[index].meta['receipt'] == receipt

def test_txn_with_urls_with_fmts(rt):
    """``txn_with_urls`` applies the three format templates word by word.

    Resolved RT links are wrapped in ``<>``, non-RT words in ``[]``, and RT
    links that do not resolve in ``()``.
    """
    def bracketed_url(ticket_id, attachment_id=None):
        url_path = EXPECTED_URLS_MAP[(ticket_id, attachment_id)]
        return f'<{DEFAULT_RT_URL}{url_path}>'

    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'rt:1/99 rt:1/4 stmt.txt',
    }
    txn = testutil.Transaction(**txn_meta)
    actual = rt.txn_with_urls(txn, '<{}>', '[{}]', '({})')
    assert actual.meta['rt-id'] == bracketed_url(1)
    assert actual.meta['contract'] == '[RepoLink.pdf]'
    expected_statement = f'(rt:1/99) {bracketed_url(1, 4)} [stmt.txt]'
    assert actual.meta['statement'] == expected_statement