Files @ 5784068904e8
Branch filter:

Location: NPO-Accounting/conservancy_beancount/tests/test_rtutil.py

bkuhn
payroll-type — US:403b:Employee:Roth — needed separate since taxable

Since Roth contributions are taxable, there are some reports that
need to include these amounts in total salary (i.e., when running a
report that seeks to show total taxable income for an employee). As
such, we need a `payroll-type` specifically for Roth 403(b)
contributions.
"""Test RT integration"""
# Copyright © 2020  Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.

import contextlib
import datetime
import itertools
import logging
import re

import pytest

from . import testutil

from conservancy_beancount import rtutil

# Base URL that the expected URL paths in this module are appended to: the
# test client's default URL with its last nine characters removed.
# NOTE(review): presumably those nine characters are the trailing 'REST/1.0/'
# API path -- confirm against testutil.RTClient.DEFAULT_URL.
DEFAULT_RT_URL = testutil.RTClient.DEFAULT_URL[:-9]

# Each row is (ticket_id, attachment_id, url_path). An attachment_id of None
# refers to the ticket itself; a url_path of None means no URL is expected
# for that ticket/attachment pair.
EXPECTED_URLS = [
    (1, None, 'Ticket/Display.html?id=1'),
    (1, 2, 'Ticket/Display.html?id=1#txn-1'),
    (1, 4, 'Ticket/Attachment/1/4/Forwarded%20Message.eml'),
    (1, 99, None),
    (2, 1, None),
    (2, 10, 'Ticket/Attachment/7/10/Company_invoice-2020030405_as-sent.pdf'),
    (2, 13, 'Ticket/Display.html?id=2#txn-11'),
    (2, 14, 'Ticket/Display.html?id=2#txn-11'),  # statement.txt
    (3, None, 'Ticket/Display.html?id=3'),
    (9, None, None),
]

# The same rows, keyed by (ticket_id, attachment_id) for direct lookup.
EXPECTED_URLS_MAP = {row[:2]: row[2] for row in EXPECTED_URLS}

@pytest.fixture(scope='module')
def rt():
    """Return an rtutil.RT wrapping a default testutil.RTClient.

    The fixture is module-scoped, so tests requesting it share one instance.
    """
    return rtutil.RT(testutil.RTClient())

@pytest.fixture
def new_client():
    """Return a fresh test client whose TICKET_DATA tests can mutate.

    The instance comes from a throwaway subclass that carries its own copy
    of the base class's TICKET_DATA.
    """
    base = testutil.RTClient

    class RTClient(base):
        TICKET_DATA = base.TICKET_DATA.copy()

    client = RTClient()
    return client

@contextlib.contextmanager
def nullcontext(thing):
    """Context manager that hands back ``thing`` and does nothing on exit."""
    # No setup or teardown: the with-block receives the argument unchanged.
    yield thing
def new_cache(database=':memory:'):
    """Set up an RT link cache and return it wrapped in a context manager.

    ``database`` is passed straight to ``rtutil.RTLinkCache.setup``. When
    setup yields a cache, it is wrapped in ``contextlib.closing``. When setup
    returns None, a note is printed and a do-nothing context manager that
    yields None is returned instead.
    """
    db = rtutil.RTLinkCache.setup(database)
    if db is not None:
        return contextlib.closing(db)
    print("NOTE: did not set up database cache at {}".format(database))
    return nullcontext(db)

@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_url(rt, ticket_id, attachment_id, expected):
    """rt.url() returns the expected full URL, or None when none is expected."""
    want = None if expected is None else DEFAULT_RT_URL + expected
    assert rt.url(ticket_id, attachment_id) == want

@pytest.mark.parametrize('attachment_id,first_link_only', itertools.product(
    [245, None],
    [True, False],
))
def test_metadata_regexp(rt, attachment_id, first_link_only):
    """Check what the pattern from rt.metadata_regexp() does and doesn't match.

    Both spellings of the link (``rt:`` and ``rt://ticket/``) are tried for
    ticket 220, with and without an attachment ID.
    """
    regexp = rt.metadata_regexp(220, attachment_id, first_link_only=first_link_only)
    if attachment_id is not None:
        match_links = [f'rt:220/{attachment_id}',
                       f'rt://ticket/220/attachments/{attachment_id}']
    else:
        match_links = ['rt:220', 'rt://ticket/220']
    for link in match_links:
        # The link matches on its own and as the first of several links.
        assert re.search(regexp, link)
        assert re.search(regexp, link + ' link2')
        # Extra characters glued to either end must prevent a match.
        assert re.search(regexp, link + '0') is None
        assert re.search(regexp, 'a' + link) is None
        # A link in a later position matches only when not restricted to
        # the first link.
        later_match = re.search(regexp, 'link0 ' + link)
        if not first_link_only:
            assert later_match
        else:
            assert later_match is None

@pytest.mark.parametrize('attachment_id', [
    13,
    None,
])
def test_url_caches(new_client, attachment_id):
    """rt.url() gives the same answer after the client's data is cleared."""
    fragment = '' if attachment_id is None else '#txn-11'
    expected = '{}Ticket/Display.html?id=2{}'.format(DEFAULT_RT_URL, fragment)
    rt = rtutil.RT(new_client)
    first_answer = rt.url(2, attachment_id)
    assert first_answer == expected
    # With the client's ticket data gone, the repeat lookup must still succeed.
    new_client.TICKET_DATA.clear()
    second_answer = rt.url(2, attachment_id)
    assert second_answer == expected

@pytest.mark.parametrize('mimetype,extension', [
    ('application/pdf', 'pdf'),
    ('image/png', 'png'),
    ('message/rfc822', 'eml'),
    ('x-test/x-unknown', 'bin'),
])
def test_url_default_filename(new_client, mimetype, extension):
    """An '(Unnamed)' attachment gets a generated filename in its URL.

    The expected filename ends with the extension paired with the
    attachment's MIME type in the parameter list.
    """
    unnamed_attachment = ('9', '(Unnamed)', mimetype, '50.5k')
    new_client.TICKET_DATA['1'] = [unnamed_attachment]
    expected = '{}Ticket/Attachment/9/9/RT1%20attachment%209.{}'.format(DEFAULT_RT_URL, extension)
    rt = rtutil.RT(new_client)
    assert rt.url(1, 9) == expected

@pytest.mark.parametrize('rt_fmt,nonrt_fmt,missing_fmt', [
    ('{}', '{}', '{}',),
    ('<{}>', '[{}]', '({})'),
])
def test_iter_urls(rt, rt_fmt, nonrt_fmt, missing_fmt):
    """rt.iter_urls() applies the right format string to each kind of link.

    Links with an expected URL are rendered with ``rt_fmt``; ``rt:`` links
    with no expected URL use ``missing_fmt``; all other links use
    ``nonrt_fmt``.
    """
    expected_map = {
        'rt:{}{}'.format(tid, '' if aid is None else f'/{aid}'): url
        for tid, aid, url in EXPECTED_URLS
    }
    expected_map['https://example.com'] = None
    expected_map['invoice.pdf'] = None
    keys = list(expected_map)

    def rendered(key):
        # Build the string iter_urls is expected to produce for this key.
        url_path = expected_map[key]
        if url_path is not None:
            return rt_fmt.format(DEFAULT_RT_URL + url_path)
        fallback_fmt = missing_fmt if key.startswith('rt:') else nonrt_fmt
        return fallback_fmt.format(key)

    urls = rt.iter_urls(keys, rt_fmt, nonrt_fmt, missing_fmt)
    # zip_longest makes a length mismatch between keys and results fail.
    for key, actual in itertools.zip_longest(keys, urls):
        assert actual == rendered(key)

@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_exists(rt, ticket_id, attachment_id, expected):
    """rt.exists() is True exactly when a URL is expected for the pair."""
    should_exist = expected is not None
    assert rt.exists(ticket_id, attachment_id) is should_exist

def test_exists_caches(new_client):
    """rt.exists() gives the same answers after the client's data is cleared."""
    rt = rtutil.RT(new_client)

    def check_answers():
        assert rt.exists(1, 3)
        assert rt.exists(2)
        assert not rt.exists(1, 99)
        assert not rt.exists(9)

    check_answers()
    # Repeat the same lookups with nothing left in the client.
    new_client.TICKET_DATA.clear()
    check_answers()

@pytest.mark.parametrize('link,expected', [
    ('rt:1/2', ('1', '2')),
    ('rt:123/456', ('123', '456')),
    ('rt:12345', ('12345', None)),
    ('rt:12346/', ('12346', None)),
    ('rt:12346/789', ('12346', '789')),
    ('rt:12346/780/', ('12346', '780')),
    ('rt://ticket/1', ('1', None)),
    ('rt://ticket/1/', ('1', None)),
    ('rt://ticket/1234/attachments/5678', ('1234', '5678')),
    ('rt://ticket/1234/attachments/5678/', ('1234', '5678')),
    ('rt://ticket/1234/attachment/5678', ('1234', '5678')),
    ('rt://ticket/1234/attachment/5678/', ('1234', '5678')),
    ('rt:', None),
    ('rt://', None),
    ('rt:example.org', None),
    ('rt:example.org/1', None),
    ('rt://example.org', None),
    ('rt://example.org/1', None),
    ('https://example.org/rt/Ticket/Display.html?id=123', None),
])
def test_parse(rt, link, expected):
    """rt.parse() returns (ticket, attachment) strings, or None for bad links."""
    parsed = rt.parse(link)
    assert parsed == expected

@pytest.mark.parametrize('ticket_id,attachment_id,expected', [
    ('12', None, 'rt:12'),
    (34, None, 'rt:34'),
    ('56', '78', 'rt:56/78'),
    (90, 880, 'rt:90/880'),
])
def test_unparse(rt, ticket_id, attachment_id, expected):
    """rt.unparse() builds an ``rt:`` link from string or integer IDs."""
    link = rt.unparse(ticket_id, attachment_id)
    assert link == expected

def test_uncommon_server_url_parsing():
    """Ticket URLs keep a site base URL that itself ends in 'REST/1.0/'.

    The client is given the base URL with a second 'REST/1.0/' appended, and
    the resulting ticket URL must still start with the full base URL.
    """
    site_url = 'https://example.org/REST/1.0/'
    rt = rtutil.RT(testutil.RTClient(site_url + 'REST/1.0/'))
    ticket_url = rt.url(1)
    assert ticket_url.startswith(site_url)

def test_shared_cache(new_client):
    """A second RT on the same cache can answer after client data is cleared."""
    ticket_id, _, url_path = EXPECTED_URLS[0]
    want = DEFAULT_RT_URL + url_path
    with new_cache() as cachedb:
        first = rtutil.RT(new_client, cachedb)
        assert first.url(ticket_id) == want
        # Empty the client before the second RT is even created.
        new_client.TICKET_DATA.clear()
        second = rtutil.RT(new_client, cachedb)
        assert second.url(ticket_id) == want
        assert not second.exists(ticket_id + 1)
        assert first is not second

def test_no_shared_cache(new_client):
    """RT instances on separate caches do not see each other's results."""
    with new_cache() as cache1:
        with new_cache() as cache2:
            cached_rt = rtutil.RT(new_client, cache1)
            other_rt = rtutil.RT(new_client, cache2)
            assert cached_rt.exists(1)
            new_client.TICKET_DATA.clear()
            # Only the instance that looked the ticket up earlier still
            # finds it.
            assert not other_rt.exists(1)
            assert cached_rt.exists(1)

def test_read_only_cache(new_client, tmp_path, caplog):
    """A cache file reopened with mode 0o400 still serves stored results.

    The rtutil logger is switched to DEBUG for the test; no assertions are
    made about the captured log records.
    """
    caplog.set_level(logging.DEBUG, logger='conservancy_beancount.rtutil')
    ticket_id, _, url_path = EXPECTED_URLS[0]
    want = DEFAULT_RT_URL + url_path
    db_path = tmp_path / 'test.db'
    # First pass: look the ticket up while using the cache at db_path.
    with new_cache(db_path) as writable_cache:
        writer = rtutil.RT(new_client, writable_cache)
        assert writer.url(ticket_id) == want
    # Second pass: no client data left, and the file is owner-read-only.
    new_client.TICKET_DATA.clear()
    db_path.chmod(0o400)
    with new_cache(db_path) as readonly_cache:
        reader = rtutil.RT(new_client, readonly_cache)
        assert reader.url(ticket_id) == want
        assert reader.url(ticket_id + 1) is None

def test_results_not_found_only_in_transient_cache(new_client):
    """A 'not found' answer stays with the RT instance that received it.

    Two RT instances share one cache. After ticket 9 is added to the client,
    the instance that already reported it missing still does, while the
    other instance finds it.
    """
    with new_cache() as cache:
        asked_early = rtutil.RT(new_client, cache)
        asked_late = rtutil.RT(new_client, cache)
        assert not asked_early.exists(9)
        new_client.TICKET_DATA['9'] = [('99', '(Unnamed)', 'text/plain', '0b')]
        assert not asked_early.exists(9)
        assert asked_late.exists(9)

def test_txn_with_urls(rt):
    """rt.txn_with_urls() rewrites RT links in transaction and posting metadata.

    Non-RT text next to the links is kept, and the transaction passed in is
    left unmodified.
    """
    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'doc1.txt rt:1/4 doc2.txt',
    }
    txn = testutil.Transaction(**txn_meta, postings=[
        ('Income:Donations', -10, {'receipt': 'rt:2/13 donation.txt'}),
        ('Assets:Cash', 10, {'receipt': 'cash.png rt:2/14'}),
    ])
    actual = rt.txn_with_urls(txn)

    def check(source, key, ticket_id, attachment_id=None):
        # The angle-bracketed full URL must appear in the metadata value.
        url_path = EXPECTED_URLS_MAP[ticket_id, attachment_id]
        assert f'<{DEFAULT_RT_URL}{url_path}>' in source.meta[key]

    # Transaction-level metadata
    assert set(actual.meta) == set(txn_meta) | {'filename', 'lineno'}
    check(actual, 'rt-id', 1)
    assert actual.meta['contract'] == txn_meta['contract']
    assert actual.meta['statement'].startswith('doc1.txt ')
    check(actual, 'statement', 1, 4)

    # Posting-level metadata
    income_post, cash_post = actual.postings[0], actual.postings[1]
    check(income_post, 'receipt', 2, 13)
    assert income_post.meta['receipt'].endswith(' donation.txt')
    check(cash_post, 'receipt', 2, 14)
    assert cash_post.meta['receipt'].startswith('cash.png ')

    # Check the original transaction is unchanged
    for key in txn_meta:
        assert txn.meta[key] == txn_meta[key]
    assert txn.postings[0].meta['receipt'] == 'rt:2/13 donation.txt'
    assert txn.postings[1].meta['receipt'] == 'cash.png rt:2/14'

def test_txn_with_urls_with_fmts(rt):
    """rt.txn_with_urls() applies the three format strings it is given.

    Resolved RT links are wrapped in ``<>``, non-RT links in ``[]``, and RT
    links with no expected URL in ``()``.
    """
    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'rt:1/99 rt:1/4 stmt.txt',
    }
    txn = testutil.Transaction(**txn_meta)
    actual = rt.txn_with_urls(txn, '<{}>', '[{}]', '({})')

    ticket_path = EXPECTED_URLS_MAP[1, None]
    assert actual.meta['rt-id'] == f'<{DEFAULT_RT_URL}{ticket_path}>'
    assert actual.meta['contract'] == '[RepoLink.pdf]'

    attachment_path = EXPECTED_URLS_MAP[1, 4]
    statement_parts = (
        '(rt:1/99)',
        f'<{DEFAULT_RT_URL}{attachment_path}>',
        '[stmt.txt]',
    )
    assert actual.meta['statement'] == ' '.join(statement_parts)

@pytest.mark.parametrize('arg,exp_num,exp_offset', [
    # These correspond to the different datetime formats available through
    # RT's user settings.
    ('Mon Mar 1 01:01:01 2021', 1, None),
    ('2021-03-02 02:02:02', 2, None),
    ('2021-03-03T03:03:03-0500', 3, -18000),
    ('Thu, 4 Mar 2021 04:04:04 -0600', 4, -21600),
    ('Fri, 5 Mar 2021 05:05:05 GMT', 5, 0),
    ('20210306T060606Z', 6, 0),
    ('Sun, Mar 7, 2021 07:07:07 AM', 7, None),
    ('Sun, Mar 14, 2021 02:14:14 PM', 14, None),
])
def test_rt_datetime(arg, exp_num, exp_offset):
    """rtutil.RTDateTime parses each listed format into the expected fields.

    Every sample is in March 2021 and uses the same number for its day, hour,
    minute, and second. ``exp_offset`` is the expected UTC offset in seconds,
    or None when the result should carry no tzinfo.
    """
    actual = rtutil.RTDateTime(arg)
    assert (actual.year, actual.month) == (2021, 3)
    clock_fields = (actual.day, actual.hour, actual.minute, actual.second)
    assert clock_fields == (exp_num, exp_num, exp_num, exp_num)
    if exp_offset is not None:
        assert actual.tzinfo.utcoffset(None).total_seconds() == exp_offset
    else:
        assert actual.tzinfo is None

@pytest.mark.parametrize('arg', ['Not set', '', None])
def test_rt_datetime_empty(arg):
    """Empty or unset inputs produce datetime.min with no tzinfo."""
    parsed = rtutil.RTDateTime(arg)
    assert parsed == datetime.datetime.min
    assert parsed.tzinfo is None