Changeset - 7335282e5a64
[Not reviewed]
0 3 0
Brett Smith - 3 years ago 2021-03-11 18:52:31
brettcsmith@brettcsmith.org
rtutil: Add RTDateTime class.

See comments for rationale.
3 files changed with 58 insertions and 0 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/rtutil.py
Show inline comments
 
"""RT client utilities"""
 
# Copyright © 2020  Brett Smith
 
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
 
#
 
# Full copyright and licensing details can be found at toplevel file
 
# LICENSE.txt in the repository.
 

	
 
import datetime
 
import functools
 
import logging
 
import mimetypes
 
import os
 
import re
 
import sqlite3
 
import urllib.parse as urlparse
 

	
 
import dateutil.parser
 
import rt
 

	
 
from pathlib import Path
 

	
 
from . import data
 
from beancount.core import data as bc_data
 

	
 
from typing import (
 
    cast,
 
    overload,
 
    Callable,
 
    Iterable,
 
    Iterator,
 
    MutableMapping,
 
    Optional,
 
    Set,
 
    Tuple,
 
    Union,
 
)
 
from .beancount_types import (
 
    Transaction,
 
)
 

	
 
# Ticket and attachment ids may be passed by callers as ints or strings
# interchangeably; they are normalized to str where it matters.
RTId = Union[int, str]
# Cache key for one link: (ticket_id, attachment_id or None), as strings.
TicketAttachmentIds = Tuple[str, Optional[str]]
# Interface of a link cache: maps ids to a URL, or None for "known missing".
_LinkCache = MutableMapping[TicketAttachmentIds, Optional[str]]
# Signature shared by RT's cached URL-lookup methods.
_URLLookup = Callable[..., Optional[str]]
 

	
 
class RTDateTime(datetime.datetime):
    """Construct datetime objects from strings returned by RT

    Typical usage looks like::

        ticket = rt_client.get_ticket(...)
        created = RTDateTime(ticket.get('Created'))
    """
    # Normally I'd just write a function to do this, but having a dedicated
    # class helps support query-report: the class can pull double duty to both
    # parse the data from RT, and determine proper output formatting.
    # The RT REST API returns datetimes in the user's configured timezone, and
    # there doesn't seem to be any API call that tells you what that is. You
    # have to live with the object being timezone-naive.
    def __new__(cls, source: Optional[str]) -> 'RTDateTime':
        # RT reports unset datetime fields as the literal string 'Not set'.
        # Treat that, the empty string, and None (a missing field) alike,
        # mapping them to datetime.min as a sortable "no value" sentinel.
        if not source or source == 'Not set':
            retval = datetime.datetime.min
        else:
            retval = dateutil.parser.parse(source)
        # The value returned is a plain datetime (not an RTDateTime
        # instance); the cast only satisfies the declared return type.
        return cast(RTDateTime, retval)
 

	
 

	
 
class RTLinkCache(_LinkCache):
    """Cache RT links to disk

    This class provides a dict-like interface to a cache of RT links.
    Once an object is in RT, a link to it should never change.
    The only exception is when objects get shredded, and those objects
    shouldn't be referenced in books anyway.

    This implementation is backed by a sqlite database. You can call::

        db = RTLinkCache.setup(path)

    This method will try to open a sqlite database at the given path,
    and set up necessary tables, etc.
    If it succeeds, it returns a database connection you can use to
    initialize the cache.
    If it fails, it returns None, and the caller should use some other
    dict-like object (like a normal dict) for caching.
    You can give the result to the RT utility class either way,
    and it will do the right thing for itself::

        rt = RT(rt_client, db)
    """

    # attachment_id is NULL for links that refer to a whole ticket.
    CREATE_TABLE_SQL = """CREATE TABLE IF NOT EXISTS RTLinkCache(
 ticket_id TEXT NOT NULL,
 attachment_id TEXT,
 url TEXT NOT NULL,
 PRIMARY KEY (ticket_id, attachment_id)
)"""
    logger = logging.getLogger('conservancy_beancount.rtutil.RTLinkCache')

    @classmethod
    def setup(cls, cache_path: Path) -> Optional[sqlite3.Connection]:
        """Open a usable cache database at ``cache_path``, or return None.

        Falls back to an in-memory database seeded from the file when the
        path is readable but not writable; returns None when the path can
        be neither read nor written.
        """
        try:
            # isolation_level=None selects autocommit mode, so writes are
            # visible to other processes sharing the file immediately.
            db = sqlite3.connect(os.fspath(cache_path), isolation_level=None)
            cursor = db.cursor()
            cursor.execute(cls.CREATE_TABLE_SQL)
            cursor.execute('SELECT url FROM RTLinkCache LIMIT 1')
            have_data = cursor.fetchone() is not None
        except sqlite3.OperationalError:
            # If we couldn't get this far, sqlite provides no benefit.
            cls.logger.debug("setup: error loading %s", cache_path, exc_info=True)
            return None
        try:
            # There shouldn't be any records where url is NULL, so running this
            # DELETE pulls double duty for us: it tells us whether or not we
            # can write to the database and it enforces database integrity.
            cursor.execute('DELETE FROM RTLinkCache WHERE url IS NULL')
        except sqlite3.OperationalError:
            cls.logger.debug("setup: error writing %s", cache_path, exc_info=True)
            can_write = False
        else:
            can_write = True
        if not (can_write or have_data):
            # If there's nothing to read and no way to write, sqlite provides
            # no benefit.
            cls.logger.debug("setup: not using %s: nothing to read or write", cache_path)
            return None
        elif not can_write:
            # Set up an in-memory database that we can write to, seeded with
            # the data available to read.
            try:
                cursor.close()
                db.close()
                db = sqlite3.connect(':memory:', isolation_level=None)
                cursor = db.cursor()
                # It would better to use
                #   '{}?mode=ro'.format(cache_path.as_uri())
                # as the argument here, but that doesn't work on SUSE 15,
                # possibly because its sqlite doesn't recognize query
                # arguments (added to upstream sqlite in late 2016).
                cursor.execute('ATTACH DATABASE ? AS readsource',
                               (os.fspath(cache_path),))
                cursor.execute(cls.CREATE_TABLE_SQL)
                cursor.execute('INSERT INTO RTLinkCache SELECT * FROM readsource.RTLinkCache')
                cursor.execute('DETACH DATABASE readsource')
            except sqlite3.OperationalError:
                # We're back to the case of having nothing to read and no way
                # to write.
                cls.logger.debug("setup: error loading %s into memory", cache_path, exc_info=True)
                return None
            else:
                cls.logger.debug("setup: loaded %s into memory", cache_path)
        else:
            cls.logger.debug("setup: caching at %s", cache_path)
        cursor.close()
        db.commit()
        return db

    def __init__(self, cache_db: sqlite3.Connection) -> None:
        self._db = cache_db
        # Links known to have no URL are kept only in this in-process set;
        # see __setitem__ for why they stay out of the shared database.
        self._nourls: Set[TicketAttachmentIds] = set()

    def __iter__(self) -> Iterator[TicketAttachmentIds]:
        yield from self._db.execute('SELECT ticket_id, attachment_id FROM RTLinkCache')
        yield from self._nourls

    def __len__(self) -> int:
        cursor = self._db.execute('SELECT COUNT(*) FROM RTLinkCache')
        count: int = cursor.fetchone()[0]
        return count + len(self._nourls)

    def __getitem__(self, key: TicketAttachmentIds) -> Optional[str]:
        # None is a real cached value meaning "object known not to exist."
        if key in self._nourls:
            return None
        # 'IS ?' (not '= ?') so a None attachment_id matches NULL rows.
        cursor = self._db.execute(
            'SELECT url FROM RTLinkCache WHERE ticket_id = ? AND attachment_id IS ?',
            key,
        )
        row = cursor.fetchone()
        if row is None:
            raise KeyError(key)
        else:
            retval: str = row[0]
            return retval

    def __setitem__(self, key: TicketAttachmentIds, value: Optional[str]) -> None:
        # A None value (object not found) is only cached in this process,
        # so other/later runs can still notice newly-created objects.
        if value is None:
            self._nourls.add(key)
        else:
            ticket_id, attachment_id = key
            try:
                self._db.execute(
                    'INSERT INTO RTLinkCache VALUES(?, ?, ?)',
                    (ticket_id, attachment_id, value),
                )
            except sqlite3.IntegrityError:
                # Another instance might've inserted the URL before we did.
                # That's fine as long as it's the same URL, which it should be.
                if value != self.get(key):
                    raise

    def __delitem__(self, key: TicketAttachmentIds) -> None:
        raise NotImplementedError("RTLinkCache.__delitem__")
 

	
 

	
 
class RT:
    """RT utility wrapper class

    Given an RT client object, this class provides common functionality for
    working with RT links in Beancount metadata:

    * Parse links
    * Verify that they refer to extant objects in RT
    * Convert metadata links to RT web links
    * Cache results, to reduce network requests.
      You can set up an RTLinkCache to cache links to disks over multiple runs.
      Refer to RTLinkCache's docstring for details and instructions.
    """

    # Accepted metadata link syntaxes: "rt:TICKET[/ATTACHMENT]" and
    # "rt://ticket/TICKET[/attachments/ATTACHMENT]".
    PARSE_REGEXPS = [
        re.compile(r'^rt:([0-9]+)(?:/([0-9]+))?/?$'),
        re.compile(r'^rt://ticket/([0-9]+)(?:/attachments?/([0-9]+))?/?$'),
    ]

    def __init__(self, rt_client: rt.Rt, cache_db: Optional[sqlite3.Connection]=None) -> None:
        # Derive the web UI base URL by stripping the REST endpoint path
        # from the client's API URL.
        urlparts = urlparse.urlparse(rt_client.url)
        try:
            index = urlparts.path.rindex('/REST/')
        except ValueError:
            base_path = urlparts.path.rstrip('/') + '/'
        else:
            base_path = urlparts.path[:index + 1]
        self.url_base = urlparts._replace(path=base_path)
        self.rt = rt_client
        self._cache: _LinkCache
        if cache_db is None:
            self._cache = {}
        else:
            self._cache = RTLinkCache(cache_db)

    # mypy complains that the first argument isn't self, but this isn't meant
    # to be a method, it's just an internal decorator.
    def _cache_method(func: _URLLookup) -> _URLLookup:  # type:ignore[misc]
        """Decorator to memoize a URL lookup method through self._cache."""
        @functools.wraps(func)
        def caching_wrapper(self: 'RT',
                            ticket_id: RTId,
                            attachment_id: Optional[RTId]=None,
        ) -> Optional[str]:
            # Normalize ids to strings so int and str arguments share one
            # cache entry.
            cache_key = (str(ticket_id),
                         None if attachment_id is None else str(attachment_id))
            url: Optional[str]
            try:
                url = self._cache[cache_key]
            except KeyError:
                if attachment_id is None:
                    url = func(self, ticket_id)
                else:
                    url = func(self, ticket_id, attachment_id)
                self._cache[cache_key] = url
            return url
        return caching_wrapper

    def _extend_url(self,
                    path_tail: str,
                    fragment: Optional[str]=None,
                    **query: str,
    ) -> str:
        """Build a web URL under url_base with the given path/query/fragment."""
        if fragment is None:
            fragment = self.url_base.fragment
        else:
            fragment = urlparse.quote(fragment)
        if query:
            query_s = urlparse.urlencode(query)
        else:
            query_s = self.url_base.query
        urlparts = self.url_base._replace(
            path=self.url_base.path + urlparse.quote(path_tail),
            query=query_s,
            fragment=fragment,
        )
        return urlparse.urlunparse(urlparts)

    def _ticket_url(self, ticket_id: RTId, txn_id: Optional[RTId]=None) -> str:
        """Build a ticket display URL, optionally anchored at a transaction."""
        if txn_id is None:
            fragment = None
        else:
            fragment = 'txn-{}'.format(txn_id)
        return self._extend_url('Ticket/Display.html', fragment, id=str(ticket_id))

    @_cache_method
    def attachment_url(self, ticket_id: RTId, attachment_id: RTId) -> Optional[str]:
        """Return a web URL for an attachment, or None if it doesn't exist.

        text/* attachments link to their transaction in the ticket display;
        every other content type links to the raw attachment download.
        """
        attachment = self.rt.get_attachment(ticket_id, attachment_id)
        if attachment is None:
            return None
        mimetype = attachment.get('ContentType', '')
        if mimetype.startswith('text/'):
            return self._ticket_url(ticket_id, attachment['Transaction'])
        else:
            filename = attachment.get('Filename', '')
            if not filename:
                # Generate a stand-in filename with an extension guessed
                # from the content type.
                filename = 'RT{} attachment {}{}'.format(
                    ticket_id,
                    attachment_id,
                    mimetypes.guess_extension(mimetype) or '.bin',
                )
            path_tail = 'Ticket/Attachment/{0[Transaction]}/{0[id]}/{1}'.format(
                attachment,
                filename,
            )
            return self._extend_url(path_tail)

    def exists(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> bool:
        """Return True when the referenced ticket/attachment exists in RT."""
        return self.url(ticket_id, attachment_id) is not None

    def iter_urls(self,
                  links: Iterable[str],
                  rt_fmt: str='{}',
                  nonrt_fmt: str='{}',
                  missing_fmt: str='{}',
    ) -> Iterator[str]:
        """Iterate over metadata links, replacing RT references with web URLs

        This method iterates over metadata link strings (e.g., from
        Metadata.get_links()) and transforms them for web presentation.

        If the string is a valid RT reference, the corresponding web URL
        will be formatted with ``rt_fmt``.

        If the string is a well-formed RT reference but the object doesn't
        exist, it will be formatted with ``missing_fmt``.

        All other link strings will be formatted with ``nonrt_fmt``.
        """
        for link in links:
            parsed = self.parse(link)
            if parsed is None:
                yield nonrt_fmt.format(link)
            else:
                ticket_id, attachment_id = parsed
                url = self.url(ticket_id, attachment_id)
                if url is None:
                    yield missing_fmt.format(link)
                else:
                    yield rt_fmt.format(url)

    @classmethod
    def metadata_regexp(cls,
                        ticket_id: RTId,
                        attachment_id: Optional[RTId]=None,
                        *,
                        first_link_only: bool=False
    ) -> str:
        """Return a pattern to find RT links in metadata

        Given a ticket ID and optional attachment ID, this method returns a
        regular expression pattern that will find matching RT links in a
        metadata value string, written in any format.

        If the keyword-only argument first_link_only is true, the pattern will
        only match the first link in a metadata string. Otherwise the pattern
        matches any link in the string (the default).
        """
        if first_link_only:
            prolog = r'^\s*'
        else:
            prolog = r'(?:^|\s)'
        if attachment_id is None:
            attachment = ''
        else:
            attachment = r'/(?:attachments?/)?{}'.format(attachment_id)
        ticket = r'rt:(?://ticket/)?{}'.format(ticket_id)
        epilog = r'/?(?:$|\s)'
        return f'{prolog}{ticket}{attachment}{epilog}'

    @classmethod
    def parse(cls, s: str) -> Optional[Tuple[str, Optional[str]]]:
        """Parse an rt: link into (ticket_id, attachment_id) strings.

        Returns None when the string is not a well-formed RT link.
        """
        for regexp in cls.PARSE_REGEXPS:
            match = regexp.match(s)
            if match is not None:
                ticket_id, attachment_id = match.groups()
                return (ticket_id, attachment_id)
        return None

    @classmethod
    def unparse(cls, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> str:
        """Render ids back into the short rt: link syntax.

        Inverse of parse() for well-formed links; accepts ints or strings.
        (Referenced by the test suite but not visible in this review view.)
        """
        if attachment_id is None:
            return 'rt:{}'.format(ticket_id)
        else:
            return 'rt:{}/{}'.format(ticket_id, attachment_id)

    @_cache_method
    def ticket_url(self, ticket_id: RTId) -> Optional[str]:
        """Return a web URL for the ticket, or None if it doesn't exist."""
        if self.rt.get_ticket(ticket_id) is None:
            return None
        return self._ticket_url(ticket_id)

    def url(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> Optional[str]:
        """Return a web URL for the referenced object, or None if missing.

        Dispatches to the cached ticket_url()/attachment_url() lookups.
        (Referenced by exists() and the test suite but not visible in this
        review view.)
        """
        if attachment_id is None:
            return self.ticket_url(ticket_id)
        else:
            return self.attachment_url(ticket_id, attachment_id)

    @overload
    def _meta_with_urls(self,
                        meta: None,
                        rt_fmt: str,
                        nonrt_fmt: str,
                        missing_fmt: str,
    ) -> None: ...

    @overload
    def _meta_with_urls(self,
                        meta: bc_data.Meta,
                        rt_fmt: str,
                        nonrt_fmt: str,
                        missing_fmt: str,
    ) -> bc_data.Meta: ...

    def _meta_with_urls(self,
                        meta: Optional[bc_data.Meta],
                        rt_fmt: str,
                        nonrt_fmt: str,
                        missing_fmt: str,
    ) -> Optional[bc_data.Meta]:
        """Return a copy of meta with links in link-type keys reformatted."""
        if meta is None:
            return None
        link_meta = data.Metadata(meta)
        retval = meta.copy()
        for key in data.LINK_METADATA:
            try:
                links = link_meta.get_links(key)
            except TypeError:
                # Value isn't a link string; leave it untouched.
                links = ()
            if links:
                retval[key] = ' '.join(self.iter_urls(
                    links, rt_fmt, nonrt_fmt, missing_fmt,
                ))
        return retval

    def txn_with_urls(self, txn: Transaction,
                      rt_fmt: str='<{}>',
                      nonrt_fmt: str='{}',
                      missing_fmt: str='{}',
    ) -> Transaction:
        """Copy a transaction with RT references replaced with web URLs

        Given a Beancount Transaction, this method returns a Transaction
        that's identical, except any references to RT in the metadata for
        the Transaction and its Postings are replaced with web URLs.
        This is useful for reporting tools that want to format the
        transaction with URLs that are recognizable by other tools.

        The format string arguments have the same meaning as RT.iter_urls().
        See that docstring for details.
        """
        # mypy doesn't recognize that postings is a valid argument, probably a
        # bug in the NamedTuple→Directive→Transaction hierarchy.
        return txn._replace(  # type:ignore[call-arg]
            meta=self._meta_with_urls(txn.meta, rt_fmt, nonrt_fmt, missing_fmt),
            postings=[post._replace(meta=self._meta_with_urls(
                post.meta, rt_fmt, nonrt_fmt, missing_fmt,
            )) for post in txn.postings],
        )
 

	
setup.py
Show inline comments
 
#!/usr/bin/env python3
"""Packaging definition for the conservancy_beancount distribution."""

from setuptools import setup

setup(
    name='conservancy_beancount',
    description="Plugin, library, and reports for reading Conservancy's books",
    version='1.19.0',
    author='Software Freedom Conservancy',
    author_email='info@sfconservancy.org',
    license='GNU AGPLv3+',

    # Runtime dependencies.  Comments name the Debian package that
    # provides each requirement, where one exists.
    install_requires=[
        'babel>=2.6',  # Debian:python3-babel
        'beancount>=2.2',  # Debian:beancount
        'GitPython>=2.0',  # Debian:python3-git
        # 1.4.1 crashes when trying to save some documents.
        'odfpy>=1.4.0,!=1.4.1',  # Debian:python3-odf
        'pdfminer.six>=20200101',
        'python-dateutil>=2.7',  # Debian:python3-dateutil
        'PyYAML>=3.0',  # Debian:python3-yaml
        'regex',  # Debian:python3-regex
        'rt>=2.0',
    ],
    setup_requires=[
        'pytest-mypy',
        'pytest-runner',  # Debian:python3-pytest-runner
    ],
    tests_require=[
        'mypy>=0.770',  # Debian:python3-mypy
        'pytest',  # Debian:python3-pytest
    ],

    packages=[
        'conservancy_beancount',
        'conservancy_beancount.pdfforms',
        'conservancy_beancount.pdfforms.extract',
        'conservancy_beancount.plugin',
        'conservancy_beancount.reconcile',
        'conservancy_beancount.reports',
        'conservancy_beancount.tools',
    ],
    # Command-line tools installed with the package.
    entry_points={
        'console_scripts': [
            'accrual-report = conservancy_beancount.reports.accrual:entry_point',
            'assemble-audit-reports = conservancy_beancount.tools.audit_report:entry_point',
            'balance-sheet-report = conservancy_beancount.reports.balance_sheet:entry_point',
            'budget-report = conservancy_beancount.reports.budget:entry_point',
            'bean-sort = conservancy_beancount.tools.sort_entries:entry_point',
            'extract-odf-links = conservancy_beancount.tools.extract_odf_links:entry_point',
            'fund-report = conservancy_beancount.reports.fund:entry_point',
            'ledger-report = conservancy_beancount.reports.ledger:entry_point',
            'opening-balances = conservancy_beancount.tools.opening_balances:entry_point',
            'pdfform-extract = conservancy_beancount.pdfforms.extract:entry_point',
            'pdfform-extract-irs990scheduleA = conservancy_beancount.pdfforms.extract.irs990scheduleA:entry_point',
            'pdfform-fill = conservancy_beancount.pdfforms.fill:entry_point',
            'query-report = conservancy_beancount.reports.query:entry_point',
            'reconcile-paypal = conservancy_beancount.reconcile.paypal:entry_point',
            'reconcile-statement = conservancy_beancount.reconcile.statement:entry_point',
            'split-ods-links = conservancy_beancount.tools.split_ods_links:entry_point',
        ],
    },
)
tests/test_rtutil.py
Show inline comments
 
"""Test RT integration"""
 
# Copyright © 2020  Brett Smith
 
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
 
#
 
# Full copyright and licensing details can be found at toplevel file
 
# LICENSE.txt in the repository.
 

	
 
import contextlib
 
import datetime
 
import itertools
 
import logging
 
import re
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount import rtutil
 

	
 
# Base web URL the RT wrapper should generate.  The slice drops the last
# nine characters of the client URL — presumably a 'REST/1.0/' API suffix;
# confirm against testutil.RTClient.DEFAULT_URL.
DEFAULT_RT_URL = testutil.RTClient.DEFAULT_URL[:-9]

# (ticket_id, attachment_id, expected URL path relative to DEFAULT_RT_URL).
# A None URL means the referenced object doesn't exist in the fake data.
EXPECTED_URLS = [
    (1, None, 'Ticket/Display.html?id=1'),
    (1, 2, 'Ticket/Display.html?id=1#txn-1'),
    (1, 4, 'Ticket/Attachment/1/4/Forwarded%20Message.eml'),
    (1, 99, None),
    (2, 1, None),
    (2, 10, 'Ticket/Attachment/7/10/Company_invoice-2020030405_as-sent.pdf'),
    (2, 13, 'Ticket/Display.html?id=2#txn-11'),
    (2, 14, 'Ticket/Display.html?id=2#txn-11'),  # statement.txt
    (3, None, 'Ticket/Display.html?id=3'),
    (9, None, None),
]

# Same expectations keyed by (ticket_id, attachment_id) for direct lookup.
EXPECTED_URLS_MAP = {
    (ticket_id, attachment_id): url
    for ticket_id, attachment_id, url in EXPECTED_URLS
}
 

	
 
@pytest.fixture(scope='module')
def rt():
    """Module-scoped RT wrapper around the stock fake RT client."""
    return rtutil.RT(testutil.RTClient())
 

	
 
@pytest.fixture
def new_client():
    """Fresh fake RT client whose TICKET_DATA can be mutated per-test."""
    class RTClient(testutil.RTClient):
        # Copy so tests can clear/extend the data without affecting the
        # shared class attribute on testutil.RTClient.
        TICKET_DATA = testutil.RTClient.TICKET_DATA.copy()
    client = RTClient()
    return client
 

	
 
def nullcontext(thing):
    """Return a context manager that yields ``thing`` with no cleanup.

    Used by new_cache() when no database could be set up but callers
    still expect a context manager.  Delegates to the stdlib helper
    rather than hand-rolling the same generator.
    """
    return contextlib.nullcontext(thing)
 

	
 
def new_cache(database=':memory:'):
    """Return a context manager yielding a link-cache db (or None)."""
    db = rtutil.RTLinkCache.setup(database)
    if db is not None:
        return contextlib.closing(db)
    print("NOTE: did not set up database cache at {}".format(database))
    return nullcontext(db)
 

	
 
@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_url(rt, ticket_id, attachment_id, expected):
    """url() resolves each known reference to its web URL (or None)."""
    want = None if expected is None else DEFAULT_RT_URL + expected
    assert rt.url(ticket_id, attachment_id) == want
 

	
 
@pytest.mark.parametrize('attachment_id,first_link_only', itertools.product(
    [245, None],
    [True, False],
))
def test_metadata_regexp(rt, attachment_id, first_link_only):
    """The generated pattern matches both link syntaxes, anchored correctly."""
    if attachment_id is None:
        match_links = ['rt:220', 'rt://ticket/220']
    else:
        match_links = [f'rt:220/{attachment_id}',
                       f'rt://ticket/220/attachments/{attachment_id}']
    regexp = rt.metadata_regexp(220, attachment_id, first_link_only=first_link_only)
    for link in match_links:
        # The link matches on its own and when followed by another link.
        assert re.search(regexp, link)
        assert re.search(regexp, link + ' link2')
        # Extra digits or a leading character must break the match.
        assert re.search(regexp, link + '0') is None
        assert re.search(regexp, 'a' + link) is None
        # A preceding link only prevents a match in first_link_only mode.
        end_match = re.search(regexp, 'link0 ' + link)
        assert (end_match is None) == first_link_only
 

	
 
@pytest.mark.parametrize('attachment_id', [
    13,
    None,
])
def test_url_caches(new_client, attachment_id):
    """URL lookups are served from cache after the first request."""
    fragment = '' if attachment_id is None else '#txn-11'
    expected = f'{DEFAULT_RT_URL}Ticket/Display.html?id=2{fragment}'
    rt = rtutil.RT(new_client)
    assert rt.url(2, attachment_id) == expected
    # With the backing ticket data gone, only the cache can answer.
    new_client.TICKET_DATA.clear()
    assert rt.url(2, attachment_id) == expected
 

	
 
@pytest.mark.parametrize('mimetype,extension', [
    ('application/pdf', 'pdf'),
    ('image/png', 'png'),
    ('message/rfc822', 'eml'),
    ('x-test/x-unknown', 'bin'),
])
def test_url_default_filename(new_client, mimetype, extension):
    """Unnamed attachments get a generated filename with a MIME extension."""
    new_client.TICKET_DATA['1'] = [('9', '(Unnamed)', mimetype, '50.5k')]
    rt = rtutil.RT(new_client)
    expected = f'{DEFAULT_RT_URL}Ticket/Attachment/9/9/RT1%20attachment%209.{extension}'
    assert rt.url(1, 9) == expected
 

	
 
@pytest.mark.parametrize('rt_fmt,nonrt_fmt,missing_fmt', [
    ('{}', '{}', '{}',),
    ('<{}>', '[{}]', '({})'),
])
def test_iter_urls(rt, rt_fmt, nonrt_fmt, missing_fmt):
    """iter_urls() formats RT, non-RT, and missing links appropriately."""
    # Map each metadata link string to its expected URL path; None marks
    # links that are either missing from RT or not RT links at all.
    expected_map = {
        'rt:{}{}'.format(tid, '' if aid is None else f'/{aid}'): url
        for tid, aid, url in EXPECTED_URLS
    }
    expected_map['https://example.com'] = None
    expected_map['invoice.pdf'] = None
    keys = list(expected_map)
    urls = rt.iter_urls(keys, rt_fmt, nonrt_fmt, missing_fmt)
    # zip_longest checks iter_urls yields exactly one result per input.
    for key, actual in itertools.zip_longest(keys, urls):
        expected = expected_map[key]
        if expected is None:
            if key.startswith('rt:'):
                expected = missing_fmt.format(key)
            else:
                expected = nonrt_fmt.format(key)
        else:
            expected = rt_fmt.format(DEFAULT_RT_URL + expected)
        assert actual == expected
 

	
 
@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
def test_exists(rt, ticket_id, attachment_id, expected):
    """exists() is True exactly when a URL is known for the reference."""
    assert rt.exists(ticket_id, attachment_id) is (expected is not None)
 

	
 
def test_exists_caches(new_client):
    """Existence checks are cached, including negative results."""
    rt = rtutil.RT(new_client)
    checks = [
        (True, (1, 3)),
        (True, (2,)),
        (False, (1, 99)),
        (False, (9,)),
    ]
    for _ in range(2):
        for expected, args in checks:
            assert rt.exists(*args) is expected
        # After clearing, the second pass can only succeed from cache.
        new_client.TICKET_DATA.clear()
 

	
 
@pytest.mark.parametrize('link,expected', [
    # Well-formed links in both syntaxes, with and without trailing slash.
    ('rt:1/2', ('1', '2')),
    ('rt:123/456', ('123', '456')),
    ('rt:12345', ('12345', None)),
    ('rt:12346/', ('12346', None)),
    ('rt:12346/789', ('12346', '789')),
    ('rt:12346/780/', ('12346', '780')),
    ('rt://ticket/1', ('1', None)),
    ('rt://ticket/1/', ('1', None)),
    ('rt://ticket/1234/attachments/5678', ('1234', '5678')),
    ('rt://ticket/1234/attachments/5678/', ('1234', '5678')),
    ('rt://ticket/1234/attachment/5678', ('1234', '5678')),
    ('rt://ticket/1234/attachment/5678/', ('1234', '5678')),
    # Malformed or non-RT links must yield None.
    ('rt:', None),
    ('rt://', None),
    ('rt:example.org', None),
    ('rt:example.org/1', None),
    ('rt://example.org', None),
    ('rt://example.org/1', None),
    ('https://example.org/rt/Ticket/Display.html?id=123', None),
])
def test_parse(rt, link, expected):
    """parse() accepts both rt: syntaxes and rejects malformed links."""
    assert rt.parse(link) == expected
 

	
 
@pytest.mark.parametrize('ticket_id,attachment_id,expected', [
    ('12', None, 'rt:12'),
    (34, None, 'rt:34'),
    ('56', '78', 'rt:56/78'),
    (90, 880, 'rt:90/880'),
])
def test_unparse(rt, ticket_id, attachment_id, expected):
    """unparse() renders ids (str or int) back into rt: link syntax."""
    actual = rt.unparse(ticket_id, attachment_id)
    assert actual == expected
 

	
 
def test_uncommon_server_url_parsing():
    """Only the trailing REST component is stripped from the client URL.

    The base URL itself contains '/REST/', so the wrapper must strip the
    final API suffix, not the first occurrence.
    """
    url = 'https://example.org/REST/1.0/'
    rt = rtutil.RT(testutil.RTClient(url + 'REST/1.0/'))
    assert rt.url(1).startswith(url)
 

	
 
def test_shared_cache(new_client):
    """Two RT instances sharing one cache db share lookup results."""
    ticket_id, _, expected = EXPECTED_URLS[0]
    expected = DEFAULT_RT_URL + expected
    with new_cache() as cachedb:
        rt1 = rtutil.RT(new_client, cachedb)
        assert rt1.url(ticket_id) == expected
        new_client.TICKET_DATA.clear()
        # With the backing data gone, rt2 can only answer through the
        # cache it shares with rt1.
        rt2 = rtutil.RT(new_client, cachedb)
        assert rt2.url(ticket_id) == expected
        assert not rt2.exists(ticket_id + 1)
        assert rt1 is not rt2
 

	
 
def test_no_shared_cache(new_client):
    """RT instances with separate caches don't see each other's results."""
    with new_cache() as cache1, new_cache() as cache2:
        rt1 = rtutil.RT(new_client, cache1)
        rt2 = rtutil.RT(new_client, cache2)
        assert rt1.exists(1)
        new_client.TICKET_DATA.clear()
        # rt2 never looked the ticket up, and its cache is independent.
        assert not rt2.exists(1)
        # rt1 still answers from its own cache.
        assert rt1.exists(1)
 

	
 
def test_read_only_cache(new_client, tmp_path, caplog):
    """A cache db that can't be written is still usable for reading.

    After the file is made read-only, RTLinkCache.setup falls back to an
    in-memory copy seeded from the on-disk data.
    """
    caplog.set_level(logging.DEBUG, logger='conservancy_beancount.rtutil')
    db_path = tmp_path / 'test.db'
    ticket_id, _, expected = EXPECTED_URLS[0]
    expected = DEFAULT_RT_URL + expected
    # First pass: populate the on-disk cache normally.
    with new_cache(db_path) as cache1:
        rt1 = rtutil.RT(new_client, cache1)
        assert rt1.url(ticket_id) == expected
    new_client.TICKET_DATA.clear()
    db_path.chmod(0o400)
    # Second pass: ticket data is gone, so answers must come from the
    # (read-only) cache; unknown tickets report as missing.
    with new_cache(db_path) as cache2:
        rt2 = rtutil.RT(new_client, cache2)
        assert rt2.url(ticket_id) == expected
        assert rt2.url(ticket_id + 1) is None
 

	
 
def test_results_not_found_only_in_transient_cache(new_client):
    """Negative results are cached per-instance, not in the shared db.

    Once the object appears in RT, a second instance sharing the same db
    finds it, while the instance that cached the miss keeps its answer.
    """
    with new_cache() as cache:
        rt1 = rtutil.RT(new_client, cache)
        rt2 = rtutil.RT(new_client, cache)
        assert not rt1.exists(9)
        new_client.TICKET_DATA['9'] = [('99', '(Unnamed)', 'text/plain', '0b')]
        assert not rt1.exists(9)
        assert rt2.exists(9)
 

	
 
def test_txn_with_urls(rt):
    """txn_with_urls() rewrites RT links in txn and posting metadata.

    Non-RT strings are preserved in place, the original transaction is
    not mutated, and the default rt_fmt wraps URLs in angle brackets.
    """
    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'doc1.txt rt:1/4 doc2.txt',
    }
    txn = testutil.Transaction(**txn_meta, postings=[
        ('Income:Donations', -10, {'receipt': 'rt:2/13 donation.txt'}),
        ('Assets:Cash', 10, {'receipt': 'cash.png rt:2/14'}),
    ])
    actual = rt.txn_with_urls(txn)
    # Assert the bracketed web URL for (ticket, attachment) appears in
    # the named metadata value of `source`.
    def check(source, key, ticket_id, attachment_id=None):
        url_path = EXPECTED_URLS_MAP[(ticket_id, attachment_id)]
        assert f'<{DEFAULT_RT_URL}{url_path}>' in source.meta[key]
    expected_keys = set(txn_meta)
    expected_keys.update(['filename', 'lineno'])
    assert set(actual.meta) == expected_keys
    check(actual, 'rt-id', 1)
    assert actual.meta['contract'] == txn_meta['contract']
    assert actual.meta['statement'].startswith('doc1.txt ')
    check(actual, 'statement', 1, 4)
    check(actual.postings[0], 'receipt', 2, 13)
    assert actual.postings[0].meta['receipt'].endswith(' donation.txt')
    check(actual.postings[1], 'receipt', 2, 14)
    assert actual.postings[1].meta['receipt'].startswith('cash.png ')
    # Check the original transaction is unchanged
    for key, expected in txn_meta.items():
        assert txn.meta[key] == expected
    assert txn.postings[0].meta['receipt'] == 'rt:2/13 donation.txt'
    assert txn.postings[1].meta['receipt'] == 'cash.png rt:2/14'
 

	
 
def test_txn_with_urls_with_fmts(rt):
    """Custom rt/nonrt/missing format strings are applied per link."""
    txn_meta = {
        'rt-id': 'rt:1',
        'contract': 'RepoLink.pdf',
        'statement': 'rt:1/99 rt:1/4 stmt.txt',
    }
    txn = testutil.Transaction(**txn_meta)
    actual = rt.txn_with_urls(txn, '<{}>', '[{}]', '({})')
    rt_id_path = EXPECTED_URLS_MAP[(1, None)]
    assert actual.meta['rt-id'] == f'<{DEFAULT_RT_URL}{rt_id_path}>'
    # A plain document name is a non-RT link.
    assert actual.meta['contract'] == '[RepoLink.pdf]'
    statement_path = EXPECTED_URLS_MAP[(1, 4)]
    # rt:1/99 doesn't exist (missing fmt), rt:1/4 resolves (rt fmt),
    # stmt.txt is a non-RT link (nonrt fmt).
    assert actual.meta['statement'] == ' '.join([
        '(rt:1/99)',
        f'<{DEFAULT_RT_URL}{statement_path}>',
        '[stmt.txt]',
    ])
 

	
 
@pytest.mark.parametrize('arg,exp_num,exp_offset', [
    # These correspond to the different datetime formats available through
    # RT's user settings.
    ('Mon Mar 1 01:01:01 2021', 1, None),
    ('2021-03-02 02:02:02', 2, None),
    ('2021-03-03T03:03:03-0500', 3, -18000),
    ('Thu, 4 Mar 2021 04:04:04 -0600', 4, -21600),
    ('Fri, 5 Mar 2021 05:05:05 GMT', 5, 0),
    ('20210306T060606Z', 6, 0),
    ('Sun, Mar 7, 2021 07:07:07 AM', 7, None),
    ('Sun, Mar 14, 2021 02:14:14 PM', 14, None),
])
def test_rt_datetime(arg, exp_num, exp_offset):
    """RTDateTime parses every RT display format, keeping any timezone.

    Each sample encodes its day/hour/minute/second as the same number
    (exp_num); exp_offset is the expected UTC offset in seconds, or None
    for formats that carry no timezone information.
    """
    actual = rtutil.RTDateTime(arg)
    assert actual.year == 2021
    assert actual.month == 3
    assert actual.day == exp_num
    assert actual.hour == exp_num
    assert actual.minute == exp_num
    assert actual.second == exp_num
    if exp_offset is None:
        assert actual.tzinfo is None
    else:
        assert actual.tzinfo.utcoffset(None).total_seconds() == exp_offset
 

	
 
@pytest.mark.parametrize('arg', ['Not set', '', None])
def test_rt_datetime_empty(arg):
    """Unset RT datetime values map to the naive datetime.min sentinel."""
    parsed = rtutil.RTDateTime(arg)
    assert parsed == datetime.datetime.min
    assert parsed.tzinfo is None
0 comments (0 inline, 0 general)