Files @ 6deaacb11bdd
Branch filter:

Location: NPO-Accounting/conservancy_beancount/conservancy_beancount/rtutil.py

bkuhn
Add US:TN:Unemployment as a valid `payroll-type` metadata for taxes
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
"""RT client utilities"""
# Copyright © 2020  Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.

import datetime
import functools
import logging
import mimetypes
import os
import re
import sqlite3
import urllib.parse as urlparse

import dateutil.parser
import rt

from pathlib import Path

from . import data
from beancount.core import data as bc_data

from typing import (
    cast,
    overload,
    Callable,
    Iterable,
    Iterator,
    MutableMapping,
    Optional,
    Set,
    Tuple,
    Union,
)
from .beancount_types import (
    Transaction,
)

# An RT ticket or attachment identifier, given as either an int or a str.
RTId = Union[int, str]
# Cache key: a (ticket id, attachment id) pair. The attachment id is None when
# the key refers to the ticket itself rather than one of its attachments.
TicketAttachmentIds = Tuple[str, Optional[str]]
# Mapping from cache keys to web URLs. A None value records that the lookup
# found no URL for that key.
_LinkCache = MutableMapping[TicketAttachmentIds, Optional[str]]
# Signature of the URL lookup methods that RT._cache_method wraps.
_URLLookup = Callable[..., Optional[str]]

class RTDateTime(datetime.datetime):
    """Build datetime objects out of the date strings RT hands back

    A typical call site looks like::

        ticket = rt_client.get_ticket(...)
        created = RTDateTime(ticket.get('Created'))

    An empty value or the literal string ``Not set`` yields
    ``datetime.datetime.min``. Any other string is handed to
    ``dateutil.parser.parse``.
    """
    # A plain function would usually be enough for this job, but a dedicated
    # class lets query-report use one name for two purposes: parsing the data
    # that comes from RT, and deciding how to format it for output.
    # The RT REST API reports datetimes in the user's configured timezone, and
    # no API call seems to say which timezone that is, so these objects have
    # to stay timezone-naive.
    def __new__(cls, source: str) -> 'RTDateTime':
        if source and source != 'Not set':
            parsed = dateutil.parser.parse(source)
        else:
            parsed = datetime.datetime.min
        # cast() only informs the type checker; the value is returned as-is.
        return cast(RTDateTime, parsed)


class RTLinkCache(_LinkCache):
    """Cache RT links to disk

    This class provides a dict-like interface to a cache of RT links.
    Once an object is in RT, a link to it should never change.
    The only exception is when objects get shredded, and those objects
    shouldn't be referenced in books anyway.

    This implementation is backed by a sqlite database. You can call::

        db = RTLinkCache.setup(path)

    This method will try to open a sqlite database at the given path,
    and set up necessary tables, etc.
    If it succeeds, it returns a database connection you can use to
    initialize the cache.
    If it fails, it returns None, and the caller should use some other
    dict-like object (like a normal dict) for caching.
    You can give the result to the RT utility class either way,
    and it will do the right thing for itself::

        rt = RT(rt_client, db)

    Keys are ``(ticket_id, attachment_id)`` tuples, where ``attachment_id``
    is None for a link to the ticket itself. Values are URL strings, or None
    to record that no URL was found. None values are only remembered in
    memory by this object; they are never written to the database.
    """

    CREATE_TABLE_SQL = """CREATE TABLE IF NOT EXISTS RTLinkCache(
 ticket_id TEXT NOT NULL,
 attachment_id TEXT,
 url TEXT NOT NULL,
 PRIMARY KEY (ticket_id, attachment_id)
)"""
    logger = logging.getLogger('conservancy_beancount.rtutil.RTLinkCache')

    @classmethod
    def setup(cls, cache_path: Path) -> Optional[sqlite3.Connection]:
        """Open (and if needed initialize) the cache database at cache_path

        Returns a connection suitable for passing to the constructor, or
        None if sqlite can provide no benefit:

        * If the database can be written, the returned connection points
          at the file itself.
        * If it can only be read and already has data, the data is copied
          into a writable in-memory database, and that connection is
          returned instead.
        * Otherwise the return value is None.

        Any connection opened along the way is closed before returning None.
        """
        try:
            db = sqlite3.connect(os.fspath(cache_path), isolation_level=None)
        except sqlite3.OperationalError:
            # If we couldn't get this far, sqlite provides no benefit.
            cls.logger.debug("setup: error loading %s", cache_path, exc_info=True)
            return None
        try:
            cursor = db.cursor()
            cursor.execute(cls.CREATE_TABLE_SQL)
            cursor.execute('SELECT url FROM RTLinkCache LIMIT 1')
            have_data = cursor.fetchone() is not None
        except sqlite3.OperationalError:
            # If we couldn't get this far, sqlite provides no benefit.
            cls.logger.debug("setup: error loading %s", cache_path, exc_info=True)
            # Don't leak the connection we're about to abandon.
            db.close()
            return None
        try:
            # There shouldn't be any records where url is NULL, so running this
            # DELETE pulls double duty for us: it tells us whether or not we
            # can write to the database and it enforces database integrity.
            cursor.execute('DELETE FROM RTLinkCache WHERE url IS NULL')
        except sqlite3.OperationalError:
            cls.logger.debug("setup: error writing %s", cache_path, exc_info=True)
            can_write = False
        else:
            can_write = True
        if not (can_write or have_data):
            # If there's nothing to read and no way to write, sqlite provides
            # no benefit.
            cls.logger.debug("setup: not using %s: nothing to read or write", cache_path)
            db.close()
            return None
        elif not can_write:
            # Set up an in-memory database that we can write to, seeded with
            # the data available to read.
            try:
                cursor.close()
                db.close()
                db = sqlite3.connect(':memory:', isolation_level=None)
                cursor = db.cursor()
                # It would be better to use
                #   '{}?mode=ro'.format(cache_path.as_uri())
                # as the argument here, but that doesn't work on SUSE 15,
                # possibly because its sqlite doesn't recognize query
                # arguments (added to upstream sqlite in late 2016).
                cursor.execute('ATTACH DATABASE ? AS readsource',
                               (os.fspath(cache_path),))
                cursor.execute(cls.CREATE_TABLE_SQL)
                cursor.execute('INSERT INTO RTLinkCache SELECT * FROM readsource.RTLinkCache')
                cursor.execute('DETACH DATABASE readsource')
            except sqlite3.OperationalError:
                # We're back to the case of having nothing to read and no way
                # to write.
                cls.logger.debug("setup: error loading %s into memory", cache_path, exc_info=True)
                # db is whichever connection was opened last; closing a
                # connection that is already closed is harmless.
                db.close()
                return None
            else:
                cls.logger.debug("setup: loaded %s into memory", cache_path)
        else:
            cls.logger.debug("setup: caching at %s", cache_path)
        cursor.close()
        db.commit()
        return db

    def __init__(self, cache_db: sqlite3.Connection) -> None:
        """Wrap a database connection, typically one returned by setup()"""
        self._db = cache_db
        # Keys known to have no URL. These live only in memory because the
        # table's url column is NOT NULL.
        self._nourls: Set[TicketAttachmentIds] = set()

    def __iter__(self) -> Iterator[TicketAttachmentIds]:
        """Yield every key stored in the database, then every no-URL key"""
        yield from self._db.execute('SELECT ticket_id, attachment_id FROM RTLinkCache')
        yield from self._nourls

    def __len__(self) -> int:
        """Return the number of database rows plus the number of no-URL keys"""
        cursor = self._db.execute('SELECT COUNT(*) FROM RTLinkCache')
        count: int = cursor.fetchone()[0]
        return count + len(self._nourls)

    def __getitem__(self, key: TicketAttachmentIds) -> Optional[str]:
        """Return the cached URL for key, or None if key is known to have none

        Raises KeyError if nothing has been cached for key.
        """
        if key in self._nourls:
            return None
        # `IS ?` rather than `= ?` so that a None attachment_id matches rows
        # where the column is NULL.
        cursor = self._db.execute(
            'SELECT url FROM RTLinkCache WHERE ticket_id = ? AND attachment_id IS ?',
            key,
        )
        row = cursor.fetchone()
        if row is None:
            raise KeyError(key)
        else:
            retval: str = row[0]
            return retval

    def __setitem__(self, key: TicketAttachmentIds, value: Optional[str]) -> None:
        """Cache value as the URL for key

        A None value is remembered in memory only. A string value is
        inserted into the database. If the insert hits an integrity error,
        it is tolerated when the cached URL equals value, and re-raised
        otherwise.
        """
        if value is None:
            self._nourls.add(key)
        else:
            # A real URL supersedes any earlier "no URL" record. Without
            # this, __getitem__ would keep returning None for the key.
            self._nourls.discard(key)
            ticket_id, attachment_id = key
            try:
                self._db.execute(
                    'INSERT INTO RTLinkCache VALUES(?, ?, ?)',
                    (ticket_id, attachment_id, value),
                )
            except sqlite3.IntegrityError:
                # Another instance might've inserted the URL before we did.
                # That's fine as long as it's the same URL, which it should be.
                if value != self.get(key):
                    raise

    def __delitem__(self, key: TicketAttachmentIds) -> None:
        """Deleting cached links is not supported; always raises"""
        raise NotImplementedError("RTLinkCache.__delitem__")


class RT:
    """Helper for working with RT links found in Beancount metadata

    Wrapping an RT client object, this class offers the common operations
    needed when handling RT links:

    * Parse links
    * Check that they point at objects that actually exist in RT
    * Turn metadata links into RT web links
    * Cache results, to cut down on network requests.
      An RTLinkCache can persist links to disk across multiple runs.
      See RTLinkCache's docstring for details and instructions.
    """

    PARSE_REGEXPS = [
        re.compile(r'^rt:([0-9]+)(?:/([0-9]+))?/?$'),
        re.compile(r'^rt://ticket/([0-9]+)(?:/attachments?/([0-9]+))?/?$'),
    ]

    def __init__(self, rt_client: rt.Rt, cache_db: Optional[sqlite3.Connection]=None) -> None:
        """Set up URL building and caching for the given RT client

        The web base URL is the client's URL with everything from the last
        ``/REST/`` path component onward removed. With no ``cache_db``,
        results are cached in a plain dict; otherwise they are cached in an
        RTLinkCache built on that connection.
        """
        parsed_url = urlparse.urlparse(rt_client.url)
        rest_start = parsed_url.path.rfind('/REST/')
        if rest_start < 0:
            # No REST component: just normalize to one trailing slash.
            base_path = parsed_url.path.rstrip('/') + '/'
        else:
            # Keep the slash that introduces 'REST/'.
            base_path = parsed_url.path[:rest_start + 1]
        self.url_base = parsed_url._replace(path=base_path)
        self.rt = rt_client
        self._cache: _LinkCache
        if cache_db is not None:
            self._cache = RTLinkCache(cache_db)
        else:
            self._cache = {}

    # mypy complains that the first argument isn't self, but this isn't meant
    # to be a method, it's just an internal decorator.
    def _cache_method(func: _URLLookup) -> _URLLookup:  # type:ignore[misc]
        @functools.wraps(func)
        def caching_wrapper(self: 'RT',
                            ticket_id: RTId,
                            attachment_id: Optional[RTId]=None,
        ) -> Optional[str]:
            ticket_key = str(ticket_id)
            attachment_key = None if attachment_id is None else str(attachment_id)
            cache_key = (ticket_key, attachment_key)
            try:
                return self._cache[cache_key]
            except KeyError:
                # Only pass the attachment along when there is one, so this
                # decorator can wrap lookups that take just a ticket id.
                if attachment_id is None:
                    lookup_ids: Tuple[RTId, ...] = (ticket_id,)
                else:
                    lookup_ids = (ticket_id, attachment_id)
                url: Optional[str] = func(self, *lookup_ids)
                self._cache[cache_key] = url
                return url
        return caching_wrapper

    def _extend_url(self,
                    path_tail: str,
                    fragment: Optional[str]=None,
                    **query: str,
    ) -> str:
        """Return a web URL under the base URL

        ``path_tail`` is quoted and appended to the base path. Keyword
        arguments, if any, become the query string; a non-None ``fragment``
        is quoted and used as the fragment. Whichever of those is not given
        falls back to the corresponding part of the base URL.
        """
        base = self.url_base
        extended = base._replace(
            path=base.path + urlparse.quote(path_tail),
            query=urlparse.urlencode(query) if query else base.query,
            fragment=base.fragment if fragment is None else urlparse.quote(fragment),
        )
        return urlparse.urlunparse(extended)

    def _ticket_url(self, ticket_id: RTId, txn_id: Optional[RTId]=None) -> str:
        """Return the display URL for a ticket, optionally anchored at a transaction"""
        fragment = None if txn_id is None else 'txn-{}'.format(txn_id)
        return self._extend_url('Ticket/Display.html', fragment, id=str(ticket_id))

    @_cache_method
    def attachment_url(self, ticket_id: RTId, attachment_id: RTId) -> Optional[str]:
        """Return a web URL for an attachment, or None if RT has no such attachment

        Attachments whose content type starts with ``text/`` link to their
        transaction on the ticket display page. Everything else links to
        the attachment itself, with a made-up filename if RT reports none.
        """
        attachment = self.rt.get_attachment(ticket_id, attachment_id)
        if attachment is None:
            return None
        mimetype = attachment.get('ContentType', '')
        if mimetype.startswith('text/'):
            return self._ticket_url(ticket_id, attachment['Transaction'])
        filename = attachment.get('Filename', '') or 'RT{} attachment {}{}'.format(
            ticket_id,
            attachment_id,
            mimetypes.guess_extension(mimetype) or '.bin',
        )
        path_tail = 'Ticket/Attachment/{0[Transaction]}/{0[id]}/{1}'.format(
            attachment,
            filename,
        )
        return self._extend_url(path_tail)

    def exists(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> bool:
        """Return True if a URL can be found for the ticket or attachment"""
        found_url = self.url(ticket_id, attachment_id)
        return found_url is not None

    def iter_urls(self,
                  links: Iterable[str],
                  rt_fmt: str='{}',
                  nonrt_fmt: str='{}',
                  missing_fmt: str='{}',
    ) -> Iterator[str]:
        """Iterate over metadata links, replacing RT references with web URLs

        This method walks metadata link strings (e.g., from
        Metadata.get_links()) and transforms each for web presentation:

        * A valid RT reference yields its web URL formatted with
          ``rt_fmt``.

        * A well-formed RT reference to an object that doesn't exist
          yields the original link formatted with ``missing_fmt``.

        * Any other link string is formatted with ``nonrt_fmt``.

        """
        for link in links:
            rt_ids = self.parse(link)
            if rt_ids is None:
                yield nonrt_fmt.format(link)
                continue
            url = self.url(*rt_ids)
            if url is None:
                yield missing_fmt.format(link)
            else:
                yield rt_fmt.format(url)

    @classmethod
    def metadata_regexp(cls,
                        ticket_id: RTId,
                        attachment_id: Optional[RTId]=None,
                        *,
                        first_link_only: bool=False
    ) -> str:
        """Return a pattern to find RT links in metadata

        Given a ticket ID and optional attachment ID, this method builds a
        regular expression pattern that finds matching RT links in a
        metadata value string, whichever format they were written in.

        When the keyword-only argument first_link_only is true, the pattern
        only matches the first link in a metadata string. By default it
        matches any link in the string.
        """
        prolog = r'^\s*' if first_link_only else r'(?:^|\s)'
        ticket = r'rt:(?://ticket/)?{}'.format(ticket_id)
        if attachment_id is not None:
            attachment = r'/(?:attachments?/)?{}'.format(attachment_id)
        else:
            attachment = ''
        epilog = r'/?(?:$|\s)'
        return prolog + ticket + attachment + epilog

    @classmethod
    def parse(cls, s: str) -> Optional[Tuple[str, Optional[str]]]:
        """Split an RT link string into (ticket id, attachment id)

        The attachment id is None when the link names only a ticket.
        Returns None when ``s`` matches none of PARSE_REGEXPS.
        """
        attempts = (pattern.match(s) for pattern in cls.PARSE_REGEXPS)
        found = next((match for match in attempts if match is not None), None)
        if found is None:
            return None
        return (found.group(1), found.group(2))

    @_cache_method
    def ticket_url(self, ticket_id: RTId) -> Optional[str]:
        """Return the web URL for a ticket, or None if RT has no such ticket"""
        ticket = self.rt.get_ticket(ticket_id)
        return None if ticket is None else self._ticket_url(ticket_id)

    @overload
    def _meta_with_urls(self,
                        meta: None,
                        rt_fmt: str,
                        nonrt_fmt: str,
                        missing_fmt: str,
    ) -> None: ...

    @overload
    def _meta_with_urls(self,
                        meta: bc_data.Meta,
                        rt_fmt: str,
                        nonrt_fmt: str,
                        missing_fmt: str,
    ) -> bc_data.Meta: ...

    def _meta_with_urls(self,
                        meta: Optional[bc_data.Meta],
                        rt_fmt: str,
                        nonrt_fmt: str,
                        missing_fmt: str,
    ) -> Optional[bc_data.Meta]:
        """Return a copy of meta with link metadata run through iter_urls()

        None passes through unchanged. Keys whose value can't be read as
        links (get_links() raises TypeError) or that have no links are
        left as they were.
        """
        if meta is None:
            return None
        link_meta = data.Metadata(meta)
        converted = meta.copy()
        for key in data.LINK_METADATA:
            try:
                links = link_meta.get_links(key)
            except TypeError:
                continue
            if not links:
                continue
            url_iter = self.iter_urls(links, rt_fmt, nonrt_fmt, missing_fmt)
            converted[key] = ' '.join(url_iter)
        return converted

    def txn_with_urls(self, txn: Transaction,
                      rt_fmt: str='<{}>',
                      nonrt_fmt: str='{}',
                      missing_fmt: str='{}',
    ) -> Transaction:
        """Copy a transaction with RT references replaced with web URLs

        The returned Transaction is identical to the given one, except that
        RT references in the metadata of the Transaction and of its
        Postings are replaced with web URLs. Reporting tools can use this
        to show URLs that other tools recognize.

        The format string arguments mean the same thing as in
        RT.iter_urls(). See that docstring for details.
        """
        fmt_args = (rt_fmt, nonrt_fmt, missing_fmt)
        # Convert the transaction's own metadata before its postings'.
        txn_meta = self._meta_with_urls(txn.meta, *fmt_args)
        new_postings = [
            post._replace(meta=self._meta_with_urls(post.meta, *fmt_args))
            for post in txn.postings
        ]
        # mypy doesn't recognize that postings is a valid argument, probably a
        # bug in the NamedTuple→Directive→Transaction hierarchy.
        return txn._replace(  # type:ignore[call-arg]
            meta=txn_meta,
            postings=new_postings,
        )

    @classmethod
    def unparse(cls, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> str:
        """Return a metadata link string for the given ticket+attachment id"""
        if attachment_id is not None:
            return f'rt:{ticket_id}/{attachment_id}'
        return f'rt:{ticket_id}'

    def url(self, ticket_id: RTId, attachment_id: Optional[RTId]=None) -> Optional[str]:
        """Return the web URL for a ticket or attachment, or None if not found"""
        if attachment_id is not None:
            return self.attachment_url(ticket_id, attachment_id)
        return self.ticket_url(ticket_id)