Changeset - aef00ce83f20
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-05-30 14:35:29
brettcsmith@brettcsmith.org
accrual: Check the consistency of accruals' cost.
3 files changed with 34 insertions and 10 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/accrual.py
Show inline comments
 
#!/usr/bin/env python3
 
"""accrual-report - Status reports for accruals
 

	
 
accrual-report checks accruals (postings under Assets:Receivable and
 
Liabilities:Payable) for errors and metadata consistency, and reports any
 
problems on stderr. Then it writes a report about the status of those
 
accruals on stdout.
 

	
 
The typical way to run it is to pass an RT ticket number or invoice link as an
 
argument::
 

	
 
    # Report all accruals associated with RT#1230:
 
    accrual-report 1230
 
    # Report all accruals with the invoice link rt:45/670.
 
    accrual-report 45/670
 
    # Report all accruals with the invoice link Invoice980.pdf.
 
    accrual-report Invoice980.pdf
 

	
 
By default, to stay fast, accrual-report only looks for postings from the
 
beginning of the last fiscal year. You can search further back in history
 
by passing the ``--since`` argument. The argument can be a fiscal year, or
 
a negative number of how many years back to search::
 

	
 
    # Search for accruals since 2016
 
    accrual-report --since 2016 [search terms …]
 
    # Search for accruals from the beginning of three fiscal years ago
 
    accrual-report --since -3 [search terms …]
 

	
 
If you want to further limit what accruals are reported, you can match on
 
other metadata by passing additional arguments in ``name=value`` format.
 
You can pass any number of search terms. For example::
 

	
 
    # Report accruals associated with RT#1230 and Jane Doe
 
    accrual-report 1230 entity=Doe-Jane
 

	
 
accrual-report will automatically decide what kind of report to generate from
 
the results of your search. If the search matches a single outstanding payable,
 
it will write an outgoing approval report; otherwise, it writes a basic balance
 
report. You can request a specific report type with the ``--report-type``
 
option::
 

	
 
    # Write an outgoing approval report for all outstanding accruals for
 
    # Jane Doe, even if there's more than one
 
    accrual-report --report-type outgoing entity=Doe-Jane
 
"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import collections
 
import datetime
 
import enum
 
import logging
 
import re
 
import sys
 

	
 
from typing import (
 
    Any,
 
    Callable,
 
    Dict,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    NamedTuple,
 
    Optional,
 
    Sequence,
 
    Set,
 
    TextIO,
 
    Tuple,
 
)
 
from ..beancount_types import (
 
    Error,
 
    MetaKey,
 
    MetaValue,
 
    Transaction,
 
)
 

	
 
import rt
 

	
 
from beancount.parser import printer as bc_printer
 

	
 
from . import core
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 
from .. import filters
 
from .. import rtutil
 

	
 
PROGNAME = 'accrual-report'
 

	
 
PostGroups = Mapping[Optional[MetaValue], core.RelatedPostings]
 
RTObject = Mapping[str, str]
 

	
 
logger = logging.getLogger('conservancy_beancount.reports.accrual')
 

	
 
class Account(NamedTuple):
 
    name: str
 
    balance_paid: Callable[[core.Balance], bool]
 

	
 

	
 
class AccrualAccount(enum.Enum):
 
    PAYABLE = Account('Liabilities:Payable', core.Balance.ge_zero)
 
    RECEIVABLE = Account('Assets:Receivable', core.Balance.le_zero)
 

	
 
    @classmethod
 
    def account_names(cls) -> Iterator[str]:
 
        return (acct.value.name for acct in cls)
 

	
 
    @classmethod
 
    def classify(cls, related: core.RelatedPostings) -> 'AccrualAccount':
 
        for account in cls:
 
            account_name = account.value.name
 
            if all(post.account.is_under(account_name) for post in related):
 
                return account
 
        raise ValueError("unrecognized account set in related postings")
 

	
 
    @classmethod
 
    def filter_paid_accruals(cls, groups: PostGroups) -> PostGroups:
 
        return {
 
            key: related
 
            for key, related in groups.items()
 
            if not cls.classify(related).value.balance_paid(related.balance())
 
        }
 

	
 

	
 
class BaseReport:
 
    def __init__(self, out_file: TextIO) -> None:
 
        self.out_file = out_file
 
        self.logger = logger.getChild(type(self).__name__)
 

	
 
    def _since_last_nonzero(self, posts: core.RelatedPostings) -> core.RelatedPostings:
 
        retval = core.RelatedPostings()
 
        for post in posts:
 
            if retval.balance().is_zero():
 
                retval.clear()
 
            retval.add(post)
 
        return retval
 

	
 
    def _report(self,
 
                invoice: str,
 
                posts: core.RelatedPostings,
 
                index: int,
 
    ) -> Iterable[str]:
 
        raise NotImplementedError("BaseReport._report")
 

	
 
    def run(self, groups: PostGroups) -> None:
 
        for index, invoice in enumerate(groups):
 
            for line in self._report(str(invoice), groups[invoice], index):
 
                print(line, file=self.out_file)
 

	
 

	
 
class BalanceReport(BaseReport):
 
    def _report(self,
 
                invoice: str,
 
                posts: core.RelatedPostings,
 
                index: int,
 
    ) -> Iterable[str]:
 
        posts = self._since_last_nonzero(posts)
 
        balance = posts.balance()
 
        date_s = posts[0].meta.date.strftime('%Y-%m-%d')
 
        if index:
 
            yield ""
 
        yield f"{invoice}:"
 
        yield f"  {balance} outstanding since {date_s}"
 

	
 

	
 
class OutgoingReport(BaseReport):
 
    def __init__(self, rt_client: rt.Rt, out_file: TextIO) -> None:
 
        super().__init__(out_file)
 
        self.rt_client = rt_client
 
        self.rt_wrapper = rtutil.RT(rt_client)
 

	
 
    def _primary_rt_id(self, posts: core.RelatedPostings) -> rtutil.TicketAttachmentIds:
 
        rt_ids = posts.all_meta_links('rt-id')
 
        rt_ids_count = len(rt_ids)
 
        if rt_ids_count != 1:
 
            raise ValueError(f"{rt_ids_count} rt-id links found")
 
        parsed = rtutil.RT.parse(rt_ids.pop())
 
        if parsed is None:
 
            raise ValueError("rt-id is not a valid RT reference")
 
        else:
 
            return parsed
 

	
 
    def _report(self,
 
                invoice: str,
 
                posts: core.RelatedPostings,
 
                index: int,
 
    ) -> Iterable[str]:
 
        posts = self._since_last_nonzero(posts)
 
        try:
 
            ticket_id, _ = self._primary_rt_id(posts)
 
            ticket = self.rt_client.get_ticket(ticket_id)
 
            # Note we only use this when ticket is None.
 
            errmsg = f"ticket {ticket_id} not found"
 
        except (ValueError, rt.RtError) as error:
 
            ticket = None
 
            errmsg = error.args[0]
 
        if ticket is None:
 
            self.logger.error(
 
                "can't generate outgoings report for %s because no RT ticket available: %s",
 
                invoice, errmsg,
 
            )
 
            return
 

	
 
        try:
 
            rt_requestor = self.rt_client.get_user(ticket['Requestors'][0])
 
        except (IndexError, rt.RtError):
 
            rt_requestor = None
 
        if rt_requestor is None:
 
            requestor = ''
 
            requestor_name = ''
 
        else:
 
            requestor_name = (
 
                rt_requestor.get('RealName')
 
                or ticket.get('CF.{payment-to}')
 
                or ''
 
            )
 
            requestor = f'{requestor_name} <{rt_requestor["EmailAddress"]}>'.strip()
 

	
 
        raw_balance = -posts.balance()
 
        cost_balance = -posts.balance_at_cost()
 
        cost_balance_s = cost_balance.format(None)
 
        if raw_balance == cost_balance:
 
            balance_s = cost_balance_s
 
        else:
 
            balance_s = f'{raw_balance} ({cost_balance_s})'
 

	
 
        contract_links = posts.all_meta_links('contract')
 
        if contract_links:
 
            contract_s = ' , '.join(self.rt_wrapper.iter_urls(
 
                contract_links, missing_fmt='<BROKEN RT LINK: {}>',
 
            ))
 
        else:
 
            contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
 
        projects = [v for v in posts.meta_values('project')
 
                    if isinstance(v, str)]
 

	
 
        yield "PAYMENT FOR APPROVAL:"
 
        yield f"REQUESTOR: {requestor}"
 
        yield f"TOTAL TO PAY: {balance_s}"
 
        yield f"AGREEMENT: {contract_s}"
 
        yield f"PAYMENT TO: {ticket.get('CF.{payment-to}') or requestor_name}"
 
        yield f"PAYMENT METHOD: {ticket.get('CF.{payment-method}', '')}"
 
        yield f"PROJECT: {', '.join(projects)}"
 
        yield "\nBEANCOUNT ENTRIES:\n"
 

	
 
        last_txn: Optional[Transaction] = None
 
        for post in posts:
 
            txn = post.meta.txn
 
            if txn is not last_txn:
 
                last_txn = txn
 
                txn = self.rt_wrapper.txn_with_urls(txn, '{}')
 
                yield bc_printer.format_entry(txn)
 

	
 

	
 
class ReportType(enum.Enum):
 
    BALANCE = BalanceReport
 
    OUTGOING = OutgoingReport
 
    BAL = BALANCE
 
    OUT = OUTGOING
 
    OUTGOINGS = OUTGOING
 

	
 
    @classmethod
 
    def by_name(cls, name: str) -> 'ReportType':
 
        try:
 
            return cls[name.upper()]
 
        except KeyError:
 
            raise ValueError(f"unknown report type {name!r}") from None
 

	
 
    @classmethod
 
    def default_for(cls, groups: PostGroups) -> 'ReportType':
 
        if len(groups) == 1 and all(
 
                AccrualAccount.classify(group) is AccrualAccount.PAYABLE
 
                and not AccrualAccount.PAYABLE.value.balance_paid(group.balance())
 
                for group in groups.values()
 
        ):
 
            return cls.OUTGOING
 
        else:
 
            return cls.BALANCE
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    LOAD_ERRORS = 1
 
    CONSISTENCY_ERRORS = 2
 
    REPORT_ERRORS = 4
 
    NOTHING_TO_REPORT = 8
 

	
 

	
 
class SearchTerm(NamedTuple):
 
    meta_key: MetaKey
 
    pattern: str
 

	
 
    @classmethod
 
    def parse(cls, s: str) -> 'SearchTerm':
 
        key_match = re.match(r'^[a-z][-\w]*=', s)
 
        key: Optional[str]
 
        if key_match:
 
            key, _, raw_link = s.partition('=')
 
        else:
 
            key = None
 
            raw_link = s
 
        rt_ids = rtutil.RT.parse(raw_link)
 
        if rt_ids is None:
 
            rt_ids = rtutil.RT.parse('rt:' + raw_link)
 
        if rt_ids is None:
 
            if key is None:
 
                key = 'invoice'
 
            pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link))
 
        else:
 
            ticket_id, attachment_id = rt_ids
 
            if key is None:
 
                key = 'rt-id' if attachment_id is None else 'invoice'
 
            pattern = rtutil.RT.metadata_regexp(
 
                ticket_id,
 
                attachment_id,
 
                first_link_only=key == 'rt-id' and attachment_id is None,
 
            )
 
        return cls(key, pattern)
 

	
 
def _consistency_check_one_thing(
 
        key: MetaValue,
 
        related: core.RelatedPostings,
 
        get_name: str,
 
        get_func: Callable[[data.Posting], Any],
 
) -> Iterable[Error]:
 
    values = {get_func(post) for post in related}
 
    if len(values) != 1:
 
        for post in related:
 
            errmsg = f'inconsistent {get_name} for invoice {key}: {get_func(post)}'
 
            yield Error(post.meta, errmsg, post.meta.txn)
 

	
 
def consistency_check(groups: PostGroups) -> Iterable[Error]:
 
    errfmt = 'inconsistent {} for invoice {}: {{}}'
 
    for key, related in groups.items():
 
        yield from _consistency_check_one_thing(
 
            key, related, 'cost', lambda post: post.cost,
 
        )
 
        for checked_meta in ['contract', 'entity', 'purchase-order']:
 
            meta_values = related.meta_values(checked_meta)
 
            if len(meta_values) != 1:
 
                errmsg = f'inconsistent {checked_meta} for invoice {key}'
 
                for post in related:
 
                    yield Error(
 
                        post.meta,
 
                        f'{errmsg}: {post.meta.get(checked_meta)}',
 
                        post.meta.txn,
 
                    )
 
            yield from _consistency_check_one_thing(
 
                key, related, checked_meta, lambda post: post.meta.get(checked_meta),
 
            )
 

	
 
def filter_search(postings: Iterable[data.Posting],
 
                  search_terms: Iterable[SearchTerm],
 
) -> Iterable[data.Posting]:
 
    accounts = tuple(AccrualAccount.account_names())
 
    postings = (post for post in postings if post.account.is_under(*accounts))
 
    for meta_key, pattern in search_terms:
 
        postings = filters.filter_meta_match(postings, meta_key, re.compile(pattern))
 
    return postings
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='NAME',
 
        type=ReportType.by_name,
 
        help="""The type of report to generate, either `balance` or `outgoing`.
 
If not specified, the default is `outgoing` for search criteria that return a
 
single outstanding payable, and `balance` any other time.
 
""")
 
    parser.add_argument(
 
        '--since',
 
        metavar='YEAR',
 
        type=int,
 
        default=-1,
 
        help="""How far back to search the books for related transactions.
 
You can either specify a fiscal year, or a negative offset from the current
 
fiscal year, to start loading entries from. The default is -1 (start from the
 
previous fiscal year).
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search',
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on accruals that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other link, including an RT attachment link in
 
`TIK/ATT` format, is a shortcut for `invoice=LINK`.
 
""")
 
    args = parser.parse_args(arglist)
 
    args.search_terms = [SearchTerm.parse(s) for s in args.search]
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    if cliutil.is_main_script(PROGNAME):
 
        global logger
 
        logger = logging.getLogger(PROGNAME)
 
        sys.excepthook = cliutil.ExceptHook(logger)
 
    args = parse_arguments(arglist)
 
    cliutil.setup_logger(logger, args.loglevel, stderr)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 
    books_loader = config.books_loader()
 
    if books_loader is not None:
 
        entries, load_errors, _ = books_loader.load_fy_range(args.since)
 
    else:
 
        entries = []
 
        source = {
 
            'filename': str(config.config_file_path()),
 
            'lineno': 1,
 
        }
 
        load_errors = [Error(source, "no books to load in configuration", None)]
 
    postings = filter_search(data.Posting.from_entries(entries), args.search_terms)
 
    groups = core.RelatedPostings.group_by_meta(postings, 'invoice')
 
    groups = AccrualAccount.filter_paid_accruals(groups) or groups
 
    meta_errors = consistency_check(groups)
 
    returncode = 0
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
        returncode |= ReturnFlag.LOAD_ERRORS
 
    for error in meta_errors:
 
        bc_printer.print_error(error, file=stderr)
 
        returncode |= ReturnFlag.CONSISTENCY_ERRORS
 
    if args.report_type is None:
 
        args.report_type = ReportType.default_for(groups)
 
    if not groups:
 
        logger.warning("no matching entries found to report")
 
        returncode |= ReturnFlag.NOTHING_TO_REPORT
 
    report: Optional[BaseReport] = None
 
    if args.report_type is ReportType.OUTGOING:
 
        rt_client = config.rt_client()
 
        if rt_client is None:
 
            logger.error("unable to generate outgoing report: RT client is required")
 
        else:
 
            report = OutgoingReport(rt_client, stdout)
 
    else:
 
        report = args.report_type.value(stdout)
 
    if report is None:
 
        returncode |= ReturnFlag.REPORT_ERRORS
 
    else:
 
        report.run(groups)
 
    return 0 if returncode == 0 else 16 + returncode
 

	
 
if __name__ == '__main__':
 
    exit(main())
setup.py
Show inline comments
 
#!/usr/bin/env python3
 

	
 
from setuptools import setup
 

	
 
setup(
 
    name='conservancy_beancount',
 
    description="Plugin, library, and reports for reading Conservancy's books",
 
    version='1.0.8',
 
    version='1.0.9',
 
    author='Software Freedom Conservancy',
 
    author_email='info@sfconservancy.org',
 
    license='GNU AGPLv3+',
 

	
 
    install_requires=[
 
        'babel>=2.6',  # Debian:python3-babel
 
        'beancount>=2.2',  # Debian:beancount
 
        'PyYAML>=3.0',  # Debian:python3-yaml
 
        'regex',  # Debian:python3-regex
 
        'rt>=2.0',
 
    ],
 
    setup_requires=[
 
        'pytest-mypy',
 
        'pytest-runner',  # Debian:python3-pytest-runner
 
    ],
 
    tests_require=[
 
        'mypy>=0.770',  # Debian:python3-mypy
 
        'pytest',  # Debian:python3-pytest
 
    ],
 

	
 
    packages=[
 
        'conservancy_beancount',
 
        'conservancy_beancount.plugin',
 
        'conservancy_beancount.reports',
 
    ],
 
    entry_points={
 
        'console_scripts': [
 
            'accrual-report = conservancy_beancount.reports.accrual:main',
 
        ],
 
    },
 
)
tests/test_reports_accrual.py
Show inline comments
 
"""test_reports_accrual - Unit tests for accrual report"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import collections
 
import copy
 
import io
 
import itertools
 
import logging
 
import re
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from beancount import loader as bc_loader
 
from conservancy_beancount import data
 
from conservancy_beancount import rtutil
 
from conservancy_beancount.reports import accrual
 
from conservancy_beancount.reports import core
 

	
 
_accruals_load = bc_loader.load_file(testutil.test_path('books/accruals.beancount'))
 
ACCRUALS_COUNT = sum(
 
    1
 
    for entry in _accruals_load[0]
 
    for post in getattr(entry, 'postings', ())
 
    if post.account.startswith(('Assets:Receivable:', 'Liabilities:Payable:'))
 
)
 

	
 
ACCOUNTS = [
 
    'Assets:Receivable:Accounts',
 
    'Assets:Receivable:Loans',
 
    'Liabilities:Payable:Accounts',
 
    'Liabilities:Payable:Vacation',
 
]
 

	
 
CONSISTENT_METADATA = [
 
    'contract',
 
    'entity',
 
    'purchase-order',
 
]
 

	
 
class RTClient(testutil.RTClient):
 
    TICKET_DATA = {
 
        '40': [
 
            ('400', 'invoice feb.csv', 'text/csv', '40.4k'),
 
        ],
 
        '44': [
 
            ('440', 'invoice feb.csv', 'text/csv', '40.4k'),
 
        ],
 
        '490': [],
 
        '505': [],
 
        '510': [
 
            ('4000', 'contract.pdf', 'application/pdf', '1.4m'),
 
            ('5100', 'invoice april.pdf', 'application/pdf', '1.5m'),
 
            ('5105', 'payment.png', 'image/png', '51.5k'),
 
            ('6100', 'invoice may.pdf', 'application/pdf', '1.6m'),
 
        ],
 
        '515': [],
 
        '520': [],
 
    }
 

	
 

	
 
@pytest.fixture
 
def accrual_entries():
 
    return copy.deepcopy(_accruals_load[0])
 

	
 
@pytest.fixture
 
def accrual_postings():
 
    entries = copy.deepcopy(_accruals_load[0])
 
    return data.Posting.from_entries(entries)
 

	
 
def check_link_regexp(regexp, match_s, first_link_only=False):
 
    assert regexp
 
    assert re.search(regexp, match_s)
 
    assert re.search(regexp, match_s + ' postlink')
 
    assert re.search(regexp, match_s + '0') is None
 
    assert re.search(regexp, '1' + match_s) is None
 
    end_match = re.search(regexp, 'prelink ' + match_s)
 
    if first_link_only:
 
        assert end_match is None
 
    else:
 
        assert end_match
 

	
 
def relate_accruals_by_meta(postings, value, key='invoice'):
 
    return core.RelatedPostings(
 
        post for post in postings
 
        if post.meta.get(key) == value
 
        and post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
 
    )
 

	
 
@pytest.mark.parametrize('link_fmt', [
 
    '{}',
 
    'rt:{}',
 
    'rt://ticket/{}',
 
])
 
def test_search_term_parse_rt_shortcuts(link_fmt):
 
    key, regexp = accrual.SearchTerm.parse(link_fmt.format(220))
 
    assert key == 'rt-id'
 
    check_link_regexp(regexp, 'rt:220', first_link_only=True)
 
    check_link_regexp(regexp, 'rt://ticket/220', first_link_only=True)
 

	
 
@pytest.mark.parametrize('link_fmt', [
 
    '{}/{}',
 
    'rt:{}/{}',
 
    'rt://ticket/{}/attachments/{}',
 
])
 
def test_search_term_parse_invoice_shortcuts(link_fmt):
 
    key, regexp = accrual.SearchTerm.parse(link_fmt.format(330, 660))
 
    assert key == 'invoice'
 
    check_link_regexp(regexp, 'rt:330/660')
 
    check_link_regexp(regexp, 'rt://ticket/330/attachments/660')
 

	
 
@pytest.mark.parametrize('key', [
 
    'approval',
 
    'contract',
 
    'invoice',
 
])
 
def test_search_term_parse_metadata_rt_shortcut(key):
 
    actual_key, regexp = accrual.SearchTerm.parse(f'{key}=440/420')
 
    assert actual_key == key
 
    check_link_regexp(regexp, 'rt:440/420')
 
    check_link_regexp(regexp, 'rt://ticket/440/attachments/420')
 

	
 
@pytest.mark.parametrize('key', [
 
    None,
 
    'approval',
 
    'contract',
 
    'invoice',
 
])
 
def test_search_term_parse_repo_link(key):
 
    document = '1234.pdf'
 
    if key is None:
 
        key = 'invoice'
 
        search = document
 
    else:
 
        search = f'{key}={document}'
 
    actual_key, regexp = accrual.SearchTerm.parse(search)
 
    assert actual_key == key
 
    check_link_regexp(regexp, document)
 

	
 
@pytest.mark.parametrize('search,unmatched', [
 
    ('1234.pdf', '1234_pdf'),
 
])
 
def test_search_term_parse_regexp_escaping(search, unmatched):
 
    _, regexp = accrual.SearchTerm.parse(search)
 
    assert re.search(regexp, unmatched) is None
 

	
 
@pytest.mark.parametrize('search_terms,expect_count,check_func', [
 
    ([], ACCRUALS_COUNT, lambda post: post.account.is_under(
 
        'Assets:Receivable:', 'Liabilities:Payable:',
 
    )),
 
    ([('rt-id', '^rt:505$')], 2, lambda post: post.meta['entity'] == 'DonorA'),
 
    ([('invoice', r'^rt:\D+515/')], 1, lambda post: post.meta['entity'] == 'MatchingProgram'),
 
    ([('entity', '^Lawyer$')], 3, lambda post: post.meta['rt-id'] == 'rt:510'),
 
    ([('entity', '^Lawyer$'), ('contract', '^rt:510/')], 2,
 
     lambda post: post.meta['invoice'].startswith('rt:510/')),
 
    ([('rt-id', '^rt:510$'), ('approval', '.')], 0, lambda post: False),
 
])
 
def test_filter_search(accrual_postings, search_terms, expect_count, check_func):
 
    actual = list(accrual.filter_search(accrual_postings, search_terms))
 
    if expect_count < ACCRUALS_COUNT:
 
        assert ACCRUALS_COUNT > len(actual) >= expect_count
 
    else:
 
        assert len(actual) == ACCRUALS_COUNT
 
    for post in actual:
 
        assert check_func(post)
 

	
 
@pytest.mark.parametrize('arg,expected', [
 
    ('balance', accrual.BalanceReport),
 
    ('outgoing', accrual.OutgoingReport),
 
    ('bal', accrual.BalanceReport),
 
    ('out', accrual.OutgoingReport),
 
    ('outgoings', accrual.OutgoingReport),
 
])
 
def test_report_type_by_name(arg, expected):
 
    assert accrual.ReportType.by_name(arg.lower()).value is expected
 
    assert accrual.ReportType.by_name(arg.title()).value is expected
 
    assert accrual.ReportType.by_name(arg.upper()).value is expected
 

	
 
@pytest.mark.parametrize('arg', [
 
    'unknown',
 
    'blance',
 
    'outgong',
 
])
 
def test_report_type_by_unknown_name(arg):
 
    # Raising ValueError helps argparse generate good messages.
 
    with pytest.raises(ValueError):
 
        accrual.ReportType.by_name(arg)
 

	
 
@pytest.mark.parametrize('meta_key,account', testutil.combine_values(
 
    CONSISTENT_METADATA,
 
    ACCOUNTS,
 
))
 
def test_consistency_check_when_consistent(meta_key, account):
 
    invoice = f'test-{meta_key}-invoice'
 
    meta = {
 
        'invoice': invoice,
 
        meta_key: f'test-{meta_key}-value',
 
    }
 
    txn = testutil.Transaction(postings=[
 
        (account, 100, meta),
 
        (account, -100, meta),
 
    ])
 
    related = core.RelatedPostings(data.Posting.from_txn(txn))
 
    assert not list(accrual.consistency_check({invoice: related}))
 

	
 
@pytest.mark.parametrize('meta_key,account', testutil.combine_values(
 
    ['approval', 'fx-rate', 'statement'],
 
    ACCOUNTS,
 
))
 
def test_consistency_check_ignored_metadata(meta_key, account):
 
    invoice = f'test-{meta_key}-invoice'
 
    txn = testutil.Transaction(postings=[
 
        (account, 100, {'invoice': invoice, meta_key: 'credit'}),
 
        (account, -100, {'invoice': invoice, meta_key: 'debit'}),
 
    ])
 
    related = core.RelatedPostings(data.Posting.from_txn(txn))
 
    assert not list(accrual.consistency_check({invoice: related}))
 

	
 
@pytest.mark.parametrize('meta_key,account', testutil.combine_values(
 
    CONSISTENT_METADATA,
 
    ACCOUNTS,
 
))
 
def test_consistency_check_when_inconsistent(meta_key, account):
 
    invoice = f'test-{meta_key}-invoice'
 
    txn = testutil.Transaction(postings=[
 
        (account, 100, {'invoice': invoice, meta_key: 'credit', 'lineno': 1}),
 
        (account, -100, {'invoice': invoice, meta_key: 'debit', 'lineno': 2}),
 
    ])
 
    related = core.RelatedPostings(data.Posting.from_txn(txn))
 
    errors = list(accrual.consistency_check({invoice: related}))
 
    for exp_lineno, (actual, exp_msg) in enumerate(itertools.zip_longest(errors, [
 
            f'inconsistent {meta_key} for invoice {invoice}: credit',
 
            f'inconsistent {meta_key} for invoice {invoice}: debit',
 
    ]), 1):
 
        assert actual.message == exp_msg
 
        assert actual.entry is txn
 
        assert actual.source.get('lineno') == exp_lineno
 

	
 
def test_consistency_check_cost():
 
    account = ACCOUNTS[0]
 
    invoice = 'test-cost-invoice'
 
    txn = testutil.Transaction(postings=[
 
        (account, 100, 'EUR', ('1.1251', 'USD'), {'invoice': invoice, 'lineno': 1}),
 
        (account, -100, 'EUR', ('1.125', 'USD'), {'invoice': invoice, 'lineno': 2}),
 
    ])
 
    related = core.RelatedPostings(data.Posting.from_txn(txn))
 
    errors = list(accrual.consistency_check({invoice: related}))
 
    for post, err in itertools.zip_longest(txn.postings, errors):
 
        assert err.message == f'inconsistent cost for invoice {invoice}: {post.cost}'
 
        assert err.entry is txn
 
        assert err.source.get('lineno') == post.meta['lineno']
 

	
 
def check_output(output, expect_patterns):
 
    output.seek(0)
 
    testutil.check_lines_match(iter(output), expect_patterns)
 

	
 
def run_outgoing(invoice, postings, rt_client=None):
 
    if rt_client is None:
 
        rt_client = RTClient()
 
    if not isinstance(postings, core.RelatedPostings):
 
        postings = relate_accruals_by_meta(postings, invoice)
 
    output = io.StringIO()
 
    report = accrual.OutgoingReport(rt_client, output)
 
    report.run({invoice: postings})
 
    return output
 

	
 
@pytest.mark.parametrize('invoice,expected', [
 
    ('rt:505/5050', "Zero balance outstanding since 2020-05-05"),
 
    ('rt:510/5100', "Zero balance outstanding since 2020-05-10"),
 
    ('rt:510/6100', "-280.00 USD outstanding since 2020-06-10"),
 
    ('rt://ticket/515/attachments/5150', "1,500.00 USD outstanding since 2020-05-15",),
 
])
 
def test_balance_report(accrual_postings, invoice, expected, caplog):
 
    related = relate_accruals_by_meta(accrual_postings, invoice)
 
    output = io.StringIO()
 
    report = accrual.BalanceReport(output)
 
    report.run({invoice: related})
 
    assert not caplog.records
 
    check_output(output, [invoice, expected])
 

	
 
def test_outgoing_report(accrual_postings, caplog):
 
    invoice = 'rt:510/6100'
 
    output = run_outgoing(invoice, accrual_postings)
 
    rt_url = RTClient.DEFAULT_URL[:-9]
 
    rt_id_url = rf'\b{re.escape(f"{rt_url}Ticket/Display.html?id=510")}\b'
 
    contract_url = rf'\b{re.escape(f"{rt_url}Ticket/Attachment/4000/4000/contract.pdf")}\b'
 
    assert not caplog.records
 
    check_output(output, [
 
        r'^PAYMENT FOR APPROVAL:$',
 
        r'^REQUESTOR: Mx\. 510 <mx510@example\.org>$',
 
        r'^TOTAL TO PAY: \$280\.00$',
 
        fr'^AGREEMENT: {contract_url}',
 
        r'^PAYMENT TO: Hon\. Mx\. 510$',
 
        r'^PAYMENT METHOD: payment method 510$',
 
        r'^BEANCOUNT ENTRIES:$',
 
        # For each transaction, check for the date line, a metadata, and the
 
        # Expenses posting.
 
        r'^\s*2020-06-10\s',
 
        fr'^\s+rt-id: "{rt_id_url}"$',
 
        r'^\s+Expenses:Services:Legal\s+220\.00 USD$',
 
        r'^\s*2020-06-12\s',
 
        fr'^\s+contract: "{contract_url}"$',
 
        r'^\s+Expenses:FilingFees\s+60\.00 USD$',
 
    ])
 

	
 
def test_outgoing_report_custom_field_fallbacks(accrual_postings, caplog):
 
    invoice = 'rt:510/6100'
 
    rt_client = RTClient(want_cfs=False)
 
    output = run_outgoing(invoice, accrual_postings, rt_client)
 
    assert not caplog.records
 
    check_output(output, [
 
        r'^PAYMENT FOR APPROVAL:$',
 
        r'^REQUESTOR: <mx510@example\.org>$',
 
        r'^PAYMENT TO:\s*$',
 
        r'^PAYMENT METHOD:\s*$',
 
    ])
 

	
 
def test_outgoing_report_fx_amounts(accrual_postings, caplog):
 
    invoice = 'rt:520/5200'
 
    output = run_outgoing(invoice, accrual_postings)
 
    assert not caplog.records
 
    check_output(output, [
 
        r'^PAYMENT FOR APPROVAL:$',
 
        r'^REQUESTOR: Mx\. 520 <mx520@example\.org>$',
 
        r'^TOTAL TO PAY: 1,000\.00 EUR \(\$1,100.00\)$',
 
    ])
 

	
 
def test_outgoing_report_without_rt_id(accrual_postings, caplog):
 
    invoice = 'rt://ticket/515/attachments/5150'
 
    output = run_outgoing(invoice, accrual_postings)
 
    assert caplog.records
 
    log = caplog.records[0]
 
    assert log.message.startswith(
 
        f"can't generate outgoings report for {invoice} because no RT ticket available:",
 
    )
 
    assert not output.getvalue()
 

	
 
def run_main(arglist, config=None):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/accruals.beancount'),
 
            rt_client=RTClient(),
 
        )
 
    output = io.StringIO()
 
    errors = io.StringIO()
 
    retcode = accrual.main(arglist, output, errors, config)
 
    return retcode, output, errors
 

	
 
def check_main_fails(arglist, config, error_flags, error_patterns):
 
    retcode, output, errors = run_main(arglist, config)
 
    assert retcode > 16
 
    assert (retcode - 16) & error_flags
 
    check_output(errors, error_patterns)
 
    assert not output.getvalue()
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['--report-type=balance', 'entity=EarlyBird'],
 
    ['--report-type=outgoing', 'entity=EarlyBird'],
 
])
 
def test_output_excludes_payments(arglist):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    output.seek(0)
 
    for line in output:
 
        assert not re.match(r'\brt:4\d\b', line)
 

	
 
@pytest.mark.parametrize('arglist,expect_invoice', [
 
    (['40'], 'rt:40/400'),
 
    (['44/440'], 'rt:44/440'),
 
])
 
def test_output_payments_when_only_match(arglist, expect_invoice):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_output(output, [
 
        rf'^{re.escape(expect_invoice)}:$',
 
        r' outstanding since ',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['510'],
 
    ['510/6100'],
 
    ['entity=Lawyer'],
 
])
 
def test_main_outgoing_report(arglist):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    rt_url = RTClient.DEFAULT_URL[:-9]
 
    rt_id_url = re.escape(f'<{rt_url}Ticket/Display.html?id=510>')
 
    contract_url = re.escape(f'<{rt_url}Ticket/Attachment/4000/4000/contract.pdf>')
 
    check_output(output, [
 
        r'^REQUESTOR: Mx\. 510 <mx510@example\.org>$',
 
        r'^TOTAL TO PAY: \$280\.00$',
 
        r'^\s*2020-06-12\s',
 
        r'^\s+Expenses:FilingFees\s+60\.00 USD$',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['-t', 'balance'],
 
    ['515/5150'],
 
    ['entity=MatchingProgram'],
 
])
 
def test_main_balance_report(arglist):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_output(output, [
 
        r'\brt://ticket/515/attachments/5150:$',
 
        r'^\s+1,500\.00 USD outstanding since 2020-05-15$',
 
    ])
 

	
 
def test_main_no_books():
 
    check_main_fails([], testutil.TestConfig(), 1 | 8, [
 
        r':1: +no books to load in configuration\b',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['499'],
 
    ['505/99999'],
 
    ['entity=NonExistent'],
 
])
 
def test_main_no_matches(arglist):
 
    check_main_fails(arglist, None, 8, [
 
        r': WARNING: no matching entries found to report$',
 
    ])
 

	
 
def test_main_no_rt():
 
    config = testutil.TestConfig(
 
        books_path=testutil.test_path('books/accruals.beancount'),
 
    )
 
    check_main_fails(['-t', 'out'], config, 4, [
 
        r': ERROR: unable to generate outgoing report: RT client is required\b',
 
    ])
0 comments (0 inline, 0 general)