Changeset - 7335282e5a64
[Not reviewed]
0 3 0
Brett Smith - 3 years ago 2021-03-11 18:52:31
brettcsmith@brettcsmith.org
rtutil: Add RTDateTime class.

See comments for rationale.
3 files changed with 58 insertions and 0 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/rtutil.py
Show inline comments
 
"""RT client utilities"""
 
# Copyright © 2020  Brett Smith
 
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
 
#
 
# Full copyright and licensing details can be found at toplevel file
 
# LICENSE.txt in the repository.
 

	
 
import datetime
 
import functools
 
import logging
 
import mimetypes
 
import os
 
import re
 
import sqlite3
 
import urllib.parse as urlparse
 

	
 
import dateutil.parser
 
import rt
 

	
 
from pathlib import Path
 

	
 
from . import data
 
from beancount.core import data as bc_data
 

	
 
from typing import (
 
    cast,
 
    overload,
 
    Callable,
 
    Iterable,
 
    Iterator,
 
    MutableMapping,
 
    Optional,
 
    Set,
 
    Tuple,
 
    Union,
 
)
 
from .beancount_types import (
 
    Transaction,
 
)
 

	
 
# An RT ticket or attachment identifier, in numeric or string form.
RTId = Union[int, str]
# (ticket_id, attachment_id) pair; attachment_id is None when the pair
# refers to the ticket itself rather than a specific attachment.
TicketAttachmentIds = Tuple[str, Optional[str]]
# Mapping from ticket/attachment id pairs to a URL string, or None when
# no URL is known for that pair.
_LinkCache = MutableMapping[TicketAttachmentIds, Optional[str]]
# Signature of URL-lookup callables: returns the URL or None if not found.
_URLLookup = Callable[..., Optional[str]]
 

	
 
class RTDateTime(datetime.datetime):
    """Construct datetime objects from strings returned by RT

    Typical usage looks like::

        ticket = rt_client.get_ticket(...)
        created = RTDateTime(ticket.get('Created'))
    """
    # A dedicated class, rather than a plain parsing function, lets
    # query-report use the type for double duty: parsing the data from RT
    # and deciding how to format it for output.
    # The RT REST API returns datetimes in the user's configured timezone,
    # and no API call seems to report what that timezone is, so callers
    # have to live with timezone-naive results.
    def __new__(cls, source: str) -> 'RTDateTime':
        # Empty values and RT's 'Not set' placeholder both mean "no time."
        if not source or source == 'Not set':
            return cast(RTDateTime, datetime.datetime.min)
        return cast(RTDateTime, dateutil.parser.parse(source))
 

	
 

	
 
class RTLinkCache(_LinkCache):
    """Cache RT links to disk

    This class provides a dict-like interface to a cache of RT links.
    Once an object is in RT, a link to it should never change.
    The only exception is when objects get shredded, and those objects
    shouldn't be referenced in books anyway.

    This implementation is backed by a sqlite database. You can call::

        db = RTLinkCache.setup(path)

    This method will try to open a sqlite database at the given path,
    and set up necessary tables, etc.
    If it succeeds, it returns a database connection you can use to
    initialize the cache.
    If it fails, it returns None, and the caller should use some other
    dict-like object (like a normal dict) for caching.
    You can give the result to the RT utility class either way,
    and it will do the right thing for itself::

        rt = RT(rt_client, db)
    """

    # One row per cached link. attachment_id may be NULL (a link to the
    # ticket itself — matching TicketAttachmentIds), hence the composite
    # primary key over both id columns.
    CREATE_TABLE_SQL = """CREATE TABLE IF NOT EXISTS RTLinkCache(
 ticket_id TEXT NOT NULL,
 attachment_id TEXT,
 url TEXT NOT NULL,
 PRIMARY KEY (ticket_id, attachment_id)
)"""
    # Class-level logger shared by setup() and instances.
    logger = logging.getLogger('conservancy_beancount.rtutil.RTLinkCache')
 

	
 
    @classmethod
    def setup(cls, cache_path: Path) -> Optional[sqlite3.Connection]:
        """Open or create a link cache database at ``cache_path``.

        Returns a sqlite3 connection suitable for initializing this class,
        or None when sqlite caching at that path provides no benefit.
        When the database can be read but not written, the on-disk data is
        copied into a writable in-memory database and that is returned.
        """
        try:
            db = sqlite3.connect(os.fspath(cache_path), isolation_level=None)
            cursor = db.cursor()
            cursor.execute(cls.CREATE_TABLE_SQL)
            cursor.execute('SELECT url FROM RTLinkCache LIMIT 1')
            have_data = cursor.fetchone() is not None
        except sqlite3.OperationalError:
            # If we couldn't get this far, sqlite provides no benefit.
            cls.logger.debug("setup: error loading %s", cache_path, exc_info=True)
            return None
        try:
            # There shouldn't be any records where url is NULL, so running this
            # DELETE pulls double duty for us: it tells us whether or not we
            # can write to the database and it enforces database integrity.
            cursor.execute('DELETE FROM RTLinkCache WHERE url IS NULL')
        except sqlite3.OperationalError:
            cls.logger.debug("setup: error writing %s", cache_path, exc_info=True)
            can_write = False
        else:
            can_write = True
        if not (can_write or have_data):
            # If there's nothing to read and no way to write, sqlite provides
            # no benefit.
            cls.logger.debug("setup: not using %s: nothing to read or write", cache_path)
            return None
        elif not can_write:
            # Set up an in-memory database that we can write to, seeded with
            # the data available to read.
            try:
                cursor.close()
                db.close()
                db = sqlite3.connect(':memory:', isolation_level=None)
                cursor = db.cursor()
                # It would better to use
                #   '{}?mode=ro'.format(cache_path.as_uri())
                # as the argument here, but that doesn't work on SUSE 15,
                # possibly because its sqlite doesn't recognize query
                # arguments (added to upstream sqlite in late 2016).
                cursor.execute('ATTACH DATABASE ? AS readsource',
                               (os.fspath(cache_path),))
                cursor.execute(cls.CREATE_TABLE_SQL)
                cursor.execute('INSERT INTO RTLinkCache SELECT * FROM readsource.RTLinkCache')
                cursor.execute('DETACH DATABASE readsource')
            except sqlite3.OperationalError:
                # (The unused `as error` binding was removed here.)
                # We're back to the case of having nothing to read and no way
                # to write.
                cls.logger.debug("setup: error loading %s into memory", cache_path, exc_info=True)
                return None
            else:
                cls.logger.debug("setup: loaded %s into memory", cache_path)
        else:
            cls.logger.debug("setup: caching at %s", cache_path)
        cursor.close()
        db.commit()
        return db
 

	
 
    def __init__(self, cache_db: sqlite3.Connection) -> None:
        """Wrap ``cache_db``, a connection as returned by ``setup()``."""
        # Ids known to have no URL are tracked in memory only — presumably
        # so negative results never persist to disk; confirm with the
        # lookup methods.
        self._nourls: Set[TicketAttachmentIds] = set()
        self._db = cache_db
 

	
 
    def __iter__(self) -> Iterator[TicketAttachmentIds]:
        # sqlite rows are tuples, so each yielded row is already a
        # (ticket_id, attachment_id) pair.
        yield from self._db.execute('SELECT ticket_id, attachment_id FROM RTLinkCache')
setup.py
Show inline comments
 
#!/usr/bin/env python3
"""Packaging definition for conservancy_beancount."""

from setuptools import setup

setup(
    name='conservancy_beancount',
    description="Plugin, library, and reports for reading Conservancy's books",
    version='1.19.0',
    author='Software Freedom Conservancy',
    author_email='info@sfconservancy.org',
    license='GNU AGPLv3+',

    install_requires=[
        'babel>=2.6',  # Debian:python3-babel
        'beancount>=2.2',  # Debian:beancount
        'GitPython>=2.0',  # Debian:python3-git
        # 1.4.1 crashes when trying to save some documents.
        'odfpy>=1.4.0,!=1.4.1',  # Debian:python3-odf
        'pdfminer.six>=20200101',
        'python-dateutil>=2.7',  # Debian:python3-dateutil
        'PyYAML>=3.0',  # Debian:python3-yaml
        'regex',  # Debian:python3-regex
        'rt>=2.0',
    ],
    setup_requires=[
        'pytest-mypy',
        'pytest-runner',  # Debian:python3-pytest-runner
    ],
    tests_require=[
        'mypy>=0.770',  # Debian:python3-mypy
        'pytest',  # Debian:python3-pytest
    ],

    packages=[
        'conservancy_beancount',
        'conservancy_beancount.pdfforms',
        'conservancy_beancount.pdfforms.extract',
        'conservancy_beancount.plugin',
        'conservancy_beancount.reconcile',
        'conservancy_beancount.reports',
        'conservancy_beancount.tools',
    ],
    # Each console script maps a command name to a module entry_point().
    entry_points={
        'console_scripts': [
            'accrual-report = conservancy_beancount.reports.accrual:entry_point',
            'assemble-audit-reports = conservancy_beancount.tools.audit_report:entry_point',
            'balance-sheet-report = conservancy_beancount.reports.balance_sheet:entry_point',
            'budget-report = conservancy_beancount.reports.budget:entry_point',
            'bean-sort = conservancy_beancount.tools.sort_entries:entry_point',
            'extract-odf-links = conservancy_beancount.tools.extract_odf_links:entry_point',
            'fund-report = conservancy_beancount.reports.fund:entry_point',
            'ledger-report = conservancy_beancount.reports.ledger:entry_point',
            'opening-balances = conservancy_beancount.tools.opening_balances:entry_point',
            'pdfform-extract = conservancy_beancount.pdfforms.extract:entry_point',
            'pdfform-extract-irs990scheduleA = conservancy_beancount.pdfforms.extract.irs990scheduleA:entry_point',
            'pdfform-fill = conservancy_beancount.pdfforms.fill:entry_point',
            'query-report = conservancy_beancount.reports.query:entry_point',
            'reconcile-paypal = conservancy_beancount.reconcile.paypal:entry_point',
            'reconcile-statement = conservancy_beancount.reconcile.statement:entry_point',
            'split-ods-links = conservancy_beancount.tools.split_ods_links:entry_point',
        ],
    },
)
tests/test_rtutil.py
Show inline comments
 
"""Test RT integration"""
 
# Copyright © 2020  Brett Smith
 
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
 
#
 
# Full copyright and licensing details can be found at toplevel file
 
# LICENSE.txt in the repository.
 

	
 
import contextlib
 
import datetime
 
import itertools
 
import logging
 
import re
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount import rtutil
 

	
 
# Browser-facing base URL: the last 9 characters of the REST endpoint URL
# are stripped — presumably the 'REST/1.0/' suffix; confirm in testutil.
DEFAULT_RT_URL = testutil.RTClient.DEFAULT_URL[:-9]

# (ticket_id, attachment_id, expected URL path under DEFAULT_RT_URL).
# attachment_id None means a link to the ticket itself; an expected URL of
# None means the ticket/attachment doesn't exist in the test fixtures.
EXPECTED_URLS = [
    (1, None, 'Ticket/Display.html?id=1'),
    (1, 2, 'Ticket/Display.html?id=1#txn-1'),
    (1, 4, 'Ticket/Attachment/1/4/Forwarded%20Message.eml'),
    (1, 99, None),
    (2, 1, None),
    (2, 10, 'Ticket/Attachment/7/10/Company_invoice-2020030405_as-sent.pdf'),
    (2, 13, 'Ticket/Display.html?id=2#txn-11'),
    (2, 14, 'Ticket/Display.html?id=2#txn-11'),  # statement.txt
    (3, None, 'Ticket/Display.html?id=3'),
    (9, None, None),
]

# The same expectations keyed by (ticket_id, attachment_id) for direct lookup.
EXPECTED_URLS_MAP = {
    (ticket_id, attachment_id): url
    for ticket_id, attachment_id, url in EXPECTED_URLS
}
 

	
 
@pytest.fixture(scope='module')
def rt():
    """Provide one RT object, backed by the stock test client, per module."""
    return rtutil.RT(testutil.RTClient())
 

	
 
@pytest.fixture
def new_client():
    """Return a fresh client whose TICKET_DATA can be mutated safely.

    The subclass shadows TICKET_DATA with a copy, so tests can clear or
    extend it without affecting other tests.
    """
    class ScratchClient(testutil.RTClient):
        TICKET_DATA = testutil.RTClient.TICKET_DATA.copy()
    return ScratchClient()
 

	
 
@contextlib.contextmanager
def nullcontext(thing):
    """Context manager that hands back ``thing`` unchanged, with no cleanup.

    Local stand-in for contextlib.nullcontext (Python >= 3.7).
    """
    yield thing
 

	
 
def new_cache(database=':memory:'):
    """Return a context manager around a link cache database.

    When setup fails it falls back to a null context manager yielding
    None, so callers can use the result either way.
    """
    db = rtutil.RTLinkCache.setup(database)
    if db is not None:
        return contextlib.closing(db)
    print("NOTE: did not set up database cache at {}".format(database))
    return nullcontext(db)
 

	
 
@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_url(rt, ticket_id, attachment_id, expected):
    """rt.url builds the expected full URL, or None for unknown objects."""
    full_url = None if expected is None else DEFAULT_RT_URL + expected
    assert rt.url(ticket_id, attachment_id) == full_url
 

	
 
@pytest.mark.parametrize('attachment_id,first_link_only', itertools.product(
    [245, None],
    [True, False],
))
def test_metadata_regexp(rt, attachment_id, first_link_only):
    # Both the short 'rt:...' and long 'rt://...' link spellings must match.
    if attachment_id is None:
        match_links = ['rt:220', 'rt://ticket/220']
    else:
        match_links = [f'rt:220/{attachment_id}',
                       f'rt://ticket/220/attachments/{attachment_id}']
    regexp = rt.metadata_regexp(220, attachment_id, first_link_only=first_link_only)
    for link in match_links:
        assert re.search(regexp, link)
        # The link still matches when followed by a separate word...
        assert re.search(regexp, link + ' link2')
        # ...but not when adjacent characters run into it on either side.
        assert re.search(regexp, link + '0') is None
        assert re.search(regexp, 'a' + link) is None
        # A link later in the string matches only when first_link_only is off.
        end_match = re.search(regexp, 'link0 ' + link)
        if first_link_only:
            assert end_match is None
        else:
            assert end_match
 

	
 
@pytest.mark.parametrize('attachment_id', [
    13,
    None,
])
def test_url_caches(new_client, attachment_id):
    """URLs stay available from cache after the ticket data disappears."""
    fragment = '' if attachment_id is None else '#txn-11'
    expected = '{}Ticket/Display.html?id=2{}'.format(DEFAULT_RT_URL, fragment)
    rt = rtutil.RT(new_client)
    assert rt.url(2, attachment_id) == expected
    # Clearing the backing data proves the second lookup hits the cache.
    new_client.TICKET_DATA.clear()
    assert rt.url(2, attachment_id) == expected
 

	
...
 
@@ -194,96 +195,127 @@ def test_uncommon_server_url_parsing():
 
    rt = rtutil.RT(client)
 
    assert rt.url(1).startswith(url)
 

	
 
def test_shared_cache(new_client):
    """Two RT objects sharing one cache db can read each other's lookups."""
    ticket_id, _, url_path = EXPECTED_URLS[0]
    expected = DEFAULT_RT_URL + url_path
    with new_cache() as cachedb:
        rt1 = rtutil.RT(new_client, cachedb)
        assert rt1.url(ticket_id) == expected
        # Clear the source data: rt2 can only answer from the shared cache.
        new_client.TICKET_DATA.clear()
        rt2 = rtutil.RT(new_client, cachedb)
        assert rt2.url(ticket_id) == expected
        assert not rt2.exists(ticket_id + 1)
        assert rt1 is not rt2
 

	
 
def test_no_shared_cache(new_client):
    """Separate cache databases don't leak lookups to each other."""
    with new_cache() as cache_a, new_cache() as cache_b:
        rt_a = rtutil.RT(new_client, cache_a)
        rt_b = rtutil.RT(new_client, cache_b)
        assert rt_a.exists(1)
        new_client.TICKET_DATA.clear()
        # rt_b never cached ticket 1, so it can't find it now...
        assert not rt_b.exists(1)
        # ...while rt_a still answers from its own cache.
        assert rt_a.exists(1)
 

	
 
def test_read_only_cache(new_client, tmp_path, caplog):
    """A cache that can't be written is still readable (memory fallback)."""
    caplog.set_level(logging.DEBUG, logger='conservancy_beancount.rtutil')
    db_path = tmp_path / 'test.db'
    ticket_id, _, url_path = EXPECTED_URLS[0]
    expected = DEFAULT_RT_URL + url_path
    # Populate the on-disk cache while it's still writable.
    with new_cache(db_path) as writable_cache:
        rt1 = rtutil.RT(new_client, writable_cache)
        assert rt1.url(ticket_id) == expected
    new_client.TICKET_DATA.clear()
    db_path.chmod(0o400)
    # Reopened read-only: cached URLs remain available, new lookups miss.
    with new_cache(db_path) as readonly_cache:
        rt2 = rtutil.RT(new_client, readonly_cache)
        assert rt2.url(ticket_id) == expected
        assert rt2.url(ticket_id + 1) is None
 

	
 
def test_results_not_found_only_in_transient_cache(new_client):
    # Negative lookups are cached per-RT-object, not in the shared database.
    with new_cache() as cache:
        rt1 = rtutil.RT(new_client, cache)
        rt2 = rtutil.RT(new_client, cache)
        assert not rt1.exists(9)
        # Create ticket 9 only after rt1 has cached the miss.
        new_client.TICKET_DATA['9'] = [('99', '(Unnamed)', 'text/plain', '0b')]
        assert not rt1.exists(9)
        assert rt2.exists(9)
 

	
 
def test_txn_with_urls(rt):
    # Metadata that mixes RT links with plain document names, at both the
    # transaction and the posting level.
    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'doc1.txt rt:1/4 doc2.txt',
    }
    txn = testutil.Transaction(**txn_meta, postings=[
        ('Income:Donations', -10, {'receipt': 'rt:2/13 donation.txt'}),
        ('Assets:Cash', 10, {'receipt': 'cash.png rt:2/14'}),
    ])
    actual = rt.txn_with_urls(txn)
    def check(source, key, ticket_id, attachment_id=None):
        # Assert source.meta[key] contains the expected <URL> substitution.
        url_path = EXPECTED_URLS_MAP[(ticket_id, attachment_id)]
        assert f'<{DEFAULT_RT_URL}{url_path}>' in source.meta[key]
    expected_keys = set(txn_meta)
    expected_keys.update(['filename', 'lineno'])
    assert set(actual.meta) == expected_keys
    check(actual, 'rt-id', 1)
    # Values with no RT links pass through unchanged.
    assert actual.meta['contract'] == txn_meta['contract']
    assert actual.meta['statement'].startswith('doc1.txt ')
    check(actual, 'statement', 1, 4)
    check(actual.postings[0], 'receipt', 2, 13)
    assert actual.postings[0].meta['receipt'].endswith(' donation.txt')
    check(actual.postings[1], 'receipt', 2, 14)
    assert actual.postings[1].meta['receipt'].startswith('cash.png ')
    # Check the original transaction is unchanged
    for key, expected in txn_meta.items():
        assert txn.meta[key] == expected
    assert txn.postings[0].meta['receipt'] == 'rt:2/13 donation.txt'
    assert txn.postings[1].meta['receipt'] == 'cash.png rt:2/14'
 

	
 
def test_txn_with_urls_with_fmts(rt):
    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'rt:1/99 rt:1/4 stmt.txt',
    }
    txn = testutil.Transaction(**txn_meta)
    # Custom formats: <resolved link>, [plain document], (unresolvable link)
    # — the mapping is established by the assertions below.
    actual = rt.txn_with_urls(txn, '<{}>', '[{}]', '({})')
    rt_id_path = EXPECTED_URLS_MAP[(1, None)]
    assert actual.meta['rt-id'] == f'<{DEFAULT_RT_URL}{rt_id_path}>'
    assert actual.meta['contract'] == '[RepoLink.pdf]'
    statement_path = EXPECTED_URLS_MAP[(1, 4)]
    # rt:1/99 has no URL in the fixtures, so it gets the "missing" format.
    assert actual.meta['statement'] == ' '.join([
        '(rt:1/99)',
        f'<{DEFAULT_RT_URL}{statement_path}>',
        '[stmt.txt]',
    ])
 

	
 
@pytest.mark.parametrize('arg,exp_num,exp_offset', [
    # These correspond to the different datetime formats available through
    # RT's user settings.
    ('Mon Mar 1 01:01:01 2021', 1, None),
    ('2021-03-02 02:02:02', 2, None),
    ('2021-03-03T03:03:03-0500', 3, -18000),
    ('Thu, 4 Mar 2021 04:04:04 -0600', 4, -21600),
    ('Fri, 5 Mar 2021 05:05:05 GMT', 5, 0),
    ('20210306T060606Z', 6, 0),
    ('Sun, Mar 7, 2021 07:07:07 AM', 7, None),
    ('Sun, Mar 14, 2021 02:14:14 PM', 14, None),
])
def test_rt_datetime(arg, exp_num, exp_offset):
    """RTDateTime parses every datetime format RT can be set to emit."""
    parsed = rtutil.RTDateTime(arg)
    assert (parsed.year, parsed.month) == (2021, 3)
    # Each case encodes its number into every remaining datetime field.
    for field in ('day', 'hour', 'minute', 'second'):
        assert getattr(parsed, field) == exp_num
    if exp_offset is None:
        assert parsed.tzinfo is None
    else:
        assert parsed.tzinfo.utcoffset(None).total_seconds() == exp_offset
 

	
 
@pytest.mark.parametrize('arg', ['Not set', '', None])
def test_rt_datetime_empty(arg):
    """Empty and placeholder values parse to the naive minimum datetime."""
    parsed = rtutil.RTDateTime(arg)
    assert parsed == datetime.datetime.min
    assert parsed.tzinfo is None
0 comments (0 inline, 0 general)