Files @ 32b62df5405f
Branch filter:

Location: NPO-Accounting/conservancy_beancount/conservancy_beancount/reports/accrual.py

Brett Smith
cliutil: Better implementation of is_main_script.

The old one could return True if you called accrual.main()
directly from one-off test scripts.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
#!/usr/bin/env python3
"""accrual-report - Status reports for accruals

accrual-report checks accruals (postings under Assets:Receivable and
Liabilities:Payable) for errors and metadata consistency, and reports any
problems on stderr. Then it writes a report about the status of those
accruals on stdout.

The typical way to run it is to pass an RT ticket number or invoice link as an
argument::

    # Report all accruals associated with RT#1230:
    accrual-report 1230
    # Report all accruals with the invoice link rt:45/670.
    accrual-report 45/670
    # Report all accruals with the invoice link Invoice980.pdf.
    accrual-report Invoice980.pdf

By default, to stay fast, accrual-report only looks for postings from the
beginning of the last fiscal year. You can search further back in history
by passing the ``--since`` argument. The argument can be a fiscal year, or
a negative number of how many years back to search::

    # Search for accruals since 2016
    accrual-report --since 2016 [search terms …]
    # Search for accruals from the beginning of three fiscal years ago
    accrual-report --since -3 [search terms …]

If you want to further limit what accruals are reported, you can match on
other metadata by passing additional arguments in ``name=value`` format.
You can pass any number of search terms. For example::

    # Report accruals associated with RT#1230 and Jane Doe
    accrual-report 1230 entity=Doe-Jane

accrual-report will automatically decide what kind of report to generate from
the results of your search. If the search matches a single outstanding payable,
it will write an outgoing approval report; otherwise, it writes a basic balance
report. You can request a specific report type with the ``--report-type``
option::

    # Write an outgoing approval report for all outstanding accruals for
    # Jane Doe, even if there's more than one
    accrual-report --report-type outgoing entity=Doe-Jane
"""
# Copyright © 2020  Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import argparse
import collections
import datetime
import enum
import logging
import re
import sys

from typing import (
    Callable,
    Dict,
    Iterable,
    Iterator,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Set,
    TextIO,
    Tuple,
)
from ..beancount_types import (
    Error,
    MetaKey,
    MetaValue,
    Transaction,
)

import rt

from beancount.parser import printer as bc_printer

from . import core
from .. import cliutil
from .. import config as configmod
from .. import data
from .. import filters
from .. import rtutil

PROGNAME = 'accrual-report'

PostGroups = Mapping[Optional[MetaValue], core.RelatedPostings]
RTObject = Mapping[str, str]

logger = logging.getLogger('conservancy_beancount.reports.accrual')

class Account(NamedTuple):
    name: str
    balance_paid: Callable[[core.Balance], bool]


class AccrualAccount(enum.Enum):
    PAYABLE = Account('Liabilities:Payable', core.Balance.ge_zero)
    RECEIVABLE = Account('Assets:Receivable', core.Balance.le_zero)

    @classmethod
    def account_names(cls) -> Iterator[str]:
        return (acct.value.name for acct in cls)

    @classmethod
    def classify(cls, related: core.RelatedPostings) -> 'AccrualAccount':
        for account in cls:
            account_name = account.value.name
            if all(post.account.is_under(account_name) for post in related):
                return account
        raise ValueError("unrecognized account set in related postings")

    @classmethod
    def filter_paid_accruals(cls, groups: PostGroups) -> PostGroups:
        return {
            key: related
            for key, related in groups.items()
            if not cls.classify(related).value.balance_paid(related.balance())
        }


class BaseReport:
    def __init__(self, out_file: TextIO) -> None:
        self.out_file = out_file
        self.logger = logger.getChild(type(self).__name__)

    def _since_last_nonzero(self, posts: core.RelatedPostings) -> core.RelatedPostings:
        retval = core.RelatedPostings()
        for post in posts:
            if retval.balance().is_zero():
                retval.clear()
            retval.add(post)
        return retval

    def _report(self,
                invoice: str,
                posts: core.RelatedPostings,
                index: int,
    ) -> Iterable[str]:
        raise NotImplementedError("BaseReport._report")

    def run(self, groups: PostGroups) -> None:
        for index, invoice in enumerate(groups):
            for line in self._report(str(invoice), groups[invoice], index):
                print(line, file=self.out_file)


class BalanceReport(BaseReport):
    def _report(self,
                invoice: str,
                posts: core.RelatedPostings,
                index: int,
    ) -> Iterable[str]:
        posts = self._since_last_nonzero(posts)
        balance = posts.balance()
        date_s = posts[0].meta.date.strftime('%Y-%m-%d')
        if index:
            yield ""
        yield f"{invoice}:"
        yield f"  {balance} outstanding since {date_s}"


class OutgoingReport(BaseReport):
    def __init__(self, rt_client: rt.Rt, out_file: TextIO) -> None:
        super().__init__(out_file)
        self.rt_client = rt_client
        self.rt_wrapper = rtutil.RT(rt_client)

    def _primary_rt_id(self, posts: core.RelatedPostings) -> rtutil.TicketAttachmentIds:
        rt_ids = posts.all_meta_links('rt-id')
        rt_ids_count = len(rt_ids)
        if rt_ids_count != 1:
            raise ValueError(f"{rt_ids_count} rt-id links found")
        parsed = rtutil.RT.parse(rt_ids.pop())
        if parsed is None:
            raise ValueError("rt-id is not a valid RT reference")
        else:
            return parsed

    def _report(self,
                invoice: str,
                posts: core.RelatedPostings,
                index: int,
    ) -> Iterable[str]:
        posts = self._since_last_nonzero(posts)
        try:
            ticket_id, _ = self._primary_rt_id(posts)
            ticket = self.rt_client.get_ticket(ticket_id)
            # Note we only use this when ticket is None.
            errmsg = f"ticket {ticket_id} not found"
        except (ValueError, rt.RtError) as error:
            ticket = None
            errmsg = error.args[0]
        if ticket is None:
            self.logger.error(
                "can't generate outgoings report for %s because no RT ticket available: %s",
                invoice, errmsg,
            )
            return

        try:
            rt_requestor = self.rt_client.get_user(ticket['Requestors'][0])
        except (IndexError, rt.RtError):
            rt_requestor = None
        if rt_requestor is None:
            requestor = ''
            requestor_name = ''
        else:
            requestor_name = (
                rt_requestor.get('RealName')
                or ticket.get('CF.{payment-to}')
                or ''
            )
            requestor = f'{requestor_name} <{rt_requestor["EmailAddress"]}>'.strip()

        raw_balance = -posts.balance()
        cost_balance = -posts.balance_at_cost()
        cost_balance_s = cost_balance.format(None)
        if raw_balance == cost_balance:
            balance_s = cost_balance_s
        else:
            balance_s = f'{raw_balance} ({cost_balance_s})'

        contract_links = posts.all_meta_links('contract')
        if contract_links:
            contract_s = ' , '.join(self.rt_wrapper.iter_urls(
                contract_links, missing_fmt='<BROKEN RT LINK: {}>',
            ))
        else:
            contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
        projects = [v for v in posts.meta_values('project')
                    if isinstance(v, str)]

        yield "PAYMENT FOR APPROVAL:"
        yield f"REQUESTOR: {requestor}"
        yield f"TOTAL TO PAY: {balance_s}"
        yield f"AGREEMENT: {contract_s}"
        yield f"PAYMENT TO: {ticket.get('CF.{payment-to}') or requestor_name}"
        yield f"PAYMENT METHOD: {ticket.get('CF.{payment-method}', '')}"
        yield f"PROJECT: {', '.join(projects)}"
        yield "\nBEANCOUNT ENTRIES:\n"

        last_txn: Optional[Transaction] = None
        for post in posts:
            txn = post.meta.txn
            if txn is not last_txn:
                last_txn = txn
                txn = self.rt_wrapper.txn_with_urls(txn, '{}')
                yield bc_printer.format_entry(txn)


class ReportType(enum.Enum):
    BALANCE = BalanceReport
    OUTGOING = OutgoingReport
    BAL = BALANCE
    OUT = OUTGOING
    OUTGOINGS = OUTGOING

    @classmethod
    def by_name(cls, name: str) -> 'ReportType':
        try:
            return cls[name.upper()]
        except KeyError:
            raise ValueError(f"unknown report type {name!r}") from None

    @classmethod
    def default_for(cls, groups: PostGroups) -> 'ReportType':
        if len(groups) == 1 and all(
                AccrualAccount.classify(group) is AccrualAccount.PAYABLE
                and not AccrualAccount.PAYABLE.value.balance_paid(group.balance())
                for group in groups.values()
        ):
            return cls.OUTGOING
        else:
            return cls.BALANCE


class ReturnFlag(enum.IntFlag):
    LOAD_ERRORS = 1
    CONSISTENCY_ERRORS = 2
    REPORT_ERRORS = 4
    NOTHING_TO_REPORT = 8


class SearchTerm(NamedTuple):
    meta_key: MetaKey
    pattern: str

    @classmethod
    def parse(cls, s: str) -> 'SearchTerm':
        key_match = re.match(r'^[a-z][-\w]*=', s)
        key: Optional[str]
        if key_match:
            key, _, raw_link = s.partition('=')
        else:
            key = None
            raw_link = s
        rt_ids = rtutil.RT.parse(raw_link)
        if rt_ids is None:
            rt_ids = rtutil.RT.parse('rt:' + raw_link)
        if rt_ids is None:
            if key is None:
                key = 'invoice'
            pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link))
        else:
            ticket_id, attachment_id = rt_ids
            if key is None:
                key = 'rt-id' if attachment_id is None else 'invoice'
            pattern = rtutil.RT.metadata_regexp(
                ticket_id,
                attachment_id,
                first_link_only=key == 'rt-id' and attachment_id is None,
            )
        return cls(key, pattern)


def consistency_check(groups: PostGroups) -> Iterable[Error]:
    for key, related in groups.items():
        for checked_meta in ['contract', 'entity', 'purchase-order']:
            meta_values = related.meta_values(checked_meta)
            if len(meta_values) != 1:
                errmsg = f'inconsistent {checked_meta} for invoice {key}'
                for post in related:
                    yield Error(
                        post.meta,
                        f'{errmsg}: {post.meta.get(checked_meta)}',
                        post.meta.txn,
                    )

def filter_search(postings: Iterable[data.Posting],
                  search_terms: Iterable[SearchTerm],
) -> Iterable[data.Posting]:
    accounts = tuple(AccrualAccount.account_names())
    postings = (post for post in postings if post.account.is_under(*accounts))
    for meta_key, pattern in search_terms:
        postings = filters.filter_meta_match(postings, meta_key, re.compile(pattern))
    return postings

def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(prog=PROGNAME)
    cliutil.add_version_argument(parser)
    parser.add_argument(
        '--report-type', '-t',
        metavar='NAME',
        type=ReportType.by_name,
        help="""The type of report to generate, either `balance` or `outgoing`.
If not specified, the default is `outgoing` for search criteria that return a
single outstanding payable, and `balance` any other time.
""")
    parser.add_argument(
        '--since',
        metavar='YEAR',
        type=int,
        default=-1,
        help="""How far back to search the books for related transactions.
You can either specify a fiscal year, or a negative offset from the current
fiscal year, to start loading entries from. The default is -1 (start from the
previous fiscal year).
""")
    cliutil.add_loglevel_argument(parser)
    parser.add_argument(
        'search',
        nargs=argparse.ZERO_OR_MORE,
        help="""Report on accruals that match this criteria. The format is
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
metadata to match. A single ticket number is a shortcut for
`rt-id=rt:NUMBER`. Any other link, including an RT attachment link in
`TIK/ATT` format, is a shortcut for `invoice=LINK`.
""")
    args = parser.parse_args(arglist)
    args.search_terms = [SearchTerm.parse(s) for s in args.search]
    return args

def main(arglist: Optional[Sequence[str]]=None,
         stdout: TextIO=sys.stdout,
         stderr: TextIO=sys.stderr,
         config: Optional[configmod.Config]=None,
) -> int:
    if cliutil.is_main_script(PROGNAME):
        global logger
        logger = logging.getLogger(PROGNAME)
        sys.excepthook = cliutil.ExceptHook(logger)
    args = parse_arguments(arglist)
    cliutil.setup_logger(logger, args.loglevel, stderr)
    if config is None:
        config = configmod.Config()
        config.load_file()
    books_loader = config.books_loader()
    if books_loader is not None:
        entries, load_errors, _ = books_loader.load_fy_range(args.since)
    else:
        entries = []
        source = {
            'filename': str(config.config_file_path()),
            'lineno': 1,
        }
        load_errors = [Error(source, "no books to load in configuration", None)]
    postings = filter_search(data.Posting.from_entries(entries), args.search_terms)
    groups = core.RelatedPostings.group_by_meta(postings, 'invoice')
    groups = AccrualAccount.filter_paid_accruals(groups) or groups
    meta_errors = consistency_check(groups)
    returncode = 0
    for error in load_errors:
        bc_printer.print_error(error, file=stderr)
        returncode |= ReturnFlag.LOAD_ERRORS
    for error in meta_errors:
        bc_printer.print_error(error, file=stderr)
        returncode |= ReturnFlag.CONSISTENCY_ERRORS
    if args.report_type is None:
        args.report_type = ReportType.default_for(groups)
    if not groups:
        logger.warning("no matching entries found to report")
        returncode |= ReturnFlag.NOTHING_TO_REPORT
    report: Optional[BaseReport] = None
    if args.report_type is ReportType.OUTGOING:
        rt_client = config.rt_client()
        if rt_client is None:
            logger.error("unable to generate outgoing report: RT client is required")
        else:
            report = OutgoingReport(rt_client, stdout)
    else:
        report = args.report_type.value(stdout)
    if report is None:
        returncode |= ReturnFlag.REPORT_ERRORS
    else:
        report.run(groups)
    return 0 if returncode == 0 else 16 + returncode

if __name__ == '__main__':
    exit(main())