Changeset - d5a6141f6db6
[Not reviewed]
0 1 2
Brett Smith - 4 years ago 2020-03-24 21:23:54
brettcsmith@brettcsmith.org
rtutil: Start module.

For now, this is basically the Python version of
ledger-tag-convert.plx. It knows how to create RT web links from
ticket and attachment IDs. It confirms that those objects actually
exist too. It may grow to encompass other functionality in the
future.
3 files changed with 294 insertions and 1 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/rtutil.py
Show inline comments
 
new file 100644
 
"""RT client utilities"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import functools
 
import mimetypes
 
import urllib.parse as urlparse
 

	
 
import rt
 

	
 
from typing import (
 
    Mapping,
 
    Optional,
 
    Tuple,
 
    Union,
 
)
 

	
 
AttachmentTuple = Tuple[str, str, str, str]
 
RTId = Union[int, str]
 

	
 
class RT:
 
    def __init__(self, rt_client: rt.Rt) -> None:
 
        self.rt = rt_client
 
        urlparts = urlparse.urlparse(rt_client.url)
 
        try:
 
            index = urlparts.path.index('/REST/')
 
        except ValueError:
 
            base_path = urlparts.path.rstrip('/') + '/'
 
        else:
 
            base_path = urlparts.path[:index + 1]
 
        self.url_base = urlparts._replace(path=base_path)
 

	
 
    def _extend_url(self,
 
                    path_tail: str,
 
                    fragment: Optional[str]=None,
 
                    **query: str,
 
    ) -> str:
 
        if fragment is None:
 
            fragment = self.url_base.fragment
 
        else:
 
            fragment = urlparse.quote(fragment)
 
        if query:
 
            query_s = urlparse.urlencode(query)
 
        else:
 
            query_s = self.url_base.query
 
        urlparts = self.url_base._replace(
 
            path=self.url_base.path + urlparse.quote(path_tail),
 
            query=query_s,
 
            fragment=fragment,
 
        )
 
        return urlparse.urlunparse(urlparts)
 

	
 
    def _ticket_url(self, ticket_id: RTId, txn_id: Optional[RTId]=None) -> str:
 
        if txn_id is None:
 
            fragment = None
 
        else:
 
            fragment = 'txn-{}'.format(txn_id)
 
        return self._extend_url('Ticket/Display.html', fragment, id=str(ticket_id))
 

	
 
    @functools.lru_cache()
 
    def attachment_url(self, ticket_id: RTId, attachment_id: RTId) -> Optional[str]:
 
        attachment = self.rt.get_attachment(ticket_id, attachment_id)
 
        if attachment is None:
 
            return None
 
        mimetype = attachment.get('ContentType', '')
 
        if mimetype.startswith('text/'):
 
            return self._ticket_url(ticket_id, attachment['Transaction'])
 
        else:
 
            filename = attachment.get('Filename', '')
 
            if not filename:
 
                filename = 'RT{} attachment {}{}'.format(
 
                    ticket_id,
 
                    attachment_id,
 
                    mimetypes.guess_extension(mimetype) or '.bin',
 
                )
 
            path_tail = 'Ticket/Attachment/{0[Transaction]}/{0[id]}/{1}'.format(
 
                attachment,
 
                filename,
 
            )
 
            return self._extend_url(path_tail)
 

	
 
    def exists(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> bool:
 
        return self.url(ticket_id, attachment_id) is not None
 

	
 
    @functools.lru_cache()
 
    def ticket_url(self, ticket_id: RTId) -> Optional[str]:
 
        if self.rt.get_ticket(ticket_id) is None:
 
            return None
 
        return self._ticket_url(ticket_id)
 

	
 
    def url(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> Optional[str]:
 
        if attachment_id is None:
 
            return self.ticket_url(ticket_id)
 
        else:
 
            return self.attachment_url(ticket_id, attachment_id)
tests/test_rtutil.py
Show inline comments
 
new file 100644
 
"""Test RT integration"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount import rtutil
 

	
 
DEFAULT_RT_URL = testutil.RTClient.DEFAULT_URL[:-9]
 

	
 
EXPECTED_URLS = [
 
    (1, None, 'Ticket/Display.html?id=1'),
 
    (1, 2, 'Ticket/Display.html?id=1#txn-1'),
 
    (1, 4, 'Ticket/Attachment/1/4/Forwarded%20Message.eml'),
 
    (1, 99, None),
 
    (2, 1, None),
 
    (2, 10, 'Ticket/Attachment/7/10/screenshot.png'),
 
    (2, 13, 'Ticket/Display.html?id=2#txn-11'),
 
    (2, 14, 'Ticket/Display.html?id=2#txn-11'),  # statement.txt
 
    (3, None, 'Ticket/Display.html?id=3'),
 
    (9, None, None),
 
]
 

	
 
@pytest.fixture
 
def rt():
 
    client = testutil.RTClient()
 
    return rtutil.RT(client)
 

	
 
@pytest.fixture
 
def new_client():
 
    class RTClient(testutil.RTClient):
 
        TICKET_DATA = {'1': [], '2': []}
 
    return RTClient()
 

	
 
@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
 
def test_url(rt, ticket_id, attachment_id, expected):
 
    if expected is not None:
 
        expected = DEFAULT_RT_URL + expected
 
    assert rt.url(ticket_id, attachment_id) == expected
 

	
 
@pytest.mark.parametrize('attachment_id', [
 
    3,
 
    None,
 
])
 
def test_url_caches(new_client, attachment_id):
 
    new_client.TICKET_DATA['1'].append(('3', '(Unnamed)', 'text/plain', '3.0k'))
 
    if attachment_id is None:
 
        fragment = ''
 
    else:
 
        fragment = '#txn-3'
 
    expected = '{}Ticket/Display.html?id=1{}'.format(DEFAULT_RT_URL, fragment)
 
    rt = rtutil.RT(new_client)
 
    assert rt.url(1, attachment_id) == expected
 
    new_client.TICKET_DATA.clear()
 
    assert rt.url(1, attachment_id) == expected
 

	
 
@pytest.mark.parametrize('mimetype,extension', [
 
    ('application/pdf', 'pdf'),
 
    ('image/png', 'png'),
 
    ('message/rfc822', 'eml'),
 
    ('x-test/x-unknown', 'bin'),
 
])
 
def test_url_default_filename(new_client, mimetype, extension):
 
    new_client.TICKET_DATA['1'].append(('9', '(Unnamed)', mimetype, '50.5k'))
 
    rt = rtutil.RT(new_client)
 
    expected = '{}Ticket/Attachment/9/9/RT1%20attachment%209.{}'.format(DEFAULT_RT_URL, extension)
 
    assert rt.url(1, 9) == expected
 

	
 
@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS)
 
def test_exists(rt, ticket_id, attachment_id, expected):
 
    expected = False if expected is None else True
 
    assert rt.exists(ticket_id, attachment_id) is expected
 

	
 
def test_exists_caches(new_client):
 
    new_client.TICKET_DATA['1'].append(('3', '(Unnamed)', 'text/plain', '3.0k'))
 
    rt = rtutil.RT(new_client)
 
    assert rt.exists(1, 3)
 
    assert rt.exists(2)
 
    assert not rt.exists(1, 9)
 
    assert not rt.exists(9)
 
    new_client.TICKET_DATA.clear()
 
    assert rt.exists(1, 3)
 
    assert rt.exists(2)
 
    assert not rt.exists(1, 9)
 
    assert not rt.exists(9)
tests/testutil.py
Show inline comments
...
 
@@ -15,6 +15,7 @@
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import datetime
 
import itertools
 

	
 
import beancount.core.amount as bc_amount
 
import beancount.core.data as bc_data
...
 
@@ -114,9 +115,54 @@ class TestConfig:
 
        return self.repo_path
 

	
 

	
 
class _TicketBuilder:
 
    MESSAGE_ATTACHMENTS = [
 
        ('(Unnamed)', 'multipart/alternative', '0b'),
 
        ('(Unnamed)', 'text/plain', '1.2k'),
 
        ('(Unnamed)', 'text/html', '1.4k'),
 
    ]
 
    MISC_ATTACHMENTS = [
 
        ('Forwarded Message.eml', 'message/rfc822', '3.1k'),
 
        ('photo.jpg', 'image/jpeg', '65.2k'),
 
        ('document.pdf', 'application/pdf', '326k'),
 
        ('screenshot.png', 'image/png', '1.9m'),
 
        ('statement.txt', 'text/plain', '652b'),
 
    ]
 

	
 
    def __init__(self):
 
        self.id_seq = itertools.count(1)
 
        self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS)
 

	
 
    def new_attch(self, attch):
 
        return (str(next(self.id_seq)), *attch)
 

	
 
    def new_msg_with_attachments(self, attachments_count=1):
 
        for attch in self.MESSAGE_ATTACHMENTS:
 
            yield self.new_attch(attch)
 
        for _ in range(attachments_count):
 
            yield self.new_attch(next(self.misc_attchs))
 

	
 
    def new_messages(self, messages_count, attachments_count=None):
 
        for n in range(messages_count):
 
            if attachments_count is None:
 
                att_count = messages_count - n
 
            else:
 
                att_count = attachments_count
 
            yield from self.new_msg_with_attachments(att_count)
 

	
 

	
 
class RTClient:
 
    _builder = _TicketBuilder()
 
    DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/'
 
    TICKET_DATA = {
 
        '1': list(_builder.new_messages(1, 3)),
 
        '2': list(_builder.new_messages(2, 1)),
 
        '3': list(_builder.new_messages(3, 0)),
 
    }
 
    del _builder
 

	
 
    def __init__(self,
 
                 url,
 
                 url=DEFAULT_URL,
 
                 default_login=None,
 
                 default_password=None,
 
                 proxy=None,
...
 
@@ -145,3 +191,44 @@ class RTClient:
 
        self.login_result = bool(login and password and not password.startswith('bad'))
 
        self.last_login = (login, password, self.login_result)
 
        return self.login_result
 

	
 
    def get_attachments(self, ticket_id):
 
        try:
 
            return list(self.TICKET_DATA[str(ticket_id)])
 
        except KeyError:
 
            return None
 

	
 
    def get_attachment(self, ticket_id, attachment_id):
 
        try:
 
            att_seq = iter(self.TICKET_DATA[str(ticket_id)])
 
        except KeyError:
 
            None
 
        att_id = str(attachment_id)
 
        multipart_id = None
 
        for attch in att_seq:
 
            if attch[0] == att_id:
 
                break
 
            elif attch[2].startswith('multipart/'):
 
                multipart_id = attch[0]
 
        else:
 
            return None
 
        tx_id = multipart_id or att_id
 
        if attch[1] == '(Unnamed)':
 
            filename = ''
 
        else:
 
            filename = attch[1]
 
        return {
 
            'id': att_id,
 
            'ContentType': attch[2],
 
            'Filename': filename,
 
            'Transaction': tx_id,
 
        }
 

	
 
    def get_ticket(self, ticket_id):
 
        ticket_id_s = str(ticket_id)
 
        if ticket_id_s not in self.TICKET_DATA:
 
            return None
 
        return {
 
            'id': 'ticket/{}'.format(ticket_id_s),
 
            'numerical_id': ticket_id_s,
 
        }
0 comments (0 inline, 0 general)