Changeset - 1cc4e732f4e2
[Not reviewed]
0 2 0
Brett Smith - 4 years ago 2020-10-21 14:51:18
brettcsmith@brettcsmith.org
accrual: Add --end option.

For assemble-audit-reports as shown.
2 files changed with 21 insertions and 3 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/accrual.py
Show inline comments
 
#!/usr/bin/env python3
 
"""accrual-report - Status reports for accruals
 

	
 
accrual-report checks accruals (postings under Assets:Receivable and
 
Liabilities:Payable) for errors and metadata consistency, and reports any
 
problems on stderr. Then it writes a report about the status of those
 
accruals.
 

	
 
If you run it with no arguments, it will generate an aging report in ODS format.
 

	
 
Otherwise, the typical way to run it is to pass an RT ticket number or
 
invoice link as an argument, to report about accruals that match those
 
criteria::
 

	
 
    # Report all accruals associated with RT#1230:
 
    accrual-report 1230
 
    # Report all accruals with the invoice link rt:45/670.
 
    accrual-report 45/670
 
    # Report all accruals with the invoice link Invoice980.pdf.
 
    accrual-report Invoice980.pdf
 

	
 
By default, to stay fast, accrual-report only looks at unaudited books. You
 
can search further back in history by passing the ``--since`` argument. The
 
argument can be a fiscal year, or a negative number of how many years back
 
to search::
 

	
 
    # Search for accruals since 2016
 
    accrual-report --since 2016 [search terms …]
 
    # Search for accruals from the beginning of three fiscal years ago
 
    accrual-report --since -3 [search terms …]
 

	
 
If you want to further limit what accruals are reported, you can match on
 
other metadata by passing additional arguments in ``name=value`` format.
 
You can pass any number of search terms. For example::
 

	
 
    # Report accruals associated with RT#1230 and Jane Doe
 
    accrual-report 1230 entity=Doe-Jane
 

	
 
accrual-report will automatically decide what kind of report to generate
 
from the search terms you provide and the results they return. If you
 
searched on an RT ticket or invoice that returned a single outstanding
 
payable, it writes an outgoing approval report. If you searched on RT ticket
 
or invoice that returned other results, it writes a balance
 
report. Otherwise, it writes an aging report. You can specify what report
 
type you want with the ``--report-type`` option::
 

	
 
    # Write an outgoing approval report for all outstanding payables for
 
    # Jane Doe, even if there's more than one
 
    accrual-report --report-type outgoing entity=Doe-Jane
 
    # Write an aging report for a single RT invoice (this can be helpful when
 
    # one invoice covers multiple parties)
 
    accrual-report --report-type aging 12/345
 
"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import collections
 
import datetime
 
import enum
 
import logging
 
import re
 
import sys
 

	
 
from pathlib import Path
 

	
 
from typing import (
 
    cast,
 
    Any,
 
    BinaryIO,
 
    Callable,
 
    Deque,
 
    Dict,
 
    Hashable,
 
    Iterable,
 
    Iterator,
 
    List,
 
    Mapping,
 
    Match,
 
    NamedTuple,
 
    Optional,
 
    Sequence,
 
    Set,
 
    TextIO,
 
    Tuple,
 
    TypeVar,
 
    Union,
 
)
 
from ..beancount_types import (
 
    Entries,
 
    Error,
 
    Errors,
 
    MetaKey,
 
    MetaValue,
 
    Transaction,
 
)
 

	
 
import odf.element  # type:ignore[import]
 
import odf.style  # type:ignore[import]
 
import odf.table  # type:ignore[import]
 
import rt
 

	
 
from beancount.parser import printer as bc_printer
 

	
 
from . import core
 
from . import rewrite
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 
from .. import filters
 
from .. import rtutil
 

	
 
PROGNAME = 'accrual-report'
 

	
 
PostGroups = Mapping[Optional[Hashable], 'AccrualPostings']
 
T = TypeVar('T')
 

	
 
logger = logging.getLogger('conservancy_beancount.reports.accrual')
 

	
 
class Account(NamedTuple):
 
    name: str
 
    aging_thresholds: Sequence[int]
 

	
 

	
 
class AccrualAccount(enum.Enum):
 
    # Note the aging report uses the same order accounts are defined here.
 
    # See AgingODS.start_spreadsheet().
 
    RECEIVABLE = Account('Assets:Receivable', [365, 120, 90, 60])
 
    PAYABLE = Account('Liabilities:Payable', [365, 90, 60, 30])
 

	
 
    @classmethod
 
    def account_names(cls) -> Iterator[str]:
 
        return (acct.value.name for acct in cls)
 

	
 
    @classmethod
 
    def by_account(cls, name: data.Account) -> 'AccrualAccount':
 
        for account in cls:
 
            if name.is_under(account.value.name):
 
                return account
 
        raise ValueError(f"unrecognized account {name!r}")
 

	
 
    @classmethod
 
    def classify(cls, related: core.RelatedPostings) -> 'AccrualAccount':
 
        for account in cls:
 
            account_name = account.value.name
 
            if all(post.account.is_under(account_name) for post in related):
 
                return account
 
        raise ValueError("unrecognized account set in related postings")
 

	
 
    @property
 
    def normalize_amount(self) -> Callable[[T], T]:
 
        return core.normalize_amount_func(self.value.name)
 

	
 

	
 
class AccrualPostings(core.RelatedPostings):
 
    __slots__ = ()
 

	
 
    @classmethod
 
    def make_consistent(cls,
 
                        postings: Iterable[data.Posting],
 
    ) -> Iterator[Tuple[Hashable, 'AccrualPostings']]:
 
        accruals: Dict[Tuple[str, ...], List[data.Posting]] = collections.defaultdict(list)
 
        payments: Dict[Tuple[str, ...], Deque[data.Posting]] = collections.defaultdict(Deque)
 
        key: Tuple[str, ...]
 
        for post in postings:
 
            norm_func = core.normalize_amount_func(post.account)
 
            invoice = str(post.meta.get('invoice', 'BlankInvoice'))
 
            if norm_func(post.units.number) >= 0:
 
                entity = str(post.meta.get('entity', 'BlankEntity'))
 
                key = (post.meta.date.isoformat(), entity, invoice, post.account)
 
                accruals[key].append(post)
 
            else:
 
                key = (invoice, post.account)
 
                payments[key].append(post)
 

	
 
        for key, acc_posts in accruals.items():
 
            pay_posts = payments[key[2:]]
 
            if not pay_posts:
 
                continue
 
            norm_func = core.normalize_amount_func(key[-1])
 
            balance = norm_func(core.MutableBalance(post.at_cost() for post in acc_posts))
 
            while pay_posts and not balance.le_zero():
 
                pay_post = pay_posts.popleft()
 
                acc_posts.append(pay_post)
 
                balance += norm_func(pay_post.at_cost())
 
            if balance.le_zero() and not balance.is_zero():
 
                # pay_post causes the accrual to be overpaid. Split it into two
 
                # synthesized postings: one that causes the accrual to be
 
                # exactly zero, and one with the remainder back in payments.
 
                post_cost = pay_post.at_cost()
 
                # Calling norm_func() reverses the call in the while loop to add
 
                # the amount to the balance.
 
                overpayment = norm_func(balance[post_cost.currency])
 
                amt_to_zero = post_cost._replace(number=post_cost.number - overpayment.number)
 
                acc_posts[-1] = pay_post._replace(units=amt_to_zero, cost=None, price=None)
 
                pay_posts.appendleft(pay_post._replace(units=overpayment, cost=None, price=None))
 
            acc_posts.sort(key=lambda post: post.meta.date)
 

	
 
        for key, acc_posts in accruals.items():
 
            yield key, cls(acc_posts, _can_own=True)
 
        for key, pay_posts in payments.items():
 
            if pay_posts:
 
                yield key, cls(pay_posts, _can_own=True)
 

	
 
    def is_paid(self) -> Optional[bool]:
 
        try:
 
            accrual_type = AccrualAccount.classify(self)
 
        except ValueError:
 
            return None
 
        else:
 
            return accrual_type.normalize_amount(self.balance()).le_zero()
 

	
 

	
 
class BaseReport:
 
    def __init__(self, out_file: TextIO) -> None:
 
        self.out_file = out_file
 
        self.logger = logger.getChild(type(self).__name__)
 

	
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        raise NotImplementedError("BaseReport._report")
 

	
 
    def run(self, groups: PostGroups) -> None:
 
        for index, invoice in enumerate(groups):
 
            for line in self._report(groups[invoice], index):
 
                print(line, file=self.out_file)
 

	
 

	
 
class AgingODS(core.BaseODS[AccrualPostings, data.Account]):
 
    AGE_COLORS = [
 
        '#ff00ff',
 
        '#ff0000',
 
        '#ff8800',
 
        '#ffff00',
 
        '#00ff00',
 
    ]
 
    DOC_COLUMNS = [
 
        'rt-id',
 
        'invoice',
 
        'approval',
 
        'contract',
 
        'purchase-order',
 
    ]
 
    COLUMNS = [
 
        'Date',
 
        data.Metadata.human_name('entity'),
 
        'Invoice Amount',
 
        'Booked Amount',
 
        data.Metadata.human_name('project'),
 
        *(data.Metadata.human_name(key) for key in DOC_COLUMNS),
 
    ]
 
    COL_COUNT = len(COLUMNS)
 

	
 
    def __init__(self,
 
                 rt_wrapper: rtutil.RT,
 
                 date: datetime.date,
 
                 logger: logging.Logger,
 
    ) -> None:
 
        super().__init__(rt_wrapper)
 
        self.date = date
 
        self.logger = logger
 

	
 
    def section_key(self, row: AccrualPostings) -> data.Account:
 
        return row[0].account
 

	
 
    def start_spreadsheet(self) -> None:
 
        for accrual_type in AccrualAccount:
 
            self.use_sheet(accrual_type.name.title())
 
            for index in range(self.COL_COUNT):
 
                if index == 0:
 
                    style: Union[str, odf.style.Style] = ''
 
                elif index < 6:
 
                    style = self.column_style(1.2)
 
                else:
 
                    style = self.column_style(1.5)
 
                self.sheet.addElement(odf.table.TableColumn(stylename=style))
 
            self.add_row(*(
 
                self.string_cell(name, stylename=self.style_bold)
 
                for name in self.COLUMNS
 
            ))
 
            self.lock_first_row()
 

	
 
    def start_section(self, key: data.Account) -> None:
 
        accrual_type = AccrualAccount.by_account(key)
 
        self.norm_func = accrual_type.normalize_amount
 
        self.age_thresholds = list(accrual_type.value.aging_thresholds)
 
        self.age_thresholds.append(-sys.maxsize)
 
        self.age_balances = [core.MutableBalance() for _ in self.age_thresholds]
 
        self.age_styles = [
 
            self.merge_styles(self.style_date, self.border_style(
 
                core.Border.LEFT, '10pt', 'solid', color,
 
            )) for color in self.AGE_COLORS
 
        ]
 
        acct_parts = key.slice_parts()
 
        self.use_sheet(acct_parts[1])
 
        self.add_row()
 
        self.add_row(self.string_cell(
 
            f"{' '.join(acct_parts[2:])} {acct_parts[1]} Aging Report"
 
            f" for {self.date.isoformat()}",
 
            stylename=self.merge_styles(self.style_bold, self.style_centertext),
 
            numbercolumnsspanned=self.COL_COUNT,
 
        ))
 
        self.add_row()
 

	
 
    def end_section(self, key: data.Account) -> None:
 
        total_balance = core.MutableBalance()
 
        text_span = 4
 
        last_age_text: Optional[str] = None
 
        self.add_row()
 
        for threshold, balance, style in zip(
 
                self.age_thresholds, self.age_balances, self.age_styles,
 
        ):
 
            years, days = divmod(threshold, 365)
 
            years_text = f"{years} {'Year' if years == 1 else 'Years'}"
 
            days_text = f"{days} Days"
 
            if years and days:
 
                age_text = f"{years_text} {days_text}"
 
            elif years:
 
                age_text = years_text
 
            else:
 
                age_text = days_text
 
            if last_age_text is None:
 
                age_range = f"Over {age_text}"
 
            elif threshold < 0:
 
                self.add_row(
 
                    self.string_cell(
 
                        f"Total Unpaid Over {last_age_text}: ",
 
                        stylename=self.merge_styles(self.style_bold, self.style_endtext),
 
                        numbercolumnsspanned=text_span,
 
                    ),
 
                    *(odf.table.TableCell() for _ in range(1, text_span)),
 
                    self.balance_cell(total_balance, stylename=self.style_total),
 
                )
 
                age_range = f"Under {last_age_text}"
 
            else:
 
                age_range = f"{age_text}–{last_age_text}"
 
            self.add_row(
 
                self.string_cell(
 
                    f"Total Aged {age_range}: ",
 
                    stylename=self.merge_styles(self.style_bold, self.style_endtext, style),
 
                    numbercolumnsspanned=text_span,
 
                ),
 
                *(odf.table.TableCell() for _ in range(1, text_span)),
 
                self.balance_cell(balance),
 
            )
 
            last_age_text = age_text
 
            total_balance += balance
 
        self.add_row(
 
            self.string_cell(
 
                "Total Unpaid: ",
 
                stylename=self.merge_styles(self.style_bold, self.style_endtext),
 
                numbercolumnsspanned=text_span,
 
            ),
 
            *(odf.table.TableCell() for _ in range(1, text_span)),
 
            self.balance_cell(total_balance, stylename=self.style_bottomline),
 
        )
 

	
 
    def write_row(self, row: AccrualPostings) -> None:
 
        row_date = row[0].meta.date
 
        row_balance = self.norm_func(row.balance_at_cost())
 
        age = (self.date - row_date).days
 
        for index, threshold in enumerate(self.age_thresholds):
 
            if age >= threshold:
 
                if row_balance.ge_zero():
 
                    self.age_balances[index] += row_balance
 
                break
 
        else:
 
            return
 
        raw_balance = self.norm_func(row.balance())
 
        if raw_balance == row_balance:
 
            amount_cell = odf.table.TableCell()
 
        else:
 
            amount_cell = self.balance_cell(raw_balance)
 
        entities = row.meta_values('entity')
 
        entities.discard(None)
 
        projects = row.meta_values('project')
 
        projects.discard(None)
 
        self.add_row(
 
            self.date_cell(row_date, stylename=self.age_styles[index]),
 
            self.multiline_cell(sorted(entities)),
 
            amount_cell,
 
            self.balance_cell(row_balance),
 
            self.multiline_cell(sorted(projects)),
 
            *(self.meta_links_cell(row.all_meta_links(key))
 
              for key in self.DOC_COLUMNS),
 
        )
 

	
 

	
 
class AgingReport(BaseReport):
 
    def __init__(self,
 
                 rt_wrapper: rtutil.RT,
 
                 out_file: BinaryIO,
 
                 date: Optional[datetime.date]=None,
 
    ) -> None:
 
        if date is None:
 
            date = datetime.date.today()
 
        self.out_bin = out_file
 
        self.logger = logger.getChild(type(self).__name__)
 
        self.ods = AgingODS(rt_wrapper, date, self.logger)
 

	
 
    def run(self, groups: PostGroups) -> None:
 
        rows = [group for group in groups.values()
 
                if not group.balance_at_cost().is_zero()]
 
        rows.sort(key=lambda group: (
 
            group[0].account,
 
            group[0].meta.date,
 
            abs(sum(amt.number for amt in group.balance_at_cost().values())),
 
        ))
 
        self.ods.write(rows)
 
        self.ods.save_file(self.out_bin)
 

	
 

	
 
class BalanceReport(BaseReport):
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        meta = posts[0].meta
 
        date_s = meta.date.strftime('%Y-%m-%d')
 
        entity_s = meta.get('entity', '<no entity>')
 
        invoice_s = meta.get('invoice', '<no invoice>')
 
        balance_s = posts.balance_at_cost().format(zero="Zero balance")
 
        if index:
 
            yield ""
 
        yield f"{entity_s} {invoice_s}:"
 
        yield f"  {balance_s} outstanding since {date_s}"
 

	
 

	
 
class OutgoingReport(BaseReport):
 
    class PaymentMethods(enum.Enum):
 
        ach = 'ACH'
 
        check = 'Check'
 
        creditcard = 'Credit Card'
 
        credit_card = creditcard
 
        debitcard = 'Debit Card'
 
        debit_card = debitcard
 
        echeck = 'E-Check'
 
        e_check = echeck
 
        paypal = 'PayPal'
 
        pay_pal = paypal
 
        vendorportal = 'Vendor Portal'
 
        vendor_portal = vendorportal
 
        wire = 'Wire'
 
        fxwire = wire
 
        fx_wire = fxwire
 
        uswire = wire
 
        us_wire = uswire
 

	
 

	
 
    def __init__(self, rt_wrapper: rtutil.RT, out_file: TextIO) -> None:
 
        super().__init__(out_file)
 
        self.rt_wrapper = rt_wrapper
 
        self.rt_client = rt_wrapper.rt
 

	
 
    def _primary_rt_id(self, posts: AccrualPostings) -> rtutil.TicketAttachmentIds:
 
        rt_ids = posts.first_meta_links('rt-id')
 
        rt_id = next(rt_ids, None)
 
        rt_id2 = next(rt_ids, None)
 
        if rt_id is None:
 
            raise ValueError("no rt-id links found")
 
        elif rt_id2 is not None:
 
            raise ValueError("multiple rt-id links found")
 
        parsed = rtutil.RT.parse(rt_id)
 
        if parsed is None:
 
            raise ValueError("rt-id is not a valid RT reference")
 
        else:
 
            return parsed
 

	
 
    def _get_payment_method(self, posts: AccrualPostings, ticket_id: str) -> Optional[str]:
 
        payment_methods = posts.meta_values('payment-method')
 
        payment_methods.discard(None)
 
        if all(isinstance(s, str) for s in payment_methods):
 
            # type ignore for <https://github.com/python/mypy/issues/7853>
 
            payment_methods = {s.strip().lower() for s in payment_methods}  # type:ignore[union-attr]
 
        log_prefix = f"cannot set payment-method for rt:{ticket_id}:"
 
        payment_method_count = len(payment_methods)
 
        if payment_method_count != 1:
 
            self.logger.warning("%s %s metadata values found",
 
                                log_prefix, payment_method_count)
 
            return None
 
        payment_method = payment_methods.pop()
 
        if not isinstance(payment_method, str):
 
            self.logger.warning("%s %r is not a string value",
 
                                log_prefix, payment_method)
 
            return None
 
        try:
 
            currency, method_key = payment_method.split(None, 1)
 
        except ValueError:
 
            self.logger.warning("%s no method specified in %r",
 
                                log_prefix, payment_method)
 
            return None
 
        curr_match = re.fullmatch(r'[a-z]{3}', currency)
 
        if curr_match is None:
 
            self.logger.warning("%s invalid currency %r",
 
                                log_prefix, currency)
 
        try:
 
            method_enum = self.PaymentMethods[re.sub(r'[- ]', '_', method_key)]
 
        except KeyError:
 
            self.logger.warning("%s invalid method %r",
 
                                log_prefix, method_key)
 
            curr_match = None
 
        if curr_match is None:
 
            return None
 
        else:
 
            return f'{currency.upper()} {method_enum.value}'
 

	
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        try:
 
            ticket_id, _ = self._primary_rt_id(posts)
 
            ticket = self.rt_client.get_ticket(ticket_id)
 
            # Note we only use this when ticket is None.
 
            errmsg = f"ticket {ticket_id} not found"
 
        except (ValueError, rt.RtError) as error:
 
            ticket = None
 
            errmsg = error.args[0]
 
        if ticket is None:
 
            meta = posts[0].meta
 
            self.logger.error(
 
                "can't generate outgoings report for %s %s %s because no RT ticket available: %s",
 
                meta.date.isoformat(),
 
                meta.get('entity', '<no entity>'),
 
                meta.get('invoice', '<no invoice>'),
 
                errmsg,
 
            )
 
            return
 

	
 
        try:
 
            rt_requestor = self.rt_client.get_user(ticket['Requestors'][0])
 
        except (IndexError, rt.RtError):
 
            rt_requestor = None
 
        if rt_requestor is None:
 
            requestor = ''
 
            requestor_name = ''
 
        else:
 
            requestor_name = (
 
                rt_requestor.get('RealName')
 
                or ticket.get('CF.{payment-to}')
 
                or ''
 
            )
 
            requestor = f'{requestor_name} <{rt_requestor["EmailAddress"]}>'.strip()
 

	
 
        last_zero_index = -1
 
        for index, (post, balance) in enumerate(posts.iter_with_balance()):
 
            if balance.is_zero():
 
                prior_zero_index = last_zero_index
 
                last_zero_index = index
 
        if last_zero_index == index:
 
            last_zero_index = prior_zero_index
 
        posts = posts[last_zero_index + 1:]
 

	
 
        balance = -posts.balance_at_cost()
 
        balance_s = balance.format(None)
 
        raw_balance = -posts.balance()
 
        payment_amount = raw_balance.format('¤¤ #,##0.00')
 
        if raw_balance != balance:
 
            payment_amount += f' ({balance_s})'
 
            balance_s = f'{raw_balance} ({balance_s})'
 

	
 
        payment_to = ticket.get('CF.{payment-to}') or requestor_name
 
        contract_links = list(posts.all_meta_links('contract'))
 
        if contract_links:
 
            contract_s = ' , '.join(self.rt_wrapper.iter_urls(
 
                contract_links, missing_fmt='<BROKEN RT LINK: {}>',
 
            ))
 
        else:
 
            contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
 
        projects = [v for v in posts.meta_values('project')
 
                    if isinstance(v, str)]
 

	
 
        yield "PAYMENT FOR APPROVAL:"
 
        yield f"REQUESTOR: {requestor}"
 
        yield f"PAYMENT TO: {payment_to}"
 
        yield f"TOTAL TO PAY: {balance_s}"
 
        yield f"AGREEMENT: {contract_s}"
 
        yield f"PROJECT: {', '.join(projects)}"
 
        yield "\nBEANCOUNT ENTRIES:\n"
 

	
 
        last_txn: Optional[Transaction] = None
 
        for post in posts:
 
            txn = post.meta.txn
 
            if txn is not last_txn:
 
                last_txn = txn
 
                txn = self.rt_wrapper.txn_with_urls(txn, '{}')
 
                # Suppress payment-method metadata from the report.
 
                txn.meta.pop('payment-method', None)
 
                for txn_post in txn.postings:
 
                    if txn_post.meta:
 
                        txn_post.meta.pop('payment-method', None)
 
                yield bc_printer.format_entry(txn)
 

	
 
        cf_targets = {
 
            'payment-amount': payment_amount,
 
            'payment-method': (self._get_payment_method(posts, ticket_id)
 
                               or ticket.get('CF.{payment-method}')),
 
            'payment-to': payment_to,
 
        }
 

	
 
        cf_updates = {
 
            f'CF_{key}': value
 
            for key, value in cf_targets.items()
 
            if ticket.get(f'CF.{{{key}}}') != value
 
        }
 
        if cf_updates:
 
            try:
 
                ok = self.rt_client.edit_ticket(ticket_id, **cf_updates)
 
            except rt.RtError:
 
                self.logger.debug("RT exception on edit_ticket", exc_info=True)
 
                ok = False
 
            if not ok:
 
                self.logger.warning("failed to set custom fields for rt:%s", ticket_id)
 

	
 

	
 
class ReportType(enum.Enum):
 
    AGING = AgingReport
 
    BALANCE = BalanceReport
 
    OUTGOING = OutgoingReport
 
    AGE = AGING
 
    BAL = BALANCE
 
    OUT = OUTGOING
 
    OUTGOINGS = OUTGOING
 

	
 
    @classmethod
 
    def by_name(cls, name: str) -> 'ReportType':
 
        try:
 
            return cls[name.upper()]
 
        except KeyError:
 
            raise ValueError(f"unknown report type {name!r}") from None
 

	
 

	
 
def filter_search(postings: Iterable[data.Posting],
 
                  search_terms: Iterable[cliutil.SearchTerm],
 
) -> Iterable[data.Posting]:
 
    accounts = tuple(AccrualAccount.account_names())
 
    postings = (post for post in postings if post.account.is_under(*accounts))
 
    for query in search_terms:
 
        postings = query.filter_postings(postings)
 
    return postings
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    cliutil.add_rewrite_rules_argument(parser)
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='NAME',
 
        type=ReportType.by_name,
 
        help="""The type of report to generate, one of `aging`, `balance`, or
 
`outgoing`. If not specified, the default is `aging` when no search terms are
 
given, `outgoing` for search terms that return a single outstanding payable,
 
and `balance` any other time.
 
""")
 
    parser.add_argument(
 
        '--since',
 
        metavar='YEAR',
 
        type=int,
 
        default=0,
 
        help="""How far back to search the books for related transactions.
 
You can either specify a fiscal year, or a negative offset from the current
 
fiscal year, to start loading entries from. The default is to load the current,
 
unaudited books.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Do not consider entries from this date forward, in YYYY-MM-DD
 
format.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is stdout for the balance and outgoing reports, and a generated
 
filename for other reports.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('invoice', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on accruals that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other link, including an RT attachment link in
 
`TIK/ATT` format, is a shortcut for `invoice=LINK`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.report_type is None and not any(
 
            term.meta_key == 'invoice' or term.meta_key == 'rt-id'
 
            for term in args.search_terms
 
    ):
 
        args.report_type = ReportType.AGING
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
        returncode = cliutil.ExitCode.NoConfiguration
 
    else:
 
        load_since = None if args.report_type == ReportType.AGING else args.since
 
        entries, load_errors, _ = books_loader.load_all(load_since)
 
        if load_errors:
 
            returncode = cliutil.ExitCode.BeancountErrors
 
        elif not entries:
 
            returncode = cliutil.ExitCode.NoDataLoaded
 
    filters.remove_opening_balance_txn(entries)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 

	
 
    postings_src = data.Posting.from_entries(entries)
 
    stop_date = args.stop_date or datetime.date(datetime.MAXYEAR, 12, 31)
 
    postings_src: Iterator[data.Posting] = (
 
        posting
 
        for posting in data.Posting.from_entries(entries)
 
        if posting.meta.date < stop_date
 
    )
 
    for rewrite_path in args.rewrite_rules:
 
        try:
 
            ruleset = rewrite.RewriteRuleset.from_yaml(rewrite_path)
 
        except ValueError as error:
 
            logger.critical("failed loading rewrite rules from %s: %s",
 
                            rewrite_path, error.args[0])
 
            return cliutil.ExitCode.RewriteRulesError
 
        postings_src = ruleset.rewrite(postings_src)
 
    postings = list(filter_search(postings_src, args.search_terms))
 
    if not postings:
 
        logger.warning("no matching entries found to report")
 
        returncode = returncode or cliutil.ExitCode.NoDataFiltered
 
    # groups is a mapping of metadata value strings to AccrualPostings.
 
    # The keys are basically arbitrary, the report classes don't rely on them,
 
    # but they do help symbolize what's being grouped.
 
    # For the outgoing approval report, groups maps rt-id link strings to
 
    # associated accruals.
 
    # For all other reports, groups comes from AccrualReport.make_consistent().
 
    groups: PostGroups
 
    if args.report_type is None or args.report_type is ReportType.OUTGOING:
 
        groups = dict(AccrualPostings.group_by_first_meta_link(postings, 'rt-id'))
 
        if args.report_type is None and len(groups) == 1:
 
            key = next(iter(groups))
 
            group = groups[key]
 
            account = group[0].account
 
            if (AccrualAccount.by_account(account) is AccrualAccount.PAYABLE
 
                and all(post.account == account for post in group)
 
                and not group.balance().ge_zero()
 
                and key):  # Make sure we have a usable rt-id
 
                args.report_type = ReportType.OUTGOING
 
    if args.report_type is not ReportType.OUTGOING:
 
        groups = dict(AccrualPostings.make_consistent(postings))
 
    if args.report_type is not ReportType.AGING:
 
        groups = {
 
            key: posts for key, posts in groups.items() if not posts.is_paid()
 
        } or groups
 
    del postings
 

	
 
    report: Optional[BaseReport] = None
 
    output_path: Optional[Path] = None
 
    if args.report_type is ReportType.AGING:
 
        rt_wrapper = config.rt_wrapper()
 
        if rt_wrapper is None:
 
            logger.error("unable to generate aging report: RT client is required")
 
        else:
 
            if args.output_file is None:
 
                now = datetime.datetime.now()
 
                out_dir_path = config.repository_path() or Path()
 
                args.output_file = out_dir_path / now.strftime('AgingReport_%Y-%m-%d_%H:%M.ods')
 
                logger.info("Writing report to %s", args.output_file)
 
            out_bin = cliutil.bytes_output(args.output_file, stdout)
 
            report = AgingReport(rt_wrapper, out_bin)
 
            report = AgingReport(rt_wrapper, out_bin, args.stop_date)
 
            report.ods.set_common_properties(config.books_repo())
 
    elif args.report_type is ReportType.OUTGOING:
 
        rt_wrapper = config.rt_wrapper()
 
        if rt_wrapper is None:
 
            logger.error("unable to generate outgoing report: RT client is required")
 
        else:
 
            out_file = cliutil.text_output(args.output_file, stdout)
 
            report = OutgoingReport(rt_wrapper, out_file)
 
    else:
 
        out_file = cliutil.text_output(args.output_file, stdout)
 
        report = BalanceReport(out_file)
 

	
 
    if report is None:
 
        returncode = cliutil.ExitCode.NoConfiguration
 
    else:
 
        report.run(groups)
 
    return returncode
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
conservancy_beancount/tools/audit_report.py
Show inline comments
 
"""audit_report.py - Utility to run all reports for an audit
 

	
 
This tool generates all reports for a fiscal year, plus the following months
 
available, to submit for audit. It parallelizes the work to run as efficiently
 
as possible.
 
"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import concurrent.futures as futmod
 
import datetime
 
import logging
 
import os
 
import runpy
 
import sys
 
import tempfile
 

	
 
from pathlib import Path
 

	
 
from typing import (
 
    Callable,
 
    Iterator,
 
    List,
 
    Optional,
 
    Sequence,
 
    Set,
 
    TextIO,
 
    Tuple,
 
)
 
from types import (
 
    ModuleType,
 
)
 

	
 
from . import extract_odf_links
 
from .. import cliutil
 
from .. import config as configmod
 
from ..reports import accrual
 
from ..reports import balance_sheet
 
from ..reports import fund
 
from ..reports import ledger
 

	
 
from beancount.scripts import check as bc_check
 

	
 
ArgList = List[str]
 
ReportFunc = Callable[[ArgList, TextIO, TextIO, configmod.Config], int]
 

	
 
PROGNAME = 'audit-report'
 
logger = logging.getLogger('conservancy_beancount.tools.audit_report')
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        '--verbose', '-v',
 
        action='store_true',
 
        help="""Display progress information
 
""")
 
    cliutil.add_jobs_argument(parser)
 
    cliutil.add_rewrite_rules_argument(parser)
 
    parser.add_argument(
 
        '--output-directory', '-O',
 
        metavar='DIR',
 
        type=Path,
 
        help="""Write all reports to this directory.
 
Default is a newly-created directory under your repository.
 
""")
 
    parser.add_argument(
 
        '--force',
 
        action='store_true',
 
        help="""Run reports even if bean-check reports errors.
 
""")
 
    parser.add_argument(
 
        'audit_year',
 
        metavar='YEAR',
 
        nargs='?',
 
        type=cliutil.year_or_date_arg,
 
        help="""Main fiscal year to generate reports for.
 
Defaults to the last complete fiscal year.
 
""")
 
    parser.add_argument(
 
        'end_date',
 
        metavar='END',
 
        nargs='?',
 
        type=cliutil.date_arg,
 
        help="""End date for reports for the following fiscal year.
 
The default is automatically calculated from today's date.
 
""")
 
    args = parser.parse_args(arglist)
 
    args.arg_error = parser.error
 
    return args
 

	
 
def now_s() -> str:
 
    return datetime.datetime.now().isoformat(sep=' ', timespec='seconds')
 

	
 
def bean_check(books_path: Path) -> int:
 
    sys.argv = ['bean-check', str(books_path)]
 
    logger.debug("running %r", sys.argv)
 
    # bean-check logs timing information to the root logger at INFO level.
 
    # Suppress that.
 
    logging.getLogger().setLevel(logging.WARNING)
 
    return bc_check.main()  # type:ignore[no-any-return]
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if args.verbose:
 
        logger.setLevel(logging.DEBUG)
 
        reports_logger = logging.getLogger('conservancy_beancount.reports')
 
        reports_logger.setLevel(logging.DEBUG)
 
        reports_logger.getChild('rewrite').setLevel(args.loglevel)
 

	
 
    fy = config.fiscal_year_begin()
 
    today = datetime.date.today()
 
    if args.audit_year is None:
 
        args.audit_year = fy.for_date(today) - 1
 
    audit_begin = fy.first_date(args.audit_year)
 
    audit_end = fy.next_fy_date(args.audit_year)
 
    if args.end_date is None:
 
        days_diff = (today - audit_end).days
 
        if days_diff < (28 * 2):
 
            args.end_date = today
 
        elif days_diff >= 365:
 
            args.end_date = fy.next_fy_date(args.audit_year + 1)
 
        else:
 
            end_date = today - datetime.timedelta(days=today.day + 1)
 
            args.end_date = end_date.replace(day=1)
 
    if args.end_date < audit_end:
 
        args.arg_error("end date is within audited fiscal year")
 
    next_year = fy.for_date(args.end_date)
 
    repo_path = config.repository_path()
 

	
 
    if args.output_directory is None:
 
        args.output_directory = Path(tempfile.mkdtemp(
 
            prefix=f'FY{args.audit_year}AuditReports.', dir=repo_path,
 
        ))
 
        logger.info("writing reports to %s", args.output_directory)
 
    else:
 
        args.output_directory.mkdir(exist_ok=True)
 
    output_reports: List[Path] = []
 
    def common_args(out_name: str, year: Optional[int]=None, *arglist: str) -> Iterator[str]:
 
        if year is not None:
 
            out_name = f'FY{year}{out_name}.ods'
 
        if year == args.audit_year:
 
            yield f'--begin={audit_begin.isoformat()}'
 
            yield f'--end={audit_end.isoformat()}'
 
        elif year == next_year:
 
            yield f'--begin={audit_end.isoformat()}'
 
            yield f'--end={args.end_date.isoformat()}'
 
        elif year is not None:
 
            raise ValueError(f"unknown year {year!r}")
 
        out_path = args.output_directory / out_name
 
        output_reports.append(out_path)
 
        for path in args.rewrite_rules:
 
            yield f'--rewrite-rules={path}'
 
        yield f'--output-file={out_path}'
 
        yield from arglist
 
    reports: List[Tuple[ReportFunc, ArgList]] = [
 
        # Reports are sorted roughly in descending order of how long each takes
 
        # to generate.
 
        (ledger.main, list(common_args('GeneralLedger', args.audit_year))),
 
        (ledger.main, list(common_args('GeneralLedger', next_year))),
 
        (ledger.main, list(common_args('Disbursements', args.audit_year, '--disbursements'))),
 
        (ledger.main, list(common_args('Receipts', args.audit_year, '--receipts'))),
 
        (ledger.main, list(common_args('Disbursements', next_year, '--disbursements'))),
 
        (ledger.main, list(common_args('Receipts', next_year, '--receipts'))),
 
        (accrual.main, list(common_args('AgingReport.ods'))),
 
        (accrual.main, list(common_args(f'FY{next_year}AgingReport.ods'))),
 
        (accrual.main, list(common_args(
 
            f'FY{args.audit_year}AgingReport.ods',
 
            None,
 
            f'--end={audit_end.isoformat()}',
 
        ))),
 
        (balance_sheet.main, list(common_args('Summary', args.audit_year))),
 
        (fund.main, list(common_args('FundReport', args.audit_year))),
 
        (fund.main, list(common_args('FundReport', next_year))),
 
    ]
 

	
 
    books = config.books_loader()
 
    if books is None:
 
        logger.critical("no books available to load")
 
        return os.EX_NOINPUT
 

	
 
    with futmod.ProcessPoolExecutor(args.jobs) as pool:
 
        logger.debug("%s: process pool ready with %s workers", now_s(), args.jobs)
 
        fy_paths = books._iter_fy_books(fy.range(args.audit_year - 1, args.end_date))
 
        check_results = pool.map(bean_check, fy_paths)
 
        if all(exitcode == 0 for exitcode in check_results):
 
            logger.debug("%s: bean-check passed", now_s())
 
        else:
 
            logger.log(
 
                logging.WARNING if args.force else logging.ERROR,
 
                "%s: bean-check failed",
 
                now_s(),
 
            )
 
            if not args.force:
 
                return os.EX_DATAERR
 

	
 
        report_results = [
 
            pool.submit(report_func, arglist, config=config)
 
            for report_func, arglist in reports
 
        ]
 
        report_errors = [res.result() for res in report_results if res.result() != 0]
 
        if not report_errors:
 
            logger.debug("%s: all reports generated", now_s())
 
        else:
 
            logger.error("%s: %s reports generated errors", now_s(), len(report_errors))
 
            if not args.force:
 
                return max(report_errors)
 
    return os.EX_OK
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
0 comments (0 inline, 0 general)