diff --git a/conservancy_beancount/rtutil.py b/conservancy_beancount/rtutil.py new file mode 100644 index 0000000000000000000000000000000000000000..3bdc2f9ea8077e75fdf3db50df1f40c47dff661e --- /dev/null +++ b/conservancy_beancount/rtutil.py @@ -0,0 +1,107 @@ +"""RT client utilities""" +# Copyright © 2020 Brett Smith +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import functools +import mimetypes +import urllib.parse as urlparse + +import rt + +from typing import ( + Mapping, + Optional, + Tuple, + Union, +) + +AttachmentTuple = Tuple[str, str, str, str] +RTId = Union[int, str] + +class RT: + def __init__(self, rt_client: rt.Rt) -> None: + self.rt = rt_client + urlparts = urlparse.urlparse(rt_client.url) + try: + index = urlparts.path.index('/REST/') + except ValueError: + base_path = urlparts.path.rstrip('/') + '/' + else: + base_path = urlparts.path[:index + 1] + self.url_base = urlparts._replace(path=base_path) + + def _extend_url(self, + path_tail: str, + fragment: Optional[str]=None, + **query: str, + ) -> str: + if fragment is None: + fragment = self.url_base.fragment + else: + fragment = urlparse.quote(fragment) + if query: + query_s = urlparse.urlencode(query) + else: + query_s = self.url_base.query + urlparts = self.url_base._replace( + path=self.url_base.path + urlparse.quote(path_tail), + query=query_s, + fragment=fragment, + ) + return urlparse.urlunparse(urlparts) + + def _ticket_url(self, ticket_id: RTId, txn_id: Optional[RTId]=None) -> str: + if txn_id is None: + fragment = None + else: + fragment = 'txn-{}'.format(txn_id) + return self._extend_url('Ticket/Display.html', fragment, id=str(ticket_id)) + + @functools.lru_cache() + def attachment_url(self, ticket_id: RTId, attachment_id: RTId) -> Optional[str]: + attachment = self.rt.get_attachment(ticket_id, attachment_id) + if attachment is None: + return None + mimetype = attachment.get('ContentType', '') + if mimetype.startswith('text/'): + return self._ticket_url(ticket_id, attachment['Transaction']) + else: + filename = attachment.get('Filename', '') + if not filename: + filename = 'RT{} attachment {}{}'.format( + ticket_id, + attachment_id, + mimetypes.guess_extension(mimetype) or '.bin', + ) + path_tail = 'Ticket/Attachment/{0[Transaction]}/{0[id]}/{1}'.format( + attachment, + filename, + ) + return self._extend_url(path_tail) + + def exists(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> bool: + return self.url(ticket_id, attachment_id) is not None + + @functools.lru_cache() + def ticket_url(self, ticket_id: RTId) -> Optional[str]: + if self.rt.get_ticket(ticket_id) is None: + return None + return self._ticket_url(ticket_id) + + def url(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> Optional[str]: + if attachment_id is None: + return self.ticket_url(ticket_id) + else: + return self.attachment_url(ticket_id, attachment_id) diff --git a/tests/test_rtutil.py b/tests/test_rtutil.py new file mode 100644 index 0000000000000000000000000000000000000000..accb7af4c13267f46e71bad5890516b7b03c3e2d --- /dev/null +++ b/tests/test_rtutil.py @@ -0,0 +1,99 @@ +"""Test RT integration""" +# Copyright © 2020 Brett Smith +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +import pytest + +from . import testutil + +from conservancy_beancount import rtutil + +DEFAULT_RT_URL = testutil.RTClient.DEFAULT_URL[:-9] + +EXPECTED_URLS = [ + (1, None, 'Ticket/Display.html?id=1'), + (1, 2, 'Ticket/Display.html?id=1#txn-1'), + (1, 4, 'Ticket/Attachment/1/4/Forwarded%20Message.eml'), + (1, 99, None), + (2, 1, None), + (2, 10, 'Ticket/Attachment/7/10/screenshot.png'), + (2, 13, 'Ticket/Display.html?id=2#txn-11'), + (2, 14, 'Ticket/Display.html?id=2#txn-11'), # statement.txt + (3, None, 'Ticket/Display.html?id=3'), + (9, None, None), +] + +@pytest.fixture +def rt(): + client = testutil.RTClient() + return rtutil.RT(client) + +@pytest.fixture +def new_client(): + class RTClient(testutil.RTClient): + TICKET_DATA = {'1': [], '2': []} + return RTClient() + +@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS) +def test_url(rt, ticket_id, attachment_id, expected): + if expected is not None: + expected = DEFAULT_RT_URL + expected + assert rt.url(ticket_id, attachment_id) == expected + +@pytest.mark.parametrize('attachment_id', [ + 3, + None, +]) +def test_url_caches(new_client, attachment_id): + new_client.TICKET_DATA['1'].append(('3', '(Unnamed)', 'text/plain', '3.0k')) + if attachment_id is None: + fragment = '' + else: + fragment = '#txn-3' + expected = '{}Ticket/Display.html?id=1{}'.format(DEFAULT_RT_URL, fragment) + rt = rtutil.RT(new_client) + assert rt.url(1, attachment_id) == expected + new_client.TICKET_DATA.clear() + assert rt.url(1, attachment_id) == expected + +@pytest.mark.parametrize('mimetype,extension', [ + ('application/pdf', 'pdf'), + ('image/png', 'png'), + ('message/rfc822', 'eml'), + ('x-test/x-unknown', 'bin'), +]) +def test_url_default_filename(new_client, mimetype, extension): + new_client.TICKET_DATA['1'].append(('9', '(Unnamed)', mimetype, '50.5k')) + rt = rtutil.RT(new_client) + expected = '{}Ticket/Attachment/9/9/RT1%20attachment%209.{}'.format(DEFAULT_RT_URL, extension) + assert rt.url(1, 9) == expected + +@pytest.mark.parametrize('ticket_id,attachment_id,expected', EXPECTED_URLS) +def test_exists(rt, ticket_id, attachment_id, expected): + expected = False if expected is None else True + assert rt.exists(ticket_id, attachment_id) is expected + +def test_exists_caches(new_client): + new_client.TICKET_DATA['1'].append(('3', '(Unnamed)', 'text/plain', '3.0k')) + rt = rtutil.RT(new_client) + assert rt.exists(1, 3) + assert rt.exists(2) + assert not rt.exists(1, 9) + assert not rt.exists(9) + new_client.TICKET_DATA.clear() + assert rt.exists(1, 3) + assert rt.exists(2) + assert not rt.exists(1, 9) + assert not rt.exists(9) diff --git a/tests/testutil.py b/tests/testutil.py index bb30ffd69af77c206da44de6c718fbb251eb244f..c85e901a15dcfab00b5d68230ff78b15b43a48c6 100644 --- a/tests/testutil.py +++ b/tests/testutil.py @@ -15,6 +15,7 @@ # along with this program. If not, see . import datetime +import itertools import beancount.core.amount as bc_amount import beancount.core.data as bc_data @@ -114,9 +115,54 @@ class TestConfig: return self.repo_path +class _TicketBuilder: + MESSAGE_ATTACHMENTS = [ + ('(Unnamed)', 'multipart/alternative', '0b'), + ('(Unnamed)', 'text/plain', '1.2k'), + ('(Unnamed)', 'text/html', '1.4k'), + ] + MISC_ATTACHMENTS = [ + ('Forwarded Message.eml', 'message/rfc822', '3.1k'), + ('photo.jpg', 'image/jpeg', '65.2k'), + ('document.pdf', 'application/pdf', '326k'), + ('screenshot.png', 'image/png', '1.9m'), + ('statement.txt', 'text/plain', '652b'), + ] + + def __init__(self): + self.id_seq = itertools.count(1) + self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS) + + def new_attch(self, attch): + return (str(next(self.id_seq)), *attch) + + def new_msg_with_attachments(self, attachments_count=1): + for attch in self.MESSAGE_ATTACHMENTS: + yield self.new_attch(attch) + for _ in range(attachments_count): + yield self.new_attch(next(self.misc_attchs)) + + def new_messages(self, messages_count, attachments_count=None): + for n in range(messages_count): + if attachments_count is None: + att_count = messages_count - n + else: + att_count = attachments_count + yield from self.new_msg_with_attachments(att_count) + + class RTClient: + _builder = _TicketBuilder() + DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/' + TICKET_DATA = { + '1': list(_builder.new_messages(1, 3)), + '2': list(_builder.new_messages(2, 1)), + '3': list(_builder.new_messages(3, 0)), + } + del _builder + def __init__(self, - url, + url=DEFAULT_URL, default_login=None, default_password=None, proxy=None, @@ -145,3 +191,44 @@ class RTClient: self.login_result = bool(login and password and not password.startswith('bad')) self.last_login = (login, password, self.login_result) return self.login_result + + def get_attachments(self, ticket_id): + try: + return list(self.TICKET_DATA[str(ticket_id)]) + except KeyError: + return None + + def get_attachment(self, ticket_id, attachment_id): + try: + att_seq = iter(self.TICKET_DATA[str(ticket_id)]) + except KeyError: + None + att_id = str(attachment_id) + multipart_id = None + for attch in att_seq: + if attch[0] == att_id: + break + elif attch[2].startswith('multipart/'): + multipart_id = attch[0] + else: + return None + tx_id = multipart_id or att_id + if attch[1] == '(Unnamed)': + filename = '' + else: + filename = attch[1] + return { + 'id': att_id, + 'ContentType': attch[2], + 'Filename': filename, + 'Transaction': tx_id, + } + + def get_ticket(self, ticket_id): + ticket_id_s = str(ticket_id) + if ticket_id_s not in self.TICKET_DATA: + return None + return { + 'id': 'ticket/{}'.format(ticket_id_s), + 'numerical_id': ticket_id_s, + }