"""Test RT integration"""
# Copyright © 2020  Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.

import contextlib
import datetime
import itertools
import logging
import re

import pytest

from . import testutil

from conservancy_beancount import rtutil

# The test client's default URL minus its last nine characters.
# NOTE(review): presumably that suffix is 'REST/1.0/' (nine characters),
# leaving the web root that ticket URLs are built on - confirm in testutil.
DEFAULT_RT_URL = testutil.RTClient.DEFAULT_URL[:-9]

# Rows of (ticket_id, attachment_id, expected URL path relative to
# DEFAULT_RT_URL).  A path of None means no URL is expected for that pair.
EXPECTED_URLS = [
    (1, None, 'Ticket/Display.html?id=1'),
    (1, 2, 'Ticket/Display.html?id=1#txn-1'),
    (1, 4, 'Ticket/Attachment/1/4/Forwarded%20Message.eml'),
    (1, 99, None),
    (2, 1, None),
    (2, 10, 'Ticket/Attachment/7/10/Company_invoice-2020030405_as-sent.pdf'),
    (2, 13, 'Ticket/Display.html?id=2#txn-11'),
    (2, 14, 'Ticket/Display.html?id=2#txn-11'),  # statement.txt
    (3, None, 'Ticket/Display.html?id=3'),
    (9, None, None),
]

# The same table, keyed by (ticket_id, attachment_id) for direct lookup.
EXPECTED_URLS_MAP = {
    (ticket_id, attachment_id): url
    for ticket_id, attachment_id, url in EXPECTED_URLS
}


@pytest.fixture(scope='module')
def rt():
    """Provide one rtutil.RT, backed by the stock test client, per module."""
    return rtutil.RT(testutil.RTClient())


@pytest.fixture
def new_client():
    """Provide a fresh test client whose TICKET_DATA dict is its own copy.

    Tests may add keys to or clear this client's TICKET_DATA without
    touching the dict object held by testutil.RTClient.
    """
    class RTClient(testutil.RTClient):
        TICKET_DATA = testutil.RTClient.TICKET_DATA.copy()

    return RTClient()


@contextlib.contextmanager
def nullcontext(thing):
    """Context manager that yields ``thing`` and does nothing on exit."""
    yield thing


def new_cache(database=':memory:'):
    """Return a context manager around a link cache database.

    ``database`` is passed straight to ``rtutil.RTLinkCache.setup()``.  When
    setup returns a database, the result closes it on exit.  When setup
    returns None, a note is printed and the context manager yields None.
    """
    db = rtutil.RTLinkCache.setup(database)
    if db is not None:
        return contextlib.closing(db)
    print("NOTE: did not set up database cache at {}".format(database))
    return nullcontext(db)


@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_url(rt, ticket_id, attachment_id, expected):
    """rt.url() gives the full expected URL, or None, for each table row."""
    want = None if expected is None else DEFAULT_RT_URL + expected
    assert rt.url(ticket_id, attachment_id) == want


@pytest.mark.parametrize('attachment_id,first_link_only', itertools.product(
    [245, None],
    [True, False],
))
def test_metadata_regexp(rt, attachment_id, first_link_only):
    """metadata_regexp() matches both link spellings as whole words only.

    A link that is not first in the value must match only when
    ``first_link_only`` is false.
    """
    match_links = (
        ['rt:220', 'rt://ticket/220']
        if attachment_id is None else
        [f'rt:220/{attachment_id}', f'rt://ticket/220/attachments/{attachment_id}']
    )
    regexp = rt.metadata_regexp(220, attachment_id, first_link_only=first_link_only)
    for link in match_links:
        assert re.search(regexp, link)
        assert re.search(regexp, link + ' link2')
        # Extra characters glued onto either end must defeat the match.
        assert re.search(regexp, link + '0') is None
        assert re.search(regexp, 'a' + link) is None
        trailing_match = re.search(regexp, 'link0 ' + link)
        if first_link_only:
            assert trailing_match is None
        else:
            assert trailing_match


@pytest.mark.parametrize('attachment_id', [
    13,
    None,
])
def test_url_caches(new_client, attachment_id):
    """rt.url() repeats its answer after the client's data is cleared."""
    fragment = '' if attachment_id is None else '#txn-11'
    expected = '{}Ticket/Display.html?id=2{}'.format(DEFAULT_RT_URL, fragment)
    rt = rtutil.RT(new_client)
    assert rt.url(2, attachment_id) == expected
    new_client.TICKET_DATA.clear()
    assert rt.url(2, attachment_id) == expected


@pytest.mark.parametrize('mimetype,extension', [
    ('application/pdf', 'pdf'),
    ('image/png', 'png'),
    ('message/rfc822', 'eml'),
    ('x-test/x-unknown', 'bin'),
])
def test_url_default_filename(new_client, mimetype, extension):
    """An '(Unnamed)' attachment gets a filename derived from its MIME type."""
    new_client.TICKET_DATA['1'] = [('9', '(Unnamed)', mimetype, '50.5k')]
    rt = rtutil.RT(new_client)
    expected = '{}Ticket/Attachment/9/9/RT1%20attachment%209.{}'.format(
        DEFAULT_RT_URL, extension,
    )
    assert rt.url(1, 9) == expected


@pytest.mark.parametrize('rt_fmt,nonrt_fmt,missing_fmt', [
    ('{}', '{}', '{}',),
    ('<{}>', '[{}]', '({})'),
])
def test_iter_urls(rt, rt_fmt, nonrt_fmt, missing_fmt):
    """iter_urls() applies the matching format string to each kind of link.

    Resolvable RT links use ``rt_fmt`` on the full URL, RT links with no URL
    use ``missing_fmt`` on the original text, and everything else uses
    ``nonrt_fmt`` on the original text.
    """
    expected_map = {
        (f'rt:{tid}' if aid is None else f'rt:{tid}/{aid}'): url
        for tid, aid, url in EXPECTED_URLS
    }
    expected_map['https://example.com'] = None
    expected_map['invoice.pdf'] = None
    keys = list(expected_map)
    urls = rt.iter_urls(keys, rt_fmt, nonrt_fmt, missing_fmt)
    # zip_longest makes a length mismatch between inputs and outputs fail.
    for key, actual in itertools.zip_longest(keys, urls):
        url_path = expected_map[key]
        if url_path is not None:
            expected = rt_fmt.format(DEFAULT_RT_URL + url_path)
        elif key.startswith('rt:'):
            expected = missing_fmt.format(key)
        else:
            expected = nonrt_fmt.format(key)
        assert actual == expected


@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_exists(rt, ticket_id, attachment_id, expected):
    """rt.exists() is True exactly for the table rows that have a URL."""
    should_exist = expected is not None
    assert rt.exists(ticket_id, attachment_id) is should_exist


def test_exists_caches(new_client):
    """rt.exists() repeats its answers after the client's data is cleared."""
    rt = rtutil.RT(new_client)

    def check_answers():
        assert rt.exists(1, 3)
        assert rt.exists(2)
        assert not rt.exists(1, 99)
        assert not rt.exists(9)

    check_answers()
    new_client.TICKET_DATA.clear()
    check_answers()


@pytest.mark.parametrize('link,expected', [
    ('rt:1/2', ('1', '2')),
    ('rt:123/456', ('123', '456')),
    ('rt:12345', ('12345', None)),
    ('rt:12346/', ('12346', None)),
    ('rt:12346/789', ('12346', '789')),
    ('rt:12346/780/', ('12346', '780')),
    ('rt://ticket/1', ('1', None)),
    ('rt://ticket/1/', ('1', None)),
    ('rt://ticket/1234/attachments/5678', ('1234', '5678')),
    ('rt://ticket/1234/attachments/5678/', ('1234', '5678')),
    ('rt://ticket/1234/attachment/5678', ('1234', '5678')),
    ('rt://ticket/1234/attachment/5678/', ('1234', '5678')),
    ('rt:', None),
    ('rt://', None),
    ('rt:example.org', None),
    ('rt:example.org/1', None),
    ('rt://example.org', None),
    ('rt://example.org/1', None),
    ('https://example.org/rt/Ticket/Display.html?id=123', None),
])
def test_parse(rt, link, expected):
    """rt.parse() yields (ticket, attachment) strings, or None for non-links."""
    assert rt.parse(link) == expected


@pytest.mark.parametrize('ticket_id,attachment_id,expected', [
    ('12', None, 'rt:12'),
    (34, None, 'rt:34'),
    ('56', '78', 'rt:56/78'),
    (90, 880, 'rt:90/880'),
])
def test_unparse(rt, ticket_id, attachment_id, expected):
    """rt.unparse() builds a short 'rt:' link from str or int ids."""
    assert rt.unparse(ticket_id, attachment_id) == expected


def test_uncommon_server_url_parsing():
    """A server whose own path already ends in REST/1.0/ keeps that path.

    The client is given the API suffix appended to a URL that itself ends in
    the same text; ticket URLs must still start with the shorter URL.
    """
    url = 'https://example.org/REST/1.0/'
    client = testutil.RTClient(url + 'REST/1.0/')
    rt = rtutil.RT(client)
    assert rt.url(1).startswith(url)


def test_shared_cache(new_client):
    """A second RT on the same cache database sees the first one's result."""
    ticket_id, _, url_path = EXPECTED_URLS[0]
    expected = DEFAULT_RT_URL + url_path
    with new_cache() as cachedb:
        rt1 = rtutil.RT(new_client, cachedb)
        assert rt1.url(ticket_id) == expected
        # With the client emptied, rt2 can only answer from the database.
        new_client.TICKET_DATA.clear()
        rt2 = rtutil.RT(new_client, cachedb)
        assert rt2.url(ticket_id) == expected
        assert not rt2.exists(ticket_id + 1)
        assert rt1 is not rt2


def test_no_shared_cache(new_client):
    """RT objects on separate cache databases do not share results."""
    with new_cache() as cache1, new_cache() as cache2:
        rt1 = rtutil.RT(new_client, cache1)
        rt2 = rtutil.RT(new_client, cache2)
        assert rt1.exists(1)
        new_client.TICKET_DATA.clear()
        assert not rt2.exists(1)
        assert rt1.exists(1)


def test_read_only_cache(new_client, tmp_path, caplog):
    """Results stored in a database file can be read once it is mode 0o400.

    A lookup for a ticket that is not stored must still return None.
    """
    caplog.set_level(logging.DEBUG, logger='conservancy_beancount.rtutil')
    db_path = tmp_path / 'test.db'
    ticket_id, _, url_path = EXPECTED_URLS[0]
    expected = DEFAULT_RT_URL + url_path
    with new_cache(db_path) as cache1:
        rt1 = rtutil.RT(new_client, cache1)
        assert rt1.url(ticket_id) == expected
    # Empty the client and drop write permission before reopening the file.
    new_client.TICKET_DATA.clear()
    db_path.chmod(0o400)
    with new_cache(db_path) as cache2:
        rt2 = rtutil.RT(new_client, cache2)
        assert rt2.url(ticket_id) == expected
        assert rt2.url(ticket_id + 1) is None


def test_results_not_found_only_in_transient_cache(new_client):
    """A 'not found' answer stays with the RT object that looked it up.

    rt1 keeps reporting ticket 9 as missing after the ticket is added, while
    rt2, which shares the same cache database, finds it.
    """
    with new_cache() as cache:
        rt1 = rtutil.RT(new_client, cache)
        rt2 = rtutil.RT(new_client, cache)
        assert not rt1.exists(9)
        new_client.TICKET_DATA['9'] = [('99', '(Unnamed)', 'text/plain', '0b')]
        assert not rt1.exists(9)
        assert rt2.exists(9)


def test_txn_with_urls(rt):
    """txn_with_urls() returns a copy with RT links replaced by <URL>s.

    Other words in each metadata value are kept in place, and the original
    transaction and its postings are left unchanged.
    """
    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'doc1.txt rt:1/4 doc2.txt',
    }
    txn = testutil.Transaction(**txn_meta, postings=[
        ('Income:Donations', -10, {'receipt': 'rt:2/13 donation.txt'}),
        ('Assets:Cash', 10, {'receipt': 'cash.png rt:2/14'}),
    ])
    actual = rt.txn_with_urls(txn)

    def assert_linked(source, key, ticket_id, attachment_id=None):
        url_path = EXPECTED_URLS_MAP[(ticket_id, attachment_id)]
        assert f'<{DEFAULT_RT_URL}{url_path}>' in source.meta[key]

    expected_keys = set(txn_meta) | {'filename', 'lineno'}
    assert set(actual.meta) == expected_keys
    assert_linked(actual, 'rt-id', 1)
    assert actual.meta['contract'] == txn_meta['contract']
    assert actual.meta['statement'].startswith('doc1.txt ')
    assert_linked(actual, 'statement', 1, 4)
    first_posting, second_posting = actual.postings[0], actual.postings[1]
    assert_linked(first_posting, 'receipt', 2, 13)
    assert first_posting.meta['receipt'].endswith(' donation.txt')
    assert_linked(second_posting, 'receipt', 2, 14)
    assert second_posting.meta['receipt'].startswith('cash.png ')
    # Check the original transaction is unchanged
    for key, original_value in txn_meta.items():
        assert txn.meta[key] == original_value
    assert txn.postings[0].meta['receipt'] == 'rt:2/13 donation.txt'
    assert txn.postings[1].meta['receipt'] == 'cash.png rt:2/14'


def test_txn_with_urls_with_fmts(rt):
    """txn_with_urls() applies each of the three caller-supplied formats."""
    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'rt:1/99 rt:1/4 stmt.txt',
    }
    txn = testutil.Transaction(**txn_meta)
    actual = rt.txn_with_urls(txn, '<{}>', '[{}]', '({})')
    rt_id_path = EXPECTED_URLS_MAP[(1, None)]
    assert actual.meta['rt-id'] == f'<{DEFAULT_RT_URL}{rt_id_path}>'
    assert actual.meta['contract'] == '[RepoLink.pdf]'
    statement_path = EXPECTED_URLS_MAP[(1, 4)]
    expected_words = [
        '(rt:1/99)',
        f'<{DEFAULT_RT_URL}{statement_path}>',
        '[stmt.txt]',
    ]
    assert actual.meta['statement'] == ' '.join(expected_words)


@pytest.mark.parametrize('arg,exp_num,exp_offset', [
    # These correspond to the different datetime formats available through
    # RT's user settings.
    ('Mon Mar 1 01:01:01 2021', 1, None),
    ('2021-03-02 02:02:02', 2, None),
    ('2021-03-03T03:03:03-0500', 3, -18000),
    ('Thu, 4 Mar 2021 04:04:04 -0600', 4, -21600),
    ('Fri, 5 Mar 2021 05:05:05 GMT', 5, 0),
    ('20210306T060606Z', 6, 0),
    ('Sun, Mar 7, 2021 07:07:07 AM', 7, None),
    ('Sun, Mar 14, 2021 02:14:14 PM', 14, None),
])
def test_rt_datetime(arg, exp_num, exp_offset):
    """RTDateTime parses each format into the expected fields and offset.

    Every case is in March 2021 and uses ``exp_num`` for the day, hour,
    minute, and second.  ``exp_offset`` is the UTC offset in seconds, or
    None when the result should be naive.
    """
    actual = rtutil.RTDateTime(arg)
    fields = (
        actual.year, actual.month, actual.day,
        actual.hour, actual.minute, actual.second,
    )
    assert fields == (2021, 3, exp_num, exp_num, exp_num, exp_num)
    if exp_offset is None:
        assert actual.tzinfo is None
    else:
        assert actual.tzinfo.utcoffset(None).total_seconds() == exp_offset


@pytest.mark.parametrize('arg', ['Not set', '', None])
def test_rt_datetime_empty(arg):
    """Unset or empty input yields the naive ``datetime.datetime.min``."""
    actual = rtutil.RTDateTime(arg)
    assert actual == datetime.datetime.min
    assert actual.tzinfo is None