Files @ 5784068904e8
Branch filter:

Location: NPO-Accounting/conservancy_beancount/tests/test_rtutil.py - annotation

bkuhn
payroll-type — US:403b:Employee:Roth — needed separate since taxable

Since Roth contributions are taxable, there are some reports that
need to include these amounts in total salary (i.e., when running a
report that seeks to show total taxable income for an employee). As
such, we need a `payroll-type` specifically for Roth 403(b)
contributions.
d5a6141f6db6
d5a6141f6db6
1b7fdf4f3b00
d5a6141f6db6
1b7fdf4f3b00
1b7fdf4f3b00
d5a6141f6db6
a8407c7b6a56
7335282e5a64
d8507a1a35a7
66cd27e7f067
d8507a1a35a7
a8407c7b6a56
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
a9eab2d4ea5f
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
9fbc658aa627
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
f227593655ba
d5a6141f6db6
d5a6141f6db6
30e386f645ce
30e386f645ce
30e386f645ce
30e386f645ce
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
30e386f645ce
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d8507a1a35a7
d5a6141f6db6
f227593655ba
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
f227593655ba
f227593655ba
d5a6141f6db6
f227593655ba
d5a6141f6db6
f227593655ba
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
f227593655ba
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
f227593655ba
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
d5a6141f6db6
f227593655ba
d5a6141f6db6
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
9fbc658aa627
e8e713721628
4ca188611fcb
4ca188611fcb
4ca188611fcb
4ca188611fcb
4ca188611fcb
4ca188611fcb
4ca188611fcb
4ca188611fcb
4ca188611fcb
e8e713721628
e8e713721628
e8e713721628
e8e713721628
e8e713721628
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
66cd27e7f067
66cd27e7f067
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
a8407c7b6a56
999ca2c5e1fa
46ac91e86e90
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
999ca2c5e1fa
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
5a1f7122bd3d
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
7335282e5a64
"""Test RT integration"""
# Copyright © 2020  Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.

import contextlib
import datetime
import itertools
import logging
import re

import pytest

from . import testutil

from conservancy_beancount import rtutil

# Base URL that the expected paths below are appended to: the test client's
# default URL with its last nine characters removed.
# NOTE(review): presumably those nine characters are the trailing 'REST/1.0/'
# API suffix — confirm against testutil.RTClient.DEFAULT_URL.
DEFAULT_RT_URL = testutil.RTClient.DEFAULT_URL[:-9]

# Each entry is (ticket_id, attachment_id, expected). `expected` is the URL
# path relative to DEFAULT_RT_URL that rt.url() should produce, or None when
# the ticket/attachment should be reported as not existing. An attachment_id
# of None refers to the ticket itself.
EXPECTED_URLS = [
    (1, None, 'Ticket/Display.html?id=1'),
    (1, 2, 'Ticket/Display.html?id=1#txn-1'),
    (1, 4, 'Ticket/Attachment/1/4/Forwarded%20Message.eml'),
    (1, 99, None),
    (2, 1, None),
    (2, 10, 'Ticket/Attachment/7/10/Company_invoice-2020030405_as-sent.pdf'),
    (2, 13, 'Ticket/Display.html?id=2#txn-11'),
    (2, 14, 'Ticket/Display.html?id=2#txn-11'),  # statement.txt
    (3, None, 'Ticket/Display.html?id=3'),
    (9, None, None),
]

# The same data keyed by (ticket_id, attachment_id), for tests that need to
# look up a single expected URL path directly.
EXPECTED_URLS_MAP = {
    (ticket_id, attachment_id): url
    for ticket_id, attachment_id, url in EXPECTED_URLS
}

@pytest.fixture(scope='module')
def rt():
    """Provide one rtutil.RT wrapper, shared by every test in this module.

    The wrapper is built around a fresh testutil.RTClient constructed with
    its default arguments.
    """
    return rtutil.RT(testutil.RTClient())

@pytest.fixture
def new_client():
    """Provide a test RT client whose ticket data can be mutated safely.

    The returned client is an instance of a throwaway RTClient subclass that
    carries its own (shallow) copy of TICKET_DATA, so a test may add to or
    clear that mapping without touching the mapping on testutil.RTClient.
    """
    ticket_data = testutil.RTClient.TICKET_DATA.copy()

    class RTClient(testutil.RTClient):
        TICKET_DATA = ticket_data

    client = RTClient()
    return client

@contextlib.contextmanager
def nullcontext(thing):
    """Context manager that hands `thing` to the `with` body and does nothing else.

    No setup or cleanup is performed: `thing` is yielded unchanged, and
    nothing is done to it when the block exits.
    """
    yield thing

def new_cache(database=':memory:'):
    """Return a context manager wrapping a link cache set up at `database`.

    `database` is passed straight to rtutil.RTLinkCache.setup(). When setup
    yields a cache object, the result is wrapped in contextlib.closing() so
    the cache is closed when the `with` block exits. When setup returns
    None, a note is printed and a do-nothing context manager yielding None
    is returned instead, so callers can use `with new_cache() as db:` in
    either case.
    """
    cache = rtutil.RTLinkCache.setup(database)
    if cache is not None:
        return contextlib.closing(cache)
    print("NOTE: did not set up database cache at {}".format(database))
    return nullcontext(cache)

@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_url(rt, ticket_id, attachment_id, expected):
    """rt.url() returns the full expected URL, or None for missing items."""
    # Table entries are paths relative to the base URL; None stays None.
    want = None if expected is None else DEFAULT_RT_URL + expected
    actual = rt.url(ticket_id, attachment_id)
    assert actual == want

@pytest.mark.parametrize('attachment_id,first_link_only', itertools.product(
    [245, None],
    [True, False],
))
def test_metadata_regexp(rt, attachment_id, first_link_only):
    """Check what the pattern from rt.metadata_regexp() does and does not match.

    For both the short and the long spelling of the link to ticket 220
    (optionally with an attachment), the pattern must match the link alone
    and the link followed by another word, and must not match the link with
    an extra trailing digit or a leading letter. A link that is not the
    first word matches only when `first_link_only` is false.
    """
    match_links = (
        ['rt:220', 'rt://ticket/220']
        if attachment_id is None else
        [f'rt:220/{attachment_id}',
         f'rt://ticket/220/attachments/{attachment_id}']
    )
    regexp = rt.metadata_regexp(220, attachment_id, first_link_only=first_link_only)

    def search(text):
        return re.search(regexp, text)

    for link in match_links:
        assert search(link)
        assert search(link + ' link2')
        assert search(link + '0') is None
        assert search('a' + link) is None
        trailing = search('link0 ' + link)
        if not first_link_only:
            assert trailing
        else:
            assert trailing is None

@pytest.mark.parametrize('attachment_id', [13, None])
def test_url_caches(new_client, attachment_id):
    """rt.url() keeps returning a result after the client's data is gone.

    The same URL must be returned before and after the client's ticket data
    is cleared, which shows the second answer does not need that data.
    """
    fragment = '' if attachment_id is None else '#txn-11'
    expected = '{}Ticket/Display.html?id=2{}'.format(DEFAULT_RT_URL, fragment)
    tracker = rtutil.RT(new_client)
    assert tracker.url(2, attachment_id) == expected
    # Wipe the client's data; the repeat lookup must still succeed.
    new_client.TICKET_DATA.clear()
    assert tracker.url(2, attachment_id) == expected

@pytest.mark.parametrize('mimetype,extension', [
    ('application/pdf', 'pdf'),
    ('image/png', 'png'),
    ('message/rfc822', 'eml'),
    ('x-test/x-unknown', 'bin'),
])
def test_url_default_filename(new_client, mimetype, extension):
    """An '(Unnamed)' attachment gets a generated filename in its URL.

    The expected name is 'RT1 attachment 9' (URL-quoted) with an extension
    chosen from the attachment's MIME type; the unknown type maps to 'bin'.
    """
    unnamed_attachment = ('9', '(Unnamed)', mimetype, '50.5k')
    new_client.TICKET_DATA['1'] = [unnamed_attachment]
    tracker = rtutil.RT(new_client)
    expected = '{}Ticket/Attachment/9/9/RT1%20attachment%209.{}'.format(DEFAULT_RT_URL, extension)
    actual = tracker.url(1, 9)
    assert actual == expected

@pytest.mark.parametrize('rt_fmt,nonrt_fmt,missing_fmt', [
    ('{}', '{}', '{}',),
    ('<{}>', '[{}]', '({})'),
])
def test_iter_urls(rt, rt_fmt, nonrt_fmt, missing_fmt):
    """rt.iter_urls() formats each link according to what kind of link it is.

    Links that resolve are rendered as their URL through `rt_fmt`; 'rt:'
    links that do not resolve are rendered unchanged through `missing_fmt`;
    everything else is rendered unchanged through `nonrt_fmt`. Results must
    come back in input order, one per link.
    """
    path_by_link = {}
    for tid, aid, url_path in EXPECTED_URLS:
        suffix = '' if aid is None else f'/{aid}'
        path_by_link['rt:{}{}'.format(tid, suffix)] = url_path
    path_by_link['https://example.com'] = None
    path_by_link['invoice.pdf'] = None
    links = list(path_by_link)

    def formatted(link):
        url_path = path_by_link[link]
        if url_path is not None:
            return rt_fmt.format(DEFAULT_RT_URL + url_path)
        if link.startswith('rt:'):
            return missing_fmt.format(link)
        return nonrt_fmt.format(link)

    results = rt.iter_urls(links, rt_fmt, nonrt_fmt, missing_fmt)
    # zip_longest (rather than zip) exposes a result count that differs from
    # the input count instead of silently truncating.
    for link, actual in itertools.zip_longest(links, results):
        assert actual == formatted(link)

@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_exists(rt, ticket_id, attachment_id, expected):
    """rt.exists() is True exactly for the entries that have an expected URL."""
    should_exist = expected is not None
    assert rt.exists(ticket_id, attachment_id) is should_exist

def test_exists_caches(new_client):
    """rt.exists() gives the same answers after the client's data is cleared."""
    tracker = rtutil.RT(new_client)

    def check_answers():
        assert tracker.exists(1, 3)
        assert tracker.exists(2)
        assert not tracker.exists(1, 99)
        assert not tracker.exists(9)

    check_answers()
    # With the client's data gone, the answers must not change.
    new_client.TICKET_DATA.clear()
    check_answers()

@pytest.mark.parametrize('link,expected', [
    ('rt:1/2', ('1', '2')),
    ('rt:123/456', ('123', '456')),
    ('rt:12345', ('12345', None)),
    ('rt:12346/', ('12346', None)),
    ('rt:12346/789', ('12346', '789')),
    ('rt:12346/780/', ('12346', '780')),
    ('rt://ticket/1', ('1', None)),
    ('rt://ticket/1/', ('1', None)),
    ('rt://ticket/1234/attachments/5678', ('1234', '5678')),
    ('rt://ticket/1234/attachments/5678/', ('1234', '5678')),
    ('rt://ticket/1234/attachment/5678', ('1234', '5678')),
    ('rt://ticket/1234/attachment/5678/', ('1234', '5678')),
    ('rt:', None),
    ('rt://', None),
    ('rt:example.org', None),
    ('rt:example.org/1', None),
    ('rt://example.org', None),
    ('rt://example.org/1', None),
    ('https://example.org/rt/Ticket/Display.html?id=123', None),
])
def test_parse(rt, link, expected):
    """rt.parse() splits a link into (ticket, attachment) strings, or gives None.

    The table covers both link spellings, with and without an attachment
    and a trailing slash, plus strings that must not parse at all.
    """
    parsed = rt.parse(link)
    assert parsed == expected

@pytest.mark.parametrize('ticket_id,attachment_id,expected', [
    ('12', None, 'rt:12'),
    (34, None, 'rt:34'),
    ('56', '78', 'rt:56/78'),
    (90, 880, 'rt:90/880'),
])
def test_unparse(rt, ticket_id, attachment_id, expected):
    """rt.unparse() builds the short 'rt:' link from string or integer ids."""
    link = rt.unparse(ticket_id, attachment_id)
    assert link == expected

def test_uncommon_server_url_parsing():
    """Ticket URLs keep a 'REST/1.0/' component that is part of the site root.

    The client is given a URL in which 'REST/1.0/' appears twice in a row.
    The generated ticket URL must still start with the root that contains
    the first occurrence.
    """
    url = 'https://example.org/REST/1.0/'
    api_url = url + 'REST/1.0/'
    tracker = rtutil.RT(testutil.RTClient(api_url))
    ticket_url = tracker.url(1)
    assert ticket_url.startswith(url)

def test_shared_cache(new_client):
    """Two RT wrappers given the same cache object share looked-up results.

    After the first wrapper resolves a ticket URL and the client's data is
    cleared, a second wrapper built on the same cache must still return
    that URL, while a ticket nobody looked up is reported as missing.
    """
    ticket_id, _, url_path = EXPECTED_URLS[0]
    expected = DEFAULT_RT_URL + url_path
    with new_cache() as cachedb:
        first = rtutil.RT(new_client, cachedb)
        assert first.url(ticket_id) == expected
        # From here on the client itself can no longer answer anything.
        new_client.TICKET_DATA.clear()
        second = rtutil.RT(new_client, cachedb)
        assert second.url(ticket_id) == expected
        assert not second.exists(ticket_id + 1)
        assert first is not second

def test_no_shared_cache(new_client):
    """RT wrappers with separate caches do not see each other's results.

    Once the client's data is cleared, only the wrapper that already looked
    the ticket up still reports it as existing.
    """
    with new_cache() as cache1, new_cache() as cache2:
        looked_up = rtutil.RT(new_client, cache1)
        never_asked = rtutil.RT(new_client, cache2)
        assert looked_up.exists(1)
        new_client.TICKET_DATA.clear()
        assert not never_asked.exists(1)
        assert looked_up.exists(1)

def test_read_only_cache(new_client, tmp_path, caplog):
    """A cache file without write permission can still serve stored results.

    A first wrapper fills an on-disk cache. The client's data is then
    cleared and the file's mode set to 0o400. A second wrapper opened on
    that file must return the stored URL, and must return None for a ticket
    that was never looked up.
    """
    caplog.set_level(logging.DEBUG, logger='conservancy_beancount.rtutil')
    ticket_id, _, url_path = EXPECTED_URLS[0]
    expected = DEFAULT_RT_URL + url_path
    db_path = tmp_path / 'test.db'
    with new_cache(db_path) as writable_cache:
        writer = rtutil.RT(new_client, writable_cache)
        assert writer.url(ticket_id) == expected
    new_client.TICKET_DATA.clear()
    # Owner read-only from here on.
    db_path.chmod(0o400)
    with new_cache(db_path) as readonly_cache:
        reader = rtutil.RT(new_client, readonly_cache)
        assert reader.url(ticket_id) == expected
        assert reader.url(ticket_id + 1) is None

def test_results_not_found_only_in_transient_cache(new_client):
    """A "not found" answer stays with the wrapper that got it.

    Two wrappers share one cache. The first learns that ticket 9 does not
    exist; the ticket is then added to the client's data. The first wrapper
    must keep answering "no", while the second, which never asked before,
    must see the ticket, so the negative answer was not shared through the
    cache.
    """
    with new_cache() as cache:
        asked_early = rtutil.RT(new_client, cache)
        asked_late = rtutil.RT(new_client, cache)
        assert not asked_early.exists(9)
        new_attachment = ('99', '(Unnamed)', 'text/plain', '0b')
        new_client.TICKET_DATA['9'] = [new_attachment]
        assert not asked_early.exists(9)
        assert asked_late.exists(9)

def test_txn_with_urls(rt):
    """rt.txn_with_urls() returns a copy whose RT links are replaced by URLs.

    With default formats, each resolvable 'rt:' link in transaction and
    posting metadata appears as '<URL>' in the result, other words are left
    in place, and the transaction passed in is not modified.
    """
    income_receipt = 'rt:2/13 donation.txt'
    cash_receipt = 'cash.png rt:2/14'
    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'doc1.txt rt:1/4 doc2.txt',
    }
    txn = testutil.Transaction(**txn_meta, postings=[
        ('Income:Donations', -10, {'receipt': income_receipt}),
        ('Assets:Cash', 10, {'receipt': cash_receipt}),
    ])
    actual = rt.txn_with_urls(txn)

    def assert_linked(source, key, ticket_id, attachment_id=None):
        # The bracketed URL for this ticket/attachment must occur somewhere
        # in the metadata value.
        url_path = EXPECTED_URLS_MAP[(ticket_id, attachment_id)]
        assert f'<{DEFAULT_RT_URL}{url_path}>' in source.meta[key]

    # Besides the keys supplied above, the metadata also has these two.
    expected_keys = set(txn_meta) | {'filename', 'lineno'}
    assert set(actual.meta) == expected_keys
    assert_linked(actual, 'rt-id', 1)
    assert actual.meta['contract'] == txn_meta['contract']
    assert actual.meta['statement'].startswith('doc1.txt ')
    assert_linked(actual, 'statement', 1, 4)
    income_post, cash_post = actual.postings[0], actual.postings[1]
    assert_linked(income_post, 'receipt', 2, 13)
    assert income_post.meta['receipt'].endswith(' donation.txt')
    assert_linked(cash_post, 'receipt', 2, 14)
    assert cash_post.meta['receipt'].startswith('cash.png ')
    # Check the original transaction is unchanged
    for key, original_value in txn_meta.items():
        assert txn.meta[key] == original_value
    assert txn.postings[0].meta['receipt'] == income_receipt
    assert txn.postings[1].meta['receipt'] == cash_receipt

def test_txn_with_urls_with_fmts(rt):
    """rt.txn_with_urls() applies the three caller-supplied format strings.

    Resolved RT links are wrapped as '<URL>', non-RT words as '[word]', and
    RT links that do not resolve as '(link)', with words kept in order and
    joined by single spaces.
    """
    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'rt:1/99 rt:1/4 stmt.txt',
    }
    txn = testutil.Transaction(**txn_meta)
    actual = rt.txn_with_urls(txn, '<{}>', '[{}]', '({})')

    rt_id_path = EXPECTED_URLS_MAP[(1, None)]
    assert actual.meta['rt-id'] == f'<{DEFAULT_RT_URL}{rt_id_path}>'

    assert actual.meta['contract'] == '[RepoLink.pdf]'

    statement_path = EXPECTED_URLS_MAP[(1, 4)]
    expected_statement = ' '.join([
        '(rt:1/99)',
        f'<{DEFAULT_RT_URL}{statement_path}>',
        '[stmt.txt]',
    ])
    assert actual.meta['statement'] == expected_statement

@pytest.mark.parametrize('arg,exp_num,exp_offset', [
    # These correspond to the different datetime formats available through
    # RT's user settings.
    ('Mon Mar 1 01:01:01 2021', 1, None),
    ('2021-03-02 02:02:02', 2, None),
    ('2021-03-03T03:03:03-0500', 3, -18000),
    ('Thu, 4 Mar 2021 04:04:04 -0600', 4, -21600),
    ('Fri, 5 Mar 2021 05:05:05 GMT', 5, 0),
    ('20210306T060606Z', 6, 0),
    ('Sun, Mar 7, 2021 07:07:07 AM', 7, None),
    ('Sun, Mar 14, 2021 02:14:14 PM', 14, None),
])
def test_rt_datetime(arg, exp_num, exp_offset):
    """rtutil.RTDateTime() parses each supported date string correctly.

    Every sample is a March 2021 timestamp whose day, hour, minute and
    second all equal `exp_num`. `exp_offset` is the expected UTC offset in
    seconds, or None when the result should carry no timezone.
    """
    actual = rtutil.RTDateTime(arg)
    assert actual.year == 2021
    assert actual.month == 3
    for field in ('day', 'hour', 'minute', 'second'):
        assert getattr(actual, field) == exp_num
    tz = actual.tzinfo
    if exp_offset is not None:
        assert tz.utcoffset(None).total_seconds() == exp_offset
    else:
        assert tz is None

@pytest.mark.parametrize('arg', ['Not set', '', None])
def test_rt_datetime_empty(arg):
    """Unset date values become the minimum datetime, with no timezone."""
    parsed = rtutil.RTDateTime(arg)
    assert parsed == datetime.datetime.min
    assert parsed.tzinfo is None