Files @ 18800b249d2c
Branch filter:

Location: NPO-Accounting/conservancy_beancount/conservancy_beancount/reports/accrual.py

Brett Smith
config: Let user specify books dir with ~.
#!/usr/bin/env python3
"""accrual.py - Various reports about accruals"""
# Copyright © 2020  Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import argparse
import datetime
import enum
import collections
import re
import sys

from typing import (
    Callable,
    Dict,
    Iterable,
    Iterator,
    Mapping,
    NamedTuple,
    Optional,
    Sequence,
    Set,
    TextIO,
    Tuple,
)
from ..beancount_types import (
    Error,
    MetaKey,
    MetaValue,
    Transaction,
)

import rt

from beancount.parser import printer as bc_printer

from . import core
from .. import config as configmod
from .. import data
from .. import filters
from .. import rtutil

# Postings grouped under a metadata value. main() builds this mapping with
# core.RelatedPostings.group_by_meta(postings, 'invoice'), so in this module
# each key is an invoice metadata value (or None).
PostGroups = Mapping[Optional[MetaValue], core.RelatedPostings]
# Signature shared by the report functions registered with ReportType:
# (groups, out_file, err_file, rt_client, rt_wrapper) -> None
ReportFunc = Callable[
    [PostGroups, TextIO, TextIO, Optional[rt.Rt], Optional[rtutil.RT]],
    None
]
# NOTE(review): not referenced anywhere in the visible code; presumably the
# shape of a record returned by the RT client - confirm before relying on it.
RTObject = Mapping[str, str]

class ReportType:
    """Registry that maps report names to the functions implementing them."""
    # Primary (first-listed) name of every registered report.
    NAMES: Set[str] = set()
    # Every registered name, aliases included, mapped to its report function.
    BY_NAME: Dict[str, ReportFunc] = {}

    @classmethod
    def register(cls, *names: str) -> Callable[[ReportFunc], ReportFunc]:
        """Return a decorator that registers a report function under names.

        The first name is the primary one and is also recorded in NAMES;
        the rest are aliases. The decorated function is returned unchanged.
        """
        def register_wrapper(func: ReportFunc) -> ReportFunc:
            cls.BY_NAME.update(dict.fromkeys(names, func))
            cls.NAMES.add(names[0])
            return func
        return register_wrapper

    @classmethod
    def by_name(cls, name: str) -> ReportFunc:
        """Look up a report function by name, ignoring the case of name.

        Raises ValueError when no report is registered under that name.
        """
        lookup_key = name.lower()
        if lookup_key not in cls.BY_NAME:
            raise ValueError(f"unknown report type {name!r}")
        return cls.BY_NAME[lookup_key]

    @classmethod
    def default_for(cls, groups: PostGroups) -> Tuple[ReportFunc, PostGroups]:
        """Choose the report to run when the user did not name one.

        Returns the report function along with the groups it should cover:
        the groups with a nonzero balance, or all of groups when every
        balance is zero. The outgoing report is chosen only when exactly one
        group has a nonzero balance and all of its postings are under
        Liabilities; otherwise the balance report is chosen.
        """
        outstanding = {}
        for key, group in groups.items():
            if not group.balance().is_zero():
                outstanding[key] = group
        report_name = 'balance'
        if len(outstanding) == 1:
            (sole_group,) = outstanding.values()
            if all(post.account.is_under('Liabilities') for post in sole_group):
                report_name = 'outgoing'
        return cls.BY_NAME[report_name], outstanding or groups


class ReturnFlag(enum.IntFlag):
    """Bit flags recording the kinds of problems main() ran into.

    main() ORs together every flag that applies and, when any is set,
    returns 16 plus the combined value as the exit status.
    """
    LOAD_ERRORS = 1 << 0
    CONSISTENCY_ERRORS = 1 << 1
    REPORT_ERRORS = 1 << 2
    NOTHING_TO_REPORT = 1 << 3


class SearchTerm(NamedTuple):
    """A metadata key paired with the source of a regular expression.

    filter_search() compiles pattern and uses it to match the meta_key
    metadata of postings.
    """
    meta_key: MetaKey
    pattern: str

    @classmethod
    def parse(cls, s: str) -> 'SearchTerm':
        """Build a SearchTerm from one command line search argument.

        s is either NAME=TERM or a bare TERM. When TERM parses as an RT
        reference, the pattern comes from rtutil.RT.metadata_regexp() and
        a bare TERM defaults to the rt-id key for a whole ticket or the
        invoice key for an attachment. Any other TERM is matched as a
        whitespace-delimited word, with invoice as the default key.
        """
        key: Optional[str] = None
        link_text = s
        if re.match(r'^[a-z][-\w]*=', s) is not None:
            key, _, link_text = s.partition('=')

        parsed_ids = rtutil.RT.parse(link_text)
        if parsed_ids is None:
            # Retry with an rt: prefix; per the command line help, a bare
            # ticket number is shorthand for rt:NUMBER.
            parsed_ids = rtutil.RT.parse('rt:' + link_text)

        if parsed_ids is None:
            word_pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(link_text))
            return cls('invoice' if key is None else key, word_pattern)

        ticket_id, attachment_id = parsed_ids
        whole_ticket = attachment_id is None
        if key is None:
            key = 'rt-id' if whole_ticket else 'invoice'
        rt_pattern = rtutil.RT.metadata_regexp(
            ticket_id,
            attachment_id,
            first_link_only=(key == 'rt-id' and whole_ticket),
        )
        return cls(key, rt_pattern)


def consistency_check(groups: PostGroups) -> Iterable[Error]:
    """Yield an error for each posting in a group with inconsistent metadata.

    Within every group, the contract, entity, and purchase-order metadata
    are each expected to have exactly one distinct value, as reported by the
    group's meta_values(). When one does not, every posting in that group
    produces an Error that shows the posting's own value for that key.
    """
    checked_keys = ('contract', 'entity', 'purchase-order')
    for invoice, postings in groups.items():
        for meta_key in checked_keys:
            if len(postings.meta_values(meta_key)) == 1:
                continue
            problem = f'inconsistent {meta_key} for invoice {invoice}'
            yield from (
                Error(
                    post.meta,
                    f'{problem}: {post.meta.get(meta_key)}',
                    post.meta.txn,
                )
                for post in postings
            )

def _since_last_nonzero(posts: core.RelatedPostings) -> core.RelatedPostings:
    """Return the postings added since the running balance was last zero.

    Postings are accumulated in order. Whenever the accumulated balance is
    zero just before the next posting is added, the accumulation starts
    over, so only the latest run of postings is returned.
    """
    recent = core.RelatedPostings()
    for posting in posts:
        settled = recent.balance().is_zero()
        if settled:
            recent.clear()
        recent.add(posting)
    return recent

@ReportType.register('balance', 'bal')
def balance_report(groups: PostGroups,
                   out_file: TextIO,
                   err_file: TextIO=sys.stderr,
                   rt_client: Optional[rt.Rt]=None,
                   rt_wrapper: Optional[rtutil.RT]=None,
) -> None:
    """Write each group's outstanding balance and start date to out_file.

    Only the postings since the group's balance was last zero are counted.
    Entries are separated by a blank line. err_file, rt_client, and
    rt_wrapper are accepted to match the ReportFunc signature and are not
    used here.
    """
    for index, (invoice, postings) in enumerate(groups.items()):
        outstanding = _since_last_nonzero(postings)
        amount = outstanding.balance()
        since = outstanding[0].meta.date.strftime('%Y-%m-%d')
        # Every entry after the first is preceded by a blank line.
        separator = '\n' if index else ''
        print(
            f"{separator}{invoice}:",
            f"  {amount} outstanding since {since}",
            sep='\n', file=out_file,
        )

def _primary_rt_id(related: core.RelatedPostings) -> rtutil.TicketAttachmentIds:
    rt_ids = related.all_meta_links('rt-id')
    rt_ids_count = len(rt_ids)
    if rt_ids_count != 1:
        raise ValueError(f"{rt_ids_count} rt-id links found")
    parsed = rtutil.RT.parse(rt_ids.pop())
    if parsed is None:
        raise ValueError("rt-id is not a valid RT reference")
    else:
        return parsed

@ReportType.register('outgoing', 'outgoings', 'out')
def outgoing_report(groups: PostGroups,
                    out_file: TextIO,
                    err_file: TextIO=sys.stderr,
                    rt_client: Optional[rt.Rt]=None,
                    rt_wrapper: Optional[rtutil.RT]=None,
) -> None:
    """Write a payment approval request for each group of postings.

    For every group, the postings since the balance was last zero are
    summarized using details from the RT ticket named by their rt-id
    metadata, followed by the Beancount entries themselves with links
    rewritten by rt_wrapper. A group whose ticket cannot be determined or
    fetched is reported on err_file and skipped.

    Raises ValueError when rt_client or rt_wrapper is None.
    """
    if rt_client is None or rt_wrapper is None:
        raise ValueError("RT client is required but not configured")
    for invoice, related in groups.items():
        related = _since_last_nonzero(related)
        try:
            ticket_id, _ = _primary_rt_id(related)
            ticket = rt_client.get_ticket(ticket_id)
            # Note we only use this when ticket is None.
            errmsg = f"ticket {ticket_id} not found"
        except (ValueError, rt.RtError) as error:
            ticket = None
            # An exception may have been constructed without a message.
            errmsg = error.args[0] if error.args else repr(error)
        if ticket is None:
            print("error: can't generate outgoings report for {}"
                  " because no RT ticket available: {}".format(
                      invoice, errmsg,
                  ), file=err_file)
            continue

        try:
            rt_requestor = rt_client.get_user(ticket['Requestors'][0])
        except (IndexError, KeyError, rt.RtError):
            # A ticket without any requestor is handled the same way as a
            # requestor that cannot be fetched.
            rt_requestor = None
        if rt_requestor is None:
            requestor = ''
            requestor_name = ''
        else:
            # Either field may be missing from the user record, so build the
            # line from whatever is present rather than raising KeyError.
            requestor_name = rt_requestor.get('RealName') or ''
            requestor_email = rt_requestor.get('EmailAddress') or ''
            requestor = ' '.join(part for part in (
                requestor_name,
                f'<{requestor_email}>' if requestor_email else '',
            ) if part)

        contract_links = related.all_meta_links('contract')
        if contract_links:
            contract_s = ' , '.join(rt_wrapper.iter_urls(
                contract_links, '<{}>', '{}', '<BROKEN RT LINK: {}>',
            ))
        else:
            contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
        projects = [v for v in related.meta_values('project')
                    if isinstance(v, str)]
        # The payee falls back to the requestor's name when the ticket has
        # no payment-to custom field.
        payment_to = ticket.get('CF.{payment-to}', requestor_name)
        payment_method = ticket.get('CF.{payment-method}', '')

        print(
            "PAYMENT FOR APPROVAL:",
            f"REQUESTOR: {requestor}",
            f"TOTAL TO PAY: {-related.balance()}",
            f"AGREEMENT: {contract_s}",
            f"PAYMENT TO: {payment_to}",
            f"PAYMENT METHOD: {payment_method}",
            f"PROJECT: {', '.join(projects)}",
            "\nBEANCOUNT ENTRIES:",
            sep='\n', file=out_file,
        )

        # Consecutive postings from the same transaction print it only once.
        last_txn: Optional[Transaction] = None
        for post in related:
            txn = post.meta.txn
            if txn is not last_txn:
                last_txn = txn
                txn = rt_wrapper.txn_with_urls(txn)
                bc_printer.print_entry(txn, file=out_file)

def filter_search(postings: Iterable[data.Posting],
                  search_terms: Iterable[SearchTerm],
) -> Iterable[data.Posting]:
    """Narrow postings down to the accrual postings matching every term.

    Only postings under Assets:Receivable or Liabilities:Payable are kept.
    Each search term then adds a further metadata filter using its compiled
    pattern. The result is lazy: nothing is filtered until it is iterated.
    """
    matches: Iterable[data.Posting] = filter(
        lambda post: post.account.is_under(
            'Assets:Receivable', 'Liabilities:Payable',
        ),
        postings,
    )
    for key, regexp_source in search_terms:
        matches = filters.filter_meta_match(
            matches, key, re.compile(regexp_source),
        )
    return matches

def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
    """Parse the command line and return the resulting namespace.

    Besides the report_type, since, and search attributes defined by the
    options, the namespace gets a search_terms attribute holding one parsed
    SearchTerm for each search argument.
    """
    report_type_help = """The type of report to generate, either `balance` or `outgoing`.
If not specified, the default is `outgoing` for search criteria that return a
single outstanding payable, and `balance` any other time.
"""
    since_help = """How far back to search the books for related transactions.
You can either specify a fiscal year, or a negative offset from the current
fiscal year, to start loading entries from. The default is -1 (start from the
previous fiscal year).
"""
    search_help = """Report on accruals that match this criteria. The format is
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
metadata to match. A single ticket number is a shortcut for
`rt-id=rt:NUMBER`. Any other link, including an RT attachment link in
`TIK/ATT` format, is a shortcut for `invoice=LINK`.
"""
    cli = argparse.ArgumentParser()
    cli.add_argument(
        '--report-type', '-t',
        type=ReportType.by_name,
        metavar='NAME',
        help=report_type_help,
    )
    cli.add_argument(
        '--since',
        type=int,
        default=-1,
        metavar='YEAR',
        help=since_help,
    )
    cli.add_argument(
        'search',
        nargs=argparse.ZERO_OR_MORE,
        help=search_help,
    )
    namespace = cli.parse_args(arglist)
    namespace.search_terms = list(map(SearchTerm.parse, namespace.search))
    return namespace

def main(arglist: Optional[Sequence[str]]=None,
         stdout: TextIO=sys.stdout,
         stderr: TextIO=sys.stderr,
         config: Optional[configmod.Config]=None,
) -> int:
    """Run the accrual report tool and return its exit status.

    Books are loaded through config (a default Config is created and loaded
    when none is given), postings are filtered by the search arguments and
    grouped by invoice, load and consistency errors are printed to stderr,
    and the selected report is written to stdout.

    Returns 0 when nothing went wrong; otherwise 16 plus the ReturnFlag
    values for every kind of problem encountered.
    """
    args = parse_arguments(arglist)
    if config is None:
        config = configmod.Config()
        config.load_file()

    loader = config.books_loader()
    if loader is None:
        # With no books configured, report that as a load error that points
        # at the configuration file, and carry on with no entries.
        entries = []
        error_source = {
            'filename': str(config.config_file_path()),
            'lineno': 1,
        }
        load_errors = [
            Error(error_source, "no books to load in configuration", None),
        ]
    else:
        entries, load_errors, _ = loader.load_fy_range(args.since)

    matched = filter_search(data.Posting.from_entries(entries), args.search_terms)
    groups = core.RelatedPostings.group_by_meta(matched, 'invoice')

    status = 0
    # Load errors are printed first; the consistency check is lazy and only
    # runs once its turn comes.
    error_sources = (
        (ReturnFlag.LOAD_ERRORS, load_errors),
        (ReturnFlag.CONSISTENCY_ERRORS, consistency_check(groups)),
    )
    for flag, errors in error_sources:
        for error in errors:
            bc_printer.print_error(error, file=stderr)
            status |= flag

    report_func = args.report_type
    if report_func is None:
        report_func, groups = ReportType.default_for(groups)

    if groups:
        try:
            report_func(
                groups,
                stdout,
                stderr,
                config.rt_client(),
                config.rt_wrapper(),
            )
        except ValueError as exc:
            report_name = report_func.__name__.replace('_', ' ')
            print("error: unable to generate {}: {}".format(
                report_name,
                exc.args[0],
            ), file=stderr)
            status |= ReturnFlag.REPORT_ERRORS
    else:
        print("warning: no matching entries found to report", file=stderr)
        status |= ReturnFlag.NOTHING_TO_REPORT

    if status == 0:
        return 0
    return 16 + status

if __name__ == '__main__':
    # Use sys.exit() rather than the exit() helper: the helper is added by
    # the site module for interactive use and is not guaranteed to exist
    # (for example under `python -S`).
    sys.exit(main())