Files @ 5a8da108b983
Branch filter:

Location: NPO-Accounting/conservancy_beancount/tests/test_rtutil.py

bsturmfels
statement_reconciler: Add initial Chase bank CSV statement matching

We currently don't have many examples to work with, so we haven't done any
significant testing of the matching accuracy between statement and books.
"""Test RT integration"""
# Copyright © 2020  Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.

import contextlib
import datetime
import itertools
import logging
import re

import pytest

from . import testutil

from conservancy_beancount import rtutil

# Base URL that the expected URL paths in this module are appended to: the
# fake client's default URL with its last nine characters removed
# (presumably the trailing 'REST/1.0/' API path -- confirm against
# testutil.RTClient.DEFAULT_URL).
DEFAULT_RT_URL = testutil.RTClient.DEFAULT_URL[:-9]

# Each row is (ticket_id, attachment_id, expected_path).  expected_path is
# the URL path that gets appended to the RT base URL, or None when no URL is
# expected for that ticket/attachment pair.
EXPECTED_URLS = [
    (1, None, 'Ticket/Display.html?id=1'),
    (1, 2, 'Ticket/Display.html?id=1#txn-1'),
    (1, 4, 'Ticket/Attachment/1/4/Forwarded%20Message.eml'),
    (1, 99, None),
    (2, 1, None),
    (2, 10, 'Ticket/Attachment/7/10/Company_invoice-2020030405_as-sent.pdf'),
    (2, 13, 'Ticket/Display.html?id=2#txn-11'),
    (2, 14, 'Ticket/Display.html?id=2#txn-11'),  # statement.txt
    (3, None, 'Ticket/Display.html?id=3'),
    (9, None, None),
]

# The same table as a lookup: (ticket_id, attachment_id) -> expected_path.
EXPECTED_URLS_MAP = {
    row[:2]: row[2]
    for row in EXPECTED_URLS
}

@pytest.fixture(scope='module')
def rt():
    """Provide an rtutil.RT built on a fake RT client, shared module-wide."""
    return rtutil.RT(testutil.RTClient())

@pytest.fixture
def new_client():
    """Provide a fresh fake RT client whose ticket data a test may change.

    The client is an instance of a throwaway subclass that carries its own
    copy of TICKET_DATA, so assigning to or clearing that mapping does not
    touch testutil.RTClient.TICKET_DATA itself.
    """
    class RTClient(testutil.RTClient):
        # Per-test copy of the class-level ticket table.
        TICKET_DATA = testutil.RTClient.TICKET_DATA.copy()

    client = RTClient()
    return client

@contextlib.contextmanager
def nullcontext(thing):
    """Context manager that yields ``thing`` and does nothing on exit."""
    yield thing

def new_cache(database=':memory:'):
    """Return a context manager wrapping a link cache for ``database``.

    The cache comes from rtutil.RTLinkCache.setup(database).  When setup
    succeeds, the result is wrapped in contextlib.closing so it is closed
    when the ``with`` block exits.  When setup returns None, a note is
    printed and a do-nothing context manager yielding None is returned.
    """
    cache_db = rtutil.RTLinkCache.setup(database)
    if cache_db is not None:
        return contextlib.closing(cache_db)
    print("NOTE: did not set up database cache at {}".format(database))
    return nullcontext(cache_db)

@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_url(rt, ticket_id, attachment_id, expected):
    """rt.url() returns the full expected URL, or None when none is expected."""
    full_url = None if expected is None else DEFAULT_RT_URL + expected
    assert rt.url(ticket_id, attachment_id) == full_url

@pytest.mark.parametrize('attachment_id,first_link_only', itertools.product(
    [245, None],
    [True, False],
))
def test_metadata_regexp(rt, attachment_id, first_link_only):
    """Check what rt.metadata_regexp() matches for ticket 220.

    Both link spellings must match on their own and when followed by
    another link.  A link with an extra trailing digit or a leading
    character must not match.  A link that is not first in the value
    matches only when ``first_link_only`` is false.
    """
    if attachment_id is not None:
        match_links = [f'rt:220/{attachment_id}',
                       f'rt://ticket/220/attachments/{attachment_id}']
    else:
        match_links = ['rt:220', 'rt://ticket/220']
    regexp = rt.metadata_regexp(220, attachment_id, first_link_only=first_link_only)
    for link in match_links:
        assert re.search(regexp, link)
        assert re.search(regexp, link + ' link2')
        assert re.search(regexp, link + '0') is None
        assert re.search(regexp, 'a' + link) is None
        trailing = re.search(regexp, 'link0 ' + link)
        # re.search gives a (truthy) match object or None, so "no match"
        # must coincide exactly with first_link_only being set.
        assert (trailing is None) == first_link_only

@pytest.mark.parametrize('attachment_id', [
    13,
    None,
])
def test_url_caches(new_client, attachment_id):
    """A URL looked up once is still returned after the client loses its data."""
    fragment = '' if attachment_id is None else '#txn-11'
    expected = '{}Ticket/Display.html?id=2{}'.format(DEFAULT_RT_URL, fragment)
    rt = rtutil.RT(new_client)
    first_lookup = rt.url(2, attachment_id)
    assert first_lookup == expected
    # With the ticket data gone, only a remembered result can still match.
    new_client.TICKET_DATA.clear()
    second_lookup = rt.url(2, attachment_id)
    assert second_lookup == expected

@pytest.mark.parametrize('mimetype,extension', [
    ('application/pdf', 'pdf'),
    ('image/png', 'png'),
    ('message/rfc822', 'eml'),
    ('x-test/x-unknown', 'bin'),
])
def test_url_default_filename(new_client, mimetype, extension):
    """An '(Unnamed)' attachment gets a generated filename in its URL.

    The expected filename is 'RT1 attachment 9' (URL-quoted) with an
    extension that depends on the attachment's MIME type.
    """
    unnamed_attachment = ('9', '(Unnamed)', mimetype, '50.5k')
    new_client.TICKET_DATA['1'] = [unnamed_attachment]
    rt = rtutil.RT(new_client)
    expected = '{}Ticket/Attachment/9/9/RT1%20attachment%209.{}'.format(DEFAULT_RT_URL, extension)
    actual = rt.url(1, 9)
    assert actual == expected

@pytest.mark.parametrize('rt_fmt,nonrt_fmt,missing_fmt', [
    ('{}', '{}', '{}',),
    ('<{}>', '[{}]', '({})'),
])
def test_iter_urls(rt, rt_fmt, nonrt_fmt, missing_fmt):
    """rt.iter_urls() formats each link according to how it resolved.

    RT links with an expected URL are rendered through ``rt_fmt``, RT links
    without one through ``missing_fmt``, and anything not starting with
    'rt:' through ``nonrt_fmt``.
    """
    expected_map = {}
    for tid, aid, url in EXPECTED_URLS:
        suffix = '' if aid is None else f'/{aid}'
        expected_map['rt:{}{}'.format(tid, suffix)] = url
    expected_map['https://example.com'] = None
    expected_map['invoice.pdf'] = None
    keys = list(expected_map)
    urls = rt.iter_urls(keys, rt_fmt, nonrt_fmt, missing_fmt)
    # zip_longest pairs every key with a result even if the counts differ.
    for key, actual in itertools.zip_longest(keys, urls):
        url_path = expected_map[key]
        if url_path is not None:
            expected = rt_fmt.format(DEFAULT_RT_URL + url_path)
        elif key.startswith('rt:'):
            expected = missing_fmt.format(key)
        else:
            expected = nonrt_fmt.format(key)
        assert actual == expected

@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_exists(rt, ticket_id, attachment_id, expected):
    """rt.exists() is True exactly for the rows that have an expected URL."""
    should_exist = expected is not None
    assert rt.exists(ticket_id, attachment_id) is should_exist

def test_exists_caches(new_client):
    """rt.exists() gives the same answers after the client's data is cleared."""
    rt = rtutil.RT(new_client)

    def check_answers():
        assert rt.exists(1, 3)
        assert rt.exists(2)
        assert not rt.exists(1, 99)
        assert not rt.exists(9)

    check_answers()
    # Second pass runs against an emptied client.
    new_client.TICKET_DATA.clear()
    check_answers()

@pytest.mark.parametrize('link,expected', [
    ('rt:1/2', ('1', '2')),
    ('rt:123/456', ('123', '456')),
    ('rt:12345', ('12345', None)),
    ('rt:12346/', ('12346', None)),
    ('rt:12346/789', ('12346', '789')),
    ('rt:12346/780/', ('12346', '780')),
    ('rt://ticket/1', ('1', None)),
    ('rt://ticket/1/', ('1', None)),
    ('rt://ticket/1234/attachments/5678', ('1234', '5678')),
    ('rt://ticket/1234/attachments/5678/', ('1234', '5678')),
    ('rt://ticket/1234/attachment/5678', ('1234', '5678')),
    ('rt://ticket/1234/attachment/5678/', ('1234', '5678')),
    ('rt:', None),
    ('rt://', None),
    ('rt:example.org', None),
    ('rt:example.org/1', None),
    ('rt://example.org', None),
    ('rt://example.org/1', None),
    ('https://example.org/rt/Ticket/Display.html?id=123', None),
])
def test_parse(rt, link, expected):
    """rt.parse() yields (ticket, attachment) strings, or None for other links."""
    parsed = rt.parse(link)
    assert parsed == expected

@pytest.mark.parametrize('ticket_id,attachment_id,expected', [
    ('12', None, 'rt:12'),
    (34, None, 'rt:34'),
    ('56', '78', 'rt:56/78'),
    (90, 880, 'rt:90/880'),
])
def test_unparse(rt, ticket_id, attachment_id, expected):
    """rt.unparse() builds the short 'rt:' link from str or int ids."""
    link = rt.unparse(ticket_id, attachment_id)
    assert link == expected

def test_uncommon_server_url_parsing():
    """A server root that itself ends in 'REST/1.0/' is kept in ticket URLs."""
    server_root = 'https://example.org/REST/1.0/'
    # The client URL repeats the REST path after the server root.
    rest_url = server_root + 'REST/1.0/'
    rt = rtutil.RT(testutil.RTClient(rest_url))
    ticket_url = rt.url(1)
    assert ticket_url.startswith(server_root)

def test_shared_cache(new_client):
    """Two RT objects on one cache database share successful lookups."""
    ticket_id, _, url_path = EXPECTED_URLS[0]
    expected = DEFAULT_RT_URL + url_path
    with new_cache() as shared_db:
        first_rt = rtutil.RT(new_client, shared_db)
        assert first_rt.url(ticket_id) == expected
        # The second object is created only after the client data is gone.
        new_client.TICKET_DATA.clear()
        second_rt = rtutil.RT(new_client, shared_db)
        assert second_rt.url(ticket_id) == expected
        assert not second_rt.exists(ticket_id + 1)
        assert first_rt is not second_rt

def test_no_shared_cache(new_client):
    """RT objects on separate cache databases do not see each other's results."""
    with new_cache() as first_db, new_cache() as second_db:
        cached_rt = rtutil.RT(new_client, first_db)
        uncached_rt = rtutil.RT(new_client, second_db)
        assert cached_rt.exists(1)
        new_client.TICKET_DATA.clear()
        # Only the object that looked the ticket up earlier still knows it.
        assert not uncached_rt.exists(1)
        assert cached_rt.exists(1)

def test_read_only_cache(new_client, tmp_path, caplog):
    """A cache file made read-only still serves lookups stored earlier.

    The first RT object fills a cache database on disk.  After the client
    data is cleared and the file is set to mode 0o400, a second RT object
    on that file must return the stored URL and return None for a ticket
    that was never looked up.
    """
    # NOTE(review): the log level is raised here but nothing below inspects
    # the captured records.
    caplog.set_level(logging.DEBUG, logger='conservancy_beancount.rtutil')
    cache_file = tmp_path / 'test.db'
    ticket_id, _, url_path = EXPECTED_URLS[0]
    expected = DEFAULT_RT_URL + url_path
    with new_cache(cache_file) as writable_db:
        writer_rt = rtutil.RT(new_client, writable_db)
        assert writer_rt.url(ticket_id) == expected
    new_client.TICKET_DATA.clear()
    cache_file.chmod(0o400)
    with new_cache(cache_file) as readonly_db:
        reader_rt = rtutil.RT(new_client, readonly_db)
        assert reader_rt.url(ticket_id) == expected
        assert reader_rt.url(ticket_id + 1) is None

def test_results_not_found_only_in_transient_cache(new_client):
    """A 'not found' answer sticks to one RT object, not the shared cache.

    After ticket 9 is added to the client, the object that already failed
    to find it keeps reporting it missing, while a second object sharing
    the same cache database finds it.
    """
    with new_cache() as shared_db:
        stale_rt = rtutil.RT(new_client, shared_db)
        fresh_rt = rtutil.RT(new_client, shared_db)
        assert not stale_rt.exists(9)
        new_attachment = ('99', '(Unnamed)', 'text/plain', '0b')
        new_client.TICKET_DATA['9'] = [new_attachment]
        assert not stale_rt.exists(9)
        assert fresh_rt.exists(9)

def test_txn_with_urls(rt):
    """rt.txn_with_urls() returns a copy with RT links replaced by URLs.

    Checks transaction and posting metadata: RT links become '<URL>',
    other text in the same value is kept in place, the set of metadata
    keys is the original keys plus 'filename' and 'lineno', and the
    source transaction is left unmodified.
    """
    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'doc1.txt rt:1/4 doc2.txt',
    }
    donation_receipt = 'rt:2/13 donation.txt'
    cash_receipt = 'cash.png rt:2/14'
    txn = testutil.Transaction(**txn_meta, postings=[
        ('Income:Donations', -10, {'receipt': donation_receipt}),
        ('Assets:Cash', 10, {'receipt': cash_receipt}),
    ])
    actual = rt.txn_with_urls(txn)

    def assert_linked(entry, key, ticket_id, attachment_id=None):
        # The bracketed full URL must appear somewhere in the metadata value.
        url_path = EXPECTED_URLS_MAP[(ticket_id, attachment_id)]
        assert f'<{DEFAULT_RT_URL}{url_path}>' in entry.meta[key]

    expected_keys = set(txn_meta) | {'filename', 'lineno'}
    assert set(actual.meta) == expected_keys
    assert_linked(actual, 'rt-id', 1)
    assert actual.meta['contract'] == txn_meta['contract']
    assert actual.meta['statement'].startswith('doc1.txt ')
    assert_linked(actual, 'statement', 1, 4)
    first_posting, second_posting = actual.postings[0], actual.postings[1]
    assert_linked(first_posting, 'receipt', 2, 13)
    assert first_posting.meta['receipt'].endswith(' donation.txt')
    assert_linked(second_posting, 'receipt', 2, 14)
    assert second_posting.meta['receipt'].startswith('cash.png ')
    # Check the original transaction is unchanged
    for key, original_value in txn_meta.items():
        assert txn.meta[key] == original_value
    assert txn.postings[0].meta['receipt'] == donation_receipt
    assert txn.postings[1].meta['receipt'] == cash_receipt

def test_txn_with_urls_with_fmts(rt):
    """rt.txn_with_urls() applies the three given formats to each link.

    Resolved RT links are wrapped as '<URL>', non-RT text as '[text]', and
    RT links with no URL as '(link)'.
    """
    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'rt:1/99 rt:1/4 stmt.txt',
    }
    txn = testutil.Transaction(**txn_meta)
    actual = rt.txn_with_urls(txn, '<{}>', '[{}]', '({})')
    ticket_path = EXPECTED_URLS_MAP[(1, None)]
    assert actual.meta['rt-id'] == f'<{DEFAULT_RT_URL}{ticket_path}>'
    assert actual.meta['contract'] == '[RepoLink.pdf]'
    attachment_path = EXPECTED_URLS_MAP[(1, 4)]
    expected_statement = ' '.join([
        '(rt:1/99)',
        f'<{DEFAULT_RT_URL}{attachment_path}>',
        '[stmt.txt]',
    ])
    assert actual.meta['statement'] == expected_statement

@pytest.mark.parametrize('arg,exp_num,exp_offset', [
    # These correspond to the different datetime formats available through
    # RT's user settings.
    ('Mon Mar 1 01:01:01 2021', 1, None),
    ('2021-03-02 02:02:02', 2, None),
    ('2021-03-03T03:03:03-0500', 3, -18000),
    ('Thu, 4 Mar 2021 04:04:04 -0600', 4, -21600),
    ('Fri, 5 Mar 2021 05:05:05 GMT', 5, 0),
    ('20210306T060606Z', 6, 0),
    ('Sun, Mar 7, 2021 07:07:07 AM', 7, None),
    ('Sun, Mar 14, 2021 02:14:14 PM', 14, None),
])
def test_rt_datetime(arg, exp_num, exp_offset):
    """rtutil.RTDateTime() parses each supported timestamp format.

    Every sample is in March 2021 with day, hour, minute and second all
    equal to ``exp_num``.  ``exp_offset`` is the expected UTC offset in
    seconds, or None when the result should have no tzinfo.
    """
    actual = rtutil.RTDateTime(arg)
    assert actual.year == 2021
    assert actual.month == 3
    for field in ('day', 'hour', 'minute', 'second'):
        assert getattr(actual, field) == exp_num
    if exp_offset is not None:
        assert actual.tzinfo.utcoffset(None).total_seconds() == exp_offset
    else:
        assert actual.tzinfo is None

@pytest.mark.parametrize('arg', ['Not set', '', None])
def test_rt_datetime_empty(arg):
    """Empty or unset timestamps become datetime.min with no tzinfo."""
    parsed = rtutil.RTDateTime(arg)
    assert parsed == datetime.datetime.min
    assert parsed.tzinfo is None