Changeset - 175ac3bd7a30
[Not reviewed]
0 4 0
Brett Smith - 4 years ago 2020-06-11 19:27:36
brettcsmith@brettcsmith.org
accrual: Outgoing report groups by rt-id. RT#11594.
4 files changed with 78 insertions and 56 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -318,435 +318,435 @@ class AgingODS(core.BaseODS[AccrualPostings, Optional[data.Account]]):
 
    def section_key(self, row: AccrualPostings) -> Optional[data.Account]:
 
        if isinstance(row.account, str):
 
            return row.account
 
        else:
 
            return None
 

	
 
    def start_spreadsheet(self) -> None:
 
        for accrual_type in AccrualAccount:
 
            self.use_sheet(accrual_type.name.title())
 
            for index in range(self.COL_COUNT):
 
                stylename = self.style_widecol if index else ''
 
                self.sheet.addElement(odf.table.TableColumn(stylename=stylename))
 
            self.add_row(*(
 
                self.string_cell(name, stylename=self.style_bold)
 
                for name in self.COLUMNS
 
            ))
 
            self.lock_first_row()
 

	
 
    def start_section(self, key: Optional[data.Account]) -> None:
 
        if key is None:
 
            return
 
        self.age_thresholds = list(AccrualAccount.by_account(key).value.aging_thresholds)
 
        self.age_balances = [core.MutableBalance() for _ in self.age_thresholds]
 
        accrual_date = self.date - datetime.timedelta(days=self.age_thresholds[-1])
 
        acct_parts = key.slice_parts()
 
        self.use_sheet(acct_parts[1])
 
        self.add_row()
 
        self.add_row(self.string_cell(
 
            f"{' '.join(acct_parts[2:])} {acct_parts[1]} Aging Report"
 
            f" Accrued by {accrual_date.isoformat()} Unpaid by {self.date.isoformat()}",
 
            stylename=self.merge_styles(self.style_bold, self.style_centertext),
 
            numbercolumnsspanned=self.COL_COUNT,
 
        ))
 
        self.add_row()
 

	
 
    def end_section(self, key: Optional[data.Account]) -> None:
 
        if key is None:
 
            return
 
        total_balance = core.MutableBalance()
 
        text_style = self.merge_styles(self.style_bold, self.style_endtext)
 
        text_span = 4
 
        last_age_text: Optional[str] = None
 
        self.add_row()
 
        for threshold, balance in zip(self.age_thresholds, self.age_balances):
 
            years, days = divmod(threshold, 365)
 
            years_text = f"{years} {'Year' if years == 1 else 'Years'}"
 
            days_text = f"{days} Days"
 
            if years and days:
 
                age_text = f"{years_text} {days_text}"
 
            elif years:
 
                age_text = years_text
 
            else:
 
                age_text = days_text
 
            if last_age_text is None:
 
                age_range = f"Over {age_text}"
 
            else:
 
                age_range = f"{age_text}–{last_age_text}"
 
            self.add_row(
 
                self.string_cell(
 
                    f"Total Aged {age_range}: ",
 
                    stylename=text_style,
 
                    numbercolumnsspanned=text_span,
 
                ),
 
                *(odf.table.TableCell() for _ in range(1, text_span)),
 
                self.balance_cell(balance),
 
            )
 
            last_age_text = age_text
 
            total_balance += balance
 
        self.add_row(
 
            self.string_cell(
 
                "Total Unpaid: ",
 
                stylename=text_style,
 
                numbercolumnsspanned=text_span,
 
            ),
 
            *(odf.table.TableCell() for _ in range(1, text_span)),
 
            self.balance_cell(total_balance),
 
        )
 

	
 
    def _link_seq(self, row: AccrualPostings, key: MetaKey) -> Iterator[Tuple[str, str]]:
 
        for href in row.all_meta_links(key):
 
            text: Optional[str] = None
 
            rt_ids = self.rt_wrapper.parse(href)
 
            if rt_ids is not None:
 
                ticket_id, attachment_id = rt_ids
 
                if attachment_id is None:
 
                    text = f'RT#{ticket_id}'
 
                href = self.rt_wrapper.url(ticket_id, attachment_id) or href
 
            else:
 
                # '..' pops the ODS filename off the link path. In other words,
 
                # make the link relative to the directory the ODS is in.
 
                href = f'../{href}'
 
            if text is None:
 
                href_path = Path(urlparse.urlparse(href).path)
 
                text = urlparse.unquote(href_path.name)
 
            yield (href, text)
 

	
 
    def write_row(self, row: AccrualPostings) -> None:
 
        age = (self.date - row[0].meta.date).days
 
        if row.end_balance.ge_zero():
 
            for index, threshold in enumerate(self.age_thresholds):
 
                if age >= threshold:
 
                    self.age_balances[index] += row.end_balance
 
                    break
 
            else:
 
                return
 
        raw_balance = row.balance()
 
        if row.accrual_type is not None:
 
            raw_balance = row.accrual_type.normalize_amount(raw_balance)
 
        if raw_balance == row.end_balance:
 
            amount_cell = odf.table.TableCell()
 
        else:
 
            amount_cell = self.balance_cell(raw_balance)
 
        projects = {post.meta.get('project') or None for post in row}
 
        projects.discard(None)
 
        self.add_row(
 
            self.date_cell(row[0].meta.date),
 
            self.multiline_cell(row.entities()),
 
            amount_cell,
 
            self.balance_cell(row.end_balance),
 
            self.multiline_cell(sorted(projects)),
 
            self.multilink_cell(self._link_seq(row, 'rt-id')),
 
            self.multilink_cell(self._link_seq(row, 'invoice')),
 
            self.multilink_cell(self._link_seq(row, 'approval')),
 
            self.multilink_cell(self._link_seq(row, 'contract')),
 
            self.multilink_cell(self._link_seq(row, 'purchase-order')),
 
        )
 

	
 

	
 
class AgingReport(BaseReport):
 
    def __init__(self,
 
                 rt_client: rt.Rt,
 
                 out_file: BinaryIO,
 
                 date: Optional[datetime.date]=None,
 
    ) -> None:
 
        if date is None:
 
            date = datetime.date.today()
 
        self.out_bin = out_file
 
        self.logger = logger.getChild(type(self).__name__)
 
        self.ods = AgingODS(rt_client, date, self.logger)
 

	
 
    def run(self, groups: PostGroups) -> None:
 
        rows: List[AccrualPostings] = []
 
        for group in groups.values():
 
            if group.is_zero():
 
                # Cheap optimization: don't slice and dice groups we're not
 
                # going to report anyway.
 
                continue
 
            elif group.accrual_type is None:
 
                group = group.since_last_nonzero()
 
            else:
 
                # Filter out new accruals after the report date.
 
                # e.g., cover the case that the same invoices has multiple
 
                # postings over time, and we don't want to report too-recent
 
                # ones.
 
                cutoff_date = self.ods.date - datetime.timedelta(
 
                    days=group.accrual_type.value.aging_thresholds[-1],
 
                )
 
                group = AccrualPostings(
 
                    post for post in group.since_last_nonzero()
 
                    if post.meta.date <= cutoff_date
 
                    or group.accrual_type.normalize_amount(post.units.number) < 0
 
                )
 
            if group and not group.is_zero():
 
                rows.append(group)
 
        rows.sort(key=lambda related: (
 
            related.account,
 
            related[0].meta.date,
 
            ('\0'.join(related.entities())
 
             if related.entity is related.INCONSISTENT
 
             else related.entity),
 
        ))
 
        self.ods.write(rows)
 
        self.ods.save_file(self.out_bin)
 

	
 

	
 
class BalanceReport(BaseReport):
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        posts = posts.since_last_nonzero()
 
        date_s = posts[0].meta.date.strftime('%Y-%m-%d')
 
        if index:
 
            yield ""
 
        yield f"{posts.invoice}:"
 
        yield f"  {posts.balance_at_cost()} outstanding since {date_s}"
 

	
 

	
 
class OutgoingReport(BaseReport):
 
    def __init__(self, rt_client: rt.Rt, out_file: TextIO) -> None:
 
        super().__init__(out_file)
 
        self.rt_client = rt_client
 
        self.rt_wrapper = rtutil.RT(rt_client)
 

	
 
    def _primary_rt_id(self, posts: AccrualPostings) -> rtutil.TicketAttachmentIds:
 
        rt_ids = list(posts.first_meta_links('rt-id'))
 
        rt_ids_count = len(rt_ids)
 
        if rt_ids_count != 1:
 
            raise ValueError(f"{rt_ids_count} rt-id links found")
 
        parsed = rtutil.RT.parse(rt_ids.pop())
 
        rt_id = posts.rt_id
 
        if rt_id is None:
 
            raise ValueError("no rt-id links found")
 
        elif isinstance(rt_id, Sentinel):
 
            raise ValueError("multiple rt-id links found")
 
        parsed = rtutil.RT.parse(rt_id)
 
        if parsed is None:
 
            raise ValueError("rt-id is not a valid RT reference")
 
        else:
 
            return parsed
 

	
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        posts = posts.since_last_nonzero()
 
        try:
 
            ticket_id, _ = self._primary_rt_id(posts)
 
            ticket = self.rt_client.get_ticket(ticket_id)
 
            # Note we only use this when ticket is None.
 
            errmsg = f"ticket {ticket_id} not found"
 
        except (ValueError, rt.RtError) as error:
 
            ticket = None
 
            errmsg = error.args[0]
 
        if ticket is None:
 
            self.logger.error(
 
                "can't generate outgoings report for %s because no RT ticket available: %s",
 
                posts.invoice, errmsg,
 
            )
 
            return
 

	
 
        try:
 
            rt_requestor = self.rt_client.get_user(ticket['Requestors'][0])
 
        except (IndexError, rt.RtError):
 
            rt_requestor = None
 
        if rt_requestor is None:
 
            requestor = ''
 
            requestor_name = ''
 
        else:
 
            requestor_name = (
 
                rt_requestor.get('RealName')
 
                or ticket.get('CF.{payment-to}')
 
                or ''
 
            )
 
            requestor = f'{requestor_name} <{rt_requestor["EmailAddress"]}>'.strip()
 

	
 
        balance_s = posts.end_balance.format(None)
 
        raw_balance = -posts.balance()
 
        if raw_balance != posts.end_balance:
 
            balance_s = f'{raw_balance} ({balance_s})'
 

	
 
        contract_links = list(posts.all_meta_links('contract'))
 
        if contract_links:
 
            contract_s = ' , '.join(self.rt_wrapper.iter_urls(
 
                contract_links, missing_fmt='<BROKEN RT LINK: {}>',
 
            ))
 
        else:
 
            contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
 
        projects = [v for v in posts.meta_values('project')
 
                    if isinstance(v, str)]
 

	
 
        yield "PAYMENT FOR APPROVAL:"
 
        yield f"REQUESTOR: {requestor}"
 
        yield f"TOTAL TO PAY: {balance_s}"
 
        yield f"AGREEMENT: {contract_s}"
 
        yield f"PAYMENT TO: {ticket.get('CF.{payment-to}') or requestor_name}"
 
        yield f"PAYMENT METHOD: {ticket.get('CF.{payment-method}', '')}"
 
        yield f"PROJECT: {', '.join(projects)}"
 
        yield "\nBEANCOUNT ENTRIES:\n"
 

	
 
        last_txn: Optional[Transaction] = None
 
        for post in posts:
 
            txn = post.meta.txn
 
            if txn is not last_txn:
 
                last_txn = txn
 
                txn = self.rt_wrapper.txn_with_urls(txn, '{}')
 
                yield bc_printer.format_entry(txn)
 

	
 

	
 
class ReportType(enum.Enum):
 
    AGING = AgingReport
 
    BALANCE = BalanceReport
 
    OUTGOING = OutgoingReport
 
    AGE = AGING
 
    BAL = BALANCE
 
    OUT = OUTGOING
 
    OUTGOINGS = OUTGOING
 

	
 
    @classmethod
 
    def by_name(cls, name: str) -> 'ReportType':
 
        try:
 
            return cls[name.upper()]
 
        except KeyError:
 
            raise ValueError(f"unknown report type {name!r}") from None
 

	
 
    @classmethod
 
    def default_for(cls, groups: PostGroups) -> 'ReportType':
 
        if len(groups) == 1 and all(
 
                group.accrual_type is AccrualAccount.PAYABLE
 
                and not group.is_paid()
 
                for group in groups.values()
 
        ):
 
            return cls.OUTGOING
 
        else:
 
            return cls.BALANCE
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    LOAD_ERRORS = 1
 
    # 2 was used in the past, it can probably be reclaimed.
 
    REPORT_ERRORS = 4
 
    NOTHING_TO_REPORT = 8
 

	
 

	
 
def filter_search(postings: Iterable[data.Posting],
 
                  search_terms: Iterable[cliutil.SearchTerm],
 
) -> Iterable[data.Posting]:
 
    accounts = tuple(AccrualAccount.account_names())
 
    postings = (post for post in postings if post.account.is_under(*accounts))
 
    for query in search_terms:
 
        postings = query.filter_postings(postings)
 
    return postings
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='NAME',
 
        type=ReportType.by_name,
 
        help="""The type of report to generate, one of `aging`, `balance`, or
 
`outgoing`. If not specified, the default is `aging` when no search terms are
 
given, `outgoing` for search terms that return a single outstanding payable,
 
and `balance` any other time.
 
""")
 
    parser.add_argument(
 
        '--since',
 
        metavar='YEAR',
 
        type=int,
 
        default=-1,
 
        help="""How far back to search the books for related transactions.
 
You can either specify a fiscal year, or a negative offset from the current
 
fiscal year, to start loading entries from. The default is -1 (start from the
 
previous fiscal year).
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is stdout for the balance and outgoing reports, and a generated
 
filename for other reports.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('invoice', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on accruals that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other link, including an RT attachment link in
 
`TIK/ATT` format, is a shortcut for `invoice=LINK`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.report_type is None and not args.search_terms:
 
        args.report_type = ReportType.AGING
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    if cliutil.is_main_script(PROGNAME):
 
        global logger
 
        logger = logging.getLogger(PROGNAME)
 
        sys.excepthook = cliutil.ExceptHook(logger)
 
    args = parse_arguments(arglist)
 
    cliutil.setup_logger(logger, args.loglevel, stderr)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
    elif args.report_type is ReportType.AGING:
 
        entries, load_errors, _ = books_loader.load_all()
 
    else:
 
        entries, load_errors, _ = books_loader.load_all(args.since)
 
    filters.remove_opening_balance_txn(entries)
 

	
 
    returncode = 0
 
    postings = filter_search(data.Posting.from_entries(entries), args.search_terms)
 
    groups: PostGroups = dict(AccrualPostings.group_by_first_meta_link(postings, 'invoice'))
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
        returncode |= ReturnFlag.LOAD_ERRORS
 
    if not groups:
 

	
 
    postings = list(filter_search(
 
        data.Posting.from_entries(entries), args.search_terms,
 
    ))
 
    if not postings:
 
        logger.warning("no matching entries found to report")
 
        returncode |= ReturnFlag.NOTHING_TO_REPORT
 

	
 
    groups = {
 
        key: posts
 
        for source_posts in groups.values()
 
        for key, posts in source_posts.make_consistent()
 
    }
 
    groups: PostGroups
 
    if args.report_type is None or args.report_type is ReportType.OUTGOING:
 
        groups = dict(AccrualPostings.group_by_first_meta_link(postings, 'rt-id'))
 
        if (args.report_type is None
 
            and len(groups) == 1
 
            and all(g.accrual_type is AccrualAccount.PAYABLE and not g.is_paid()
 
                    for g in groups.values())
 
        ):
 
            args.report_type = ReportType.OUTGOING
 
    if args.report_type is not ReportType.OUTGOING:
 
        groups = {
 
            key: group
 
            for _, source in AccrualPostings.group_by_first_meta_link(postings, 'invoice')
 
            for key, group in source.make_consistent()
 
        }
 
    if args.report_type is not ReportType.AGING:
 
        groups = {
 
            key: posts for key, posts in groups.items() if not posts.is_paid()
 
        } or groups
 
    del postings
 

	
 
    if args.report_type is None:
 
        args.report_type = ReportType.default_for(groups)
 
    report: Optional[BaseReport] = None
 
    output_path: Optional[Path] = None
 
    if args.report_type is ReportType.AGING:
 
        rt_client = config.rt_client()
 
        if rt_client is None:
 
            logger.error("unable to generate aging report: RT client is required")
 
        else:
 
            now = datetime.datetime.now()
 
            if args.output_file is None:
 
                args.output_file = Path(now.strftime('AgingReport_%Y-%m-%d_%H:%M.ods'))
 
                logger.info("Writing report to %s", args.output_file)
 
            out_bin = cliutil.bytes_output(args.output_file, stdout)
 
            report = AgingReport(rt_client, out_bin)
 
    elif args.report_type is ReportType.OUTGOING:
 
        rt_client = config.rt_client()
 
        if rt_client is None:
 
            logger.error("unable to generate outgoing report: RT client is required")
 
        else:
 
            out_file = cliutil.text_output(args.output_file, stdout)
 
            report = OutgoingReport(rt_client, out_file)
 
    else:
 
        out_file = cliutil.text_output(args.output_file, stdout)
 
        report = args.report_type.value(out_file)
 
        report = BalanceReport(out_file)
 

	
 
    if report is None:
 
        returncode |= ReturnFlag.REPORT_ERRORS
 
    else:
 
        report.run(groups)
 
    return 0 if returncode == 0 else 16 + returncode
 

	
 
if __name__ == '__main__':
 
    exit(main())
setup.py
Show inline comments
 
#!/usr/bin/env python3
 

	
 
from setuptools import setup
 

	
 
setup(
 
    name='conservancy_beancount',
 
    description="Plugin, library, and reports for reading Conservancy's books",
 
    version='1.1.8',
 
    version='1.1.9',
 
    author='Software Freedom Conservancy',
 
    author_email='info@sfconservancy.org',
 
    license='GNU AGPLv3+',
 

	
 
    install_requires=[
 
        'babel>=2.6',  # Debian:python3-babel
 
        'beancount>=2.2',  # Debian:beancount
 
        # 1.4.1 crashes when trying to save some documents.
 
        'odfpy>=1.4.0,!=1.4.1',  # Debian:python3-odf
 
        'PyYAML>=3.0',  # Debian:python3-yaml
 
        'regex',  # Debian:python3-regex
 
        'rt>=2.0',
 
    ],
 
    setup_requires=[
 
        'pytest-mypy',
 
        'pytest-runner',  # Debian:python3-pytest-runner
 
    ],
 
    tests_require=[
 
        'mypy>=0.770',  # Debian:python3-mypy
 
        'pytest',  # Debian:python3-pytest
 
    ],
 

	
 
    packages=[
 
        'conservancy_beancount',
 
        'conservancy_beancount.plugin',
 
        'conservancy_beancount.reports',
 
    ],
 
    entry_points={
 
        'console_scripts': [
 
            'accrual-report = conservancy_beancount.reports.accrual:main',
 
        ],
 
    },
 
)
tests/books/accruals.beancount
Show inline comments
 
2010-01-01 open Assets:Checking
 
2010-01-01 open Assets:Receivable:Accounts
 
2010-01-01 open Expenses:FilingFees
 
2010-01-01 open Expenses:Services:Legal
 
2010-01-01 open Expenses:Travel
 
2010-01-01 open Income:Donations
 
2010-01-01 open Liabilities:Payable:Accounts
 
2010-01-01 open Equity:Funds:Opening
 

	
 
2010-03-01 * "Opening balances"
 
  Equity:Funds:Opening  -1000 USD
 
  Assets:Receivable:Accounts  6000 USD
 
  Liabilities:Payable:Accounts  -5000 USD
 

	
 
2010-03-05 * "EarlyBird" "Payment for receivable from previous FY"
 
  rt-id: "rt:40"
 
  invoice: "rt:40/400"
 
  project: "Conservancy"
 
  Assets:Receivable:Accounts  -500 USD
 
  Assets:Checking  500 USD
 

	
 
2010-03-06 * "EarlyBird" "Payment for payment from previous FY"
 
  rt-id: "rt:44"
 
  invoice: "rt:44/440"
 
  project: "Conservancy"
 
  Liabilities:Payable:Accounts  125 USD
 
  Assets:Checking  -125 USD
 

	
 
2010-03-15 * "GrantCo" "2010Q1 grant"
 
  rt-id: "rt:470"
 
  invoice: "rt:470/4700"
 
  project: "Development Grant"
 
  Assets:Receivable:Accounts  5000 USD
 
  Income:Donations           -5000 USD
 

	
 
2010-03-25 * "GrantCo" "2010Q1 grant ACH payment"
 
  rt-id: "rt:470"
 
  invoice: "rt:470/4700"
 
  project: "Development Grant"
 
  Assets:Receivable:Accounts  -5000 USD
 
  Assets:Checking              5000 USD
 

	
 
2010-03-30 * "EarlyBird" "Travel reimbursement"
 
  rt-id: "rt:490"
 
  invoice: "rt:490/4900"
 
  project: "Conservancy"
 
  Liabilities:Payable:Accounts  -75 USD
 
  Expenses:Travel  75 USD
 

	
 
2010-04-15 * "Multiparty invoice"
 
  rt-id: "rt:480"
 
  invoice: "rt:480/4800"
 
  project: "Conservancy"
 
  Expenses:Travel  250 USD
 
  Liabilities:Payable:Accounts  -125 USD
 
  entity: "MultiPartyA"
 
  Liabilities:Payable:Accounts  -125 USD
 
  entity: "MultiPartyB"
 

	
 
2010-04-18 * "MultiPartyA" "Payment for 480"
 
  rt-id: "rt:480"
 
  invoice: "rt:480/4800"
 
  project: "Conservancy"
 
  Liabilities:Payable:Accounts  125 USD
 
  Assets:Checking  -125 USD
 

	
 
2010-04-20 * "MultiPartyB" "Payment for 480"
 
  rt-id: "rt:480"
 
  invoice: "rt:480/4800"
 
  project: "Conservancy"
 
  Liabilities:Payable:Accounts  125 USD
 
  Assets:Checking  -125 USD
 

	
 
2010-04-30 ! "Vendor" "Travel reimbursement"
 
2010-04-25 ! "Vendor" "First trip travel reimbursement"
 
  rt-id: "rt:310"
 
  contract: "rt:310/3100"
 
  invoice: "FIXME"  ; still waiting on them to send it
 
  project: "Conservancy"
 
  Liabilities:Payable:Accounts  -200 USD
 
  Expenses:Travel  200 USD
 

	
 
2010-04-30 * "Vendor" "Second trip travel reimbursement"
 
  rt-id: "rt:310"
 
  contract: "rt:310/3100"
 
  invoice: "rt:310/3120"
 
  project: "Conservancy"
 
  Liabilities:Payable:Accounts  -220 USD
 
  Expenses:Travel  220 USD
 

	
 
2010-05-05 * "DonorA" "Donation pledge"
 
  rt-id: "rt:505"
 
  invoice: "rt:505/5050"
 
  approval: "rt:505/5040"
 
  project: "Conservancy"
 
  Income:Donations  -2,500 EUR {1.100 USD}
 
  Assets:Receivable:Accounts  2,500 EUR {1.100 USD}
 

	
 
2010-05-10 * "Lawyer" "April legal services"
 
  rt-id: "rt:510"
 
  invoice: "rt:510/5100"
 
  contract: "rt:510/4000"
 
  project: "Conservancy"
 
  Expenses:Services:Legal  200.00 USD
 
  Liabilities:Payable:Accounts  -200.00 USD
 

	
 
2010-05-15 * "MatchingProgram" "May matched donations"
 
  invoice: "rt://ticket/515/attachments/5150"
 
  approval: "rt://ticket/515/attachments/5140"
 
  project: "Conservancy"
 
  Income:Donations  -1500.00 USD
 
  Assets:Receivable:Accounts  1500.00 USD
 

	
 
2010-05-20 * "DonorA" "Donation made"
 
  rt-id: "rt:505"
 
  invoice: "rt:505/5050"
 
  project: "Conservancy"
 
  Assets:Receivable:Accounts  -2,750.00 USD
 
  Assets:Checking  2,750.00 USD
 
  receipt: "DonorAWire.pdf"
 

	
 
2010-05-25 * "Lawyer" "May payment"
 
  rt-id: "rt:510"
 
  invoice: "rt:510/5100"
 
  project: "Conservancy"
 
  Liabilities:Payable:Accounts  200.00 USD
 
  contract: "rt:510/4000"
 
  Assets:Checking  -200.00 USD
 
  receipt: "rt:510/5105"
 

	
 
2010-06-10 * "Lawyer" "May legal services"
 
  rt-id: "rt:510"
 
  invoice: "rt:510/6100"
 
  contract: "rt:510/4000"
 
  project: "Conservancy"
 
  Expenses:Services:Legal  220.00 USD
 
  Liabilities:Payable:Accounts  -220.00 USD
 

	
 
2010-06-12 * "Lawyer" "Additional legal fees for May"
 
  rt-id: "rt:510"
 
  invoice: "rt:510/6100"
 
  contract: "rt:510/4000"
 
  project: "Conservancy"
 
  Expenses:FilingFees  60.00 USD
 
  Liabilities:Payable:Accounts  -60.00 USD
 

	
 
2010-06-18 * "EuroGov" "European legal fees"
 
  ; Multiple rt-ids are used to test proper handling for both
 
  ; searching and generating the outgoing report.
 
  rt-id: "rt:520 rt:525"
 
  invoice: "rt:520/5200"
 
  contract: "rt:520/5220"
 
  project: "Conservancy"
 
  Liabilities:Payable:Accounts  -1,000 EUR {1.100 USD}
 
  Expenses:FilingFees  1,000 EUR {1.100 USD}
 

	
 
2010-06-15 * "GrantCo" "2010Q2 grant"
 
  rt-id: "rt:470"
 
  invoice: "rt:470/4700"
 
  project: "Development Grant"
 
  Assets:Receivable:Accounts  5500 USD
 
  Income:Donations           -5500 USD
 

	
 
2010-09-15 * "GrantCo" "2010Q3 grant"
 
  rt-id: "rt:470"
 
  invoice: "rt:470/4700"
 
  project: "Development Grant"
 
  Assets:Receivable:Accounts  6000 USD
 
  Income:Donations           -6000 USD
tests/test_reports_accrual.py
Show inline comments
 
"""test_reports_accrual - Unit tests for accrual report"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import collections
 
import copy
 
import datetime
 
import io
 
import itertools
 
import logging
 
import operator
 
import re
 

	
 
import babel.numbers
 
import odf.opendocument
 
import odf.table
 
import odf.text
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from decimal import Decimal
 
from typing import NamedTuple, Optional, Sequence
 

	
 
from beancount.core import data as bc_data
 
from beancount import loader as bc_loader
 
from conservancy_beancount import cliutil
 
from conservancy_beancount import data
 
from conservancy_beancount import rtutil
 
from conservancy_beancount.reports import accrual
 
from conservancy_beancount.reports import core
 

	
 
_accruals_load = bc_loader.load_file(testutil.test_path('books/accruals.beancount'))
 
ACCRUAL_TXNS = [
 
    entry for entry in _accruals_load[0]
 
    if hasattr(entry, 'narration')
 
    and entry.narration != 'Opening balances'
 
]
 
ACCRUALS_COUNT = sum(
 
    1
 
    for txn in ACCRUAL_TXNS
 
    for post in txn.postings
 
    if post.account.startswith(('Assets:Receivable:', 'Liabilities:Payable:'))
 
)
 

	
 
ACCOUNTS = [
 
    'Assets:Receivable:Accounts',
 
    'Assets:Receivable:Loans',
 
    'Liabilities:Payable:Accounts',
 
    'Liabilities:Payable:Vacation',
 
]
 

	
 
class AgingRow(NamedTuple):
 
    date: datetime.date
 
    entity: Sequence[str]
 
    amount: Optional[Sequence[bc_data.Amount]]
 
    at_cost: bc_data.Amount
 
    rt_id: Sequence[str]
 
    invoice: Sequence[str]
 
    project: Sequence[str]
 

	
 
    @classmethod
 
    def make_simple(cls, date, entity, at_cost, invoice,
 
                    rt_id=None, orig_amount=None, project='Conservancy'):
 
        if isinstance(date, str):
 
            date = datetime.datetime.strptime(date, '%Y-%m-%d').date()
 
        if not isinstance(at_cost, tuple):
 
            at_cost = testutil.Amount(at_cost)
 
        if rt_id is None:
 
            rt_id, _, _ = invoice.partition('/')
 
        return cls(date, [entity], orig_amount, at_cost, [rt_id], [invoice], [project])
 

	
 
    def check_row_match(self, sheet_row):
 
        cells = testutil.ODSCell.from_row(sheet_row)
 
        assert len(cells) >= len(self)
 
        cells = iter(cells)
 
        assert next(cells).value == self.date
 
        assert next(cells).text == '\0'.join(self.entity)
 
        assert next(cells).text == '\0'.join(
 
            babel.numbers.format_currency(number, currency, format_type='accounting')
 
            for number, currency in self.amount or ()
 
        )
 
        usd_cell = next(cells)
 
        assert usd_cell.value_type == 'currency'
 
        assert usd_cell.value == self.at_cost.number
 
        assert next(cells).text == '\0'.join(self.project)
 
        for index, cell in enumerate(cells):
 
            links = cell.getElementsByType(odf.text.A)
 
            assert len(links) == len(cell.childNodes)
 
        assert index >= 1
 

	
 

	
 
AGING_AP = [
 
    AgingRow.make_simple('2010-03-06', 'EarlyBird', -125, 'rt:44/440'),
 
    AgingRow.make_simple('2010-03-30', 'EarlyBird', 75, 'rt:490/4900'),
 
    AgingRow.make_simple('2010-04-30', 'Vendor', 200, 'FIXME'),
 
    AgingRow.make_simple('2010-04-25', 'Vendor', 200, 'FIXME'),
 
    AgingRow.make_simple('2010-04-30', 'Vendor', 220, 'rt:310/3120'),
 
    AgingRow.make_simple('2010-06-10', 'Lawyer', 280, 'rt:510/6100'),
 
    AgingRow.make_simple('2010-06-18', 'EuroGov', 1100, 'rt:520/5200',
 
                         orig_amount=[testutil.Amount(1000, 'EUR')]),
 
]
 

	
 
AGING_AR = [
 
    AgingRow.make_simple('2010-03-05', 'EarlyBird', -500, 'rt:40/400'),
 
    AgingRow.make_simple('2010-05-15', 'MatchingProgram', 1500,
 
                         'rt://ticket/515/attachments/5150'),
 
    AgingRow.make_simple('2010-06-15', 'GrantCo', 11500, 'rt:470/4700',
 
                         project='Development Grant'),
 
]
 

	
 
class RTClient(testutil.RTClient):
 
    TICKET_DATA = {
 
        '40': [
 
            ('400', 'invoice feb.csv', 'text/csv', '40.4k'),
 
        ],
 
        '44': [
 
            ('440', 'invoice feb.csv', 'text/csv', '40.4k'),
 
        ],
 
        '310': [
 
            ('3100', 'VendorContract.pdf', 'application/pdf', '1.7m'),
 
            ('3120', 'VendorInvoiceB.pdf', 'application/pdf', '1.8m'),
 
        ],
 
        '490': [],
 
        '505': [],
 
        '510': [
 
            ('4000', 'contract.pdf', 'application/pdf', '1.4m'),
 
            ('5100', 'invoice april.pdf', 'application/pdf', '1.5m'),
 
            ('5105', 'payment.png', 'image/png', '51.5k'),
 
            ('6100', 'invoice may.pdf', 'application/pdf', '1.6m'),
 
        ],
 
        '515': [],
 
        '520': [],
 
    }
 

	
 

	
 
@pytest.fixture
 
def accrual_postings():
 
    return data.Posting.from_entries(copy.deepcopy(ACCRUAL_TXNS))
 

	
 
def accruals_by_meta(postings, value, key='invoice', wrap_type=iter):
 
    return wrap_type(
 
        post for post in postings
 
        if post.meta.get(key) == value
 
        and post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
 
    )
 

	
 
def find_row_by_text(row_source, want_text):
 
    for row in row_source:
 
        try:
 
            found_row = row.childNodes[0].text == want_text
 
        except IndexError:
 
            found_row = False
 
        if found_row:
 
            return row
 
    return None
 

	
 
def check_aging_sheet(sheet, aging_rows, date, accrue_date):
 
    if not aging_rows:
 
        return
 
    if isinstance(accrue_date, int):
 
        accrue_date = date + datetime.timedelta(days=accrue_date)
 
    rows = iter(sheet.getElementsByType(odf.table.TableRow))
 
    for row in rows:
 
        if "Aging Report" in row.text:
 
            break
 
    else:
 
        assert None, "Header row not found"
 
    assert f"Accrued by {accrue_date.isoformat()}" in row.text
 
    assert f"Unpaid by {date.isoformat()}" in row.text
 
    expect_rows = iter(aging_rows)
 
    row0 = find_row_by_text(rows, aging_rows[0].date.isoformat())
 
    next(expect_rows).check_row_match(row0)
 
    for actual, expected in zip(rows, expect_rows):
 
        expected.check_row_match(actual)
 
    for row in rows:
 
        if row.text.startswith("Total Aged "):
 
            break
 
    else:
 
        assert None, "Totals rows not found"
 
    actual_sum = Decimal(row.childNodes[-1].value)
 
    for row in rows:
 
        if row.text.startswith("Total Aged "):
 
            actual_sum += Decimal(row.childNodes[-1].value)
 
        else:
 
            break
 
    assert actual_sum == sum(
 
        row.at_cost.number
 
        for row in aging_rows
 
        if row.date <= accrue_date
 
        and row.at_cost.number > 0
 
    )
 

	
 
def check_aging_ods(ods_file,
 
                    date=None,
 
                    recv_rows=AGING_AR,
 
                    pay_rows=AGING_AP,
 
):
 
    if date is None:
 
        date = datetime.date.today()
 
    ods_file.seek(0)
 
    ods = odf.opendocument.load(ods_file)
 
    sheets = ods.spreadsheet.getElementsByType(odf.table.Table)
 
    assert len(sheets) == 2
 
    check_aging_sheet(sheets[0], recv_rows, date, -60)
 
    check_aging_sheet(sheets[1], pay_rows, date, -30)
 

	
 
@pytest.mark.parametrize('search_terms,expect_count,check_func', [
 
    ([], ACCRUALS_COUNT, lambda post: post.account.is_under(
 
        'Assets:Receivable:', 'Liabilities:Payable:',
 
    )),
 
    ([('rt-id', '^rt:505$')], 2, lambda post: post.meta['entity'] == 'DonorA'),
 
    ([('invoice', r'^rt:\D+515/')], 1, lambda post: post.meta['entity'] == 'MatchingProgram'),
 
    ([('entity', '^Lawyer$')], 3, lambda post: post.meta['rt-id'] == 'rt:510'),
 
    ([('entity', '^Lawyer$'), ('contract', '^rt:510/')], 2,
 
     lambda post: post.meta['invoice'].startswith('rt:510/')),
 
    ([('rt-id', '^rt:510$'), ('approval', '.')], 0, lambda post: False),
 
])
 
def test_filter_search(accrual_postings, search_terms, expect_count, check_func):
 
    search_terms = [cliutil.SearchTerm._make(query) for query in search_terms]
 
    actual = list(accrual.filter_search(accrual_postings, search_terms))
 
    if expect_count < ACCRUALS_COUNT:
 
        assert ACCRUALS_COUNT > len(actual) >= expect_count
 
    else:
 
        assert len(actual) == ACCRUALS_COUNT
 
    for post in actual:
 
        assert check_func(post)
 

	
 
@pytest.mark.parametrize('arg,expected', [
 
    ('aging', accrual.AgingReport),
 
    ('balance', accrual.BalanceReport),
 
    ('outgoing', accrual.OutgoingReport),
 
    ('age', accrual.AgingReport),
 
    ('bal', accrual.BalanceReport),
 
    ('out', accrual.OutgoingReport),
 
    ('outgoings', accrual.OutgoingReport),
 
])
 
def test_report_type_by_name(arg, expected):
 
    assert accrual.ReportType.by_name(arg.lower()).value is expected
 
    assert accrual.ReportType.by_name(arg.title()).value is expected
 
    assert accrual.ReportType.by_name(arg.upper()).value is expected
 

	
 
@pytest.mark.parametrize('arg', [
 
    'unknown',
 
    'blance',
 
    'outgong',
 
])
 
def test_report_type_by_unknown_name(arg):
 
    # Raising ValueError helps argparse generate good messages.
 
    with pytest.raises(ValueError):
 
        accrual.ReportType.by_name(arg)
 

	
 
@pytest.mark.parametrize('acct_name', ACCOUNTS)
 
def test_accrual_postings_consistent_account(acct_name):
 
    meta = {'invoice': '{acct_name} invoice.pdf'}
 
    txn = testutil.Transaction(postings=[
 
        (acct_name, 50, meta),
 
        (acct_name, 25, meta),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    assert related.account == acct_name
 

	
 
def test_accrual_postings_entity():
 
    txn = testutil.Transaction(postings=[
 
        (ACCOUNTS[0], 25, {'entity': 'Accruee'}),
 
        (ACCOUNTS[0], -15, {'entity': 'Payee15'}),
 
        (ACCOUNTS[0], -10, {'entity': 'Payee10'}),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    assert related.entity == 'Accruee'
 
    assert set(related.entities()) == {'Accruee', 'Payee10', 'Payee15'}
 

	
 
def test_accrual_postings_entities():
 
    txn = testutil.Transaction(postings=[
 
        (ACCOUNTS[0], 25, {'entity': 'Accruee'}),
 
        (ACCOUNTS[0], -15, {'entity': 'Payee15'}),
 
        (ACCOUNTS[0], -10, {'entity': 'Payee10'}),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    actual = related.entities()
 
    assert next(actual, None) == 'Accruee'
 
    assert set(actual) == {'Payee10', 'Payee15'}
 

	
 
def test_accrual_postings_entities_no_duplicates():
 
    txn = testutil.Transaction(postings=[
 
        (ACCOUNTS[0], 25, {'entity': 'Accruee'}),
 
        (ACCOUNTS[0], -15, {'entity': 'Accruee'}),
 
        (ACCOUNTS[0], -10, {'entity': 'Other'}),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    actual = related.entities()
 
    assert next(actual, None) == 'Accruee'
 
    assert next(actual, None) == 'Other'
 
    assert next(actual, None) is None
 

	
 
def test_accrual_postings_inconsistent_account():
 
    meta = {'invoice': 'invoice.pdf'}
 
    txn = testutil.Transaction(postings=[
 
        (acct_name, index, meta)
 
        for index, acct_name in enumerate(ACCOUNTS)
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    assert related.account is related.INCONSISTENT
 

	
 
def test_accrual_postings_rt_id():
 
    txn = testutil.Transaction(postings=[
 
        (ACCOUNTS[0], 10, {'rt-id': 'rt:90'}),
 
        (ACCOUNTS[0], 10, {'rt-id': 'rt:90 rt:92'}),
 
        (ACCOUNTS[0], 10, {'rt-id': 'rt:90 rt:94 rt:92'}),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    assert related.rt_id == 'rt:90'
 

	
 
def test_accrual_postings_rt_id_inconsistent():
 
    txn = testutil.Transaction(postings=[
 
        (ACCOUNTS[0], 10, {'rt-id': 'rt:96'}),
 
        (ACCOUNTS[0], 10, {'rt-id': 'rt:98 rt:96'}),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    assert related.rt_id is related.INCONSISTENT
 

	
 
def test_accrual_postings_rt_id_none():
 
    txn = testutil.Transaction(postings=[
 
        (ACCOUNTS[0], 10),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    assert related.rt_id is None
 

	
 
@pytest.mark.parametrize('acct_name,invoice,day', testutil.combine_values(
 
    ACCOUNTS,
 
    ['FIXME', '', None, *testutil.NON_STRING_METADATA_VALUES],
 
    itertools.count(1),
 
))
 
def test_make_consistent_bad_invoice(acct_name, invoice, day):
 
    txn = testutil.Transaction(date=datetime.date(2019, 1, day), postings=[
 
        (acct_name, index * 10, {'invoice': invoice, 'entity': f'BadInvoice{day}'})
 
        for index in range(1, 4)
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    consistent = dict(related.make_consistent())
 
    assert len(consistent) == 1
 
    key = next(iter(consistent))
 
    assert acct_name in key
 
    if invoice:
 
        assert str(invoice) in key
 
    actual = consistent[key]
 
    assert actual
 
    assert len(actual) == 3
 
    for act_post, exp_post in zip(actual, txn.postings):
 
        assert act_post.units == exp_post.units
 
        assert act_post.meta.get('invoice') == invoice
 

	
 
def test_make_consistent_across_accounts():
 
    invoice = 'Invoices/CrossAccount.pdf'
 
    txn = testutil.Transaction(date=datetime.date(2019, 2, 1), postings=[
 
        (acct_name, 100, {'invoice': invoice, 'entity': 'CrossAccount'})
 
        for acct_name in ACCOUNTS
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    consistent = dict(related.make_consistent())
 
    assert len(consistent) == len(ACCOUNTS)
 
    for key, posts in consistent.items():
 
        assert len(posts) == 1
 
        assert posts.account in key
 

	
 
def test_make_consistent_both_invoice_and_account():
 
    txn = testutil.Transaction(date=datetime.date(2019, 2, 2), postings=[
 
        (acct_name, 150) for acct_name in ACCOUNTS
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    consistent = dict(related.make_consistent())
 
    assert len(consistent) == len(ACCOUNTS)
 
    for key, posts in consistent.items():
 
        assert len(posts) == 1
 
        assert posts.account in key
 

	
 
@pytest.mark.parametrize('acct_name', ACCOUNTS)
 
def test_make_consistent_across_entity(acct_name):
 
    amt_sign = operator.pos if acct_name.startswith('Assets') else operator.neg
 
    txn = testutil.Transaction(postings=[
 
        (acct_name, amt_sign(n), {'invoice': 'Inv/1.pdf', 'entity': f'Entity{n}'})
 
        for n in range(1, 4)
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    consistent = dict(related.make_consistent())
 
    assert len(consistent) == 3
 
    for key, posts in consistent.items():
 
        assert len(posts) == 1
 
        entities = posts.entities()
 
        assert next(entities, None) == posts.entity
 
        assert next(entities, None) is None
 
        assert posts.entity in key
 

	
 
@pytest.mark.parametrize('acct_name', ACCOUNTS)
 
def test_make_consistent_entity_differs_accrual_payment(acct_name):
 
    invoice = 'Invoices/DifferPay.pdf'
 
    txn = testutil.Transaction(postings=[
 
        # Depending on the account, the order of the accrual and payment might
 
        # be swapped here, but that shouldn't matter.
 
        (acct_name, 125, {'invoice': invoice, 'entity': 'Positive'}),
 
        (acct_name, -125, {'invoice': invoice, 'entity': 'Negative'}),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    consistent = related.make_consistent()
 
    _, actual = next(consistent)
 
    assert actual is related
 
    assert next(consistent, None) is None
 

	
 
def check_output(output, expect_patterns):
 
    output.seek(0)
 
    testutil.check_lines_match(iter(output), expect_patterns)
 

	
 
def run_outgoing(invoice, postings, rt_client=None):
 
def run_outgoing(rt_id, postings, rt_client=None):
 
    if rt_client is None:
 
        rt_client = RTClient()
 
    if not isinstance(postings, core.RelatedPostings):
 
        postings = accruals_by_meta(postings, invoice, wrap_type=accrual.AccrualPostings)
 
        postings = accruals_by_meta(postings, rt_id, 'rt-id', wrap_type=accrual.AccrualPostings)
 
    output = io.StringIO()
 
    report = accrual.OutgoingReport(rt_client, output)
 
    report.run({invoice: postings})
 
    report.run({rt_id: postings})
 
    return output
 

	
 
@pytest.mark.parametrize('invoice,expected', [
 
    ('rt:505/5050', "Zero balance outstanding since 2010-05-05"),
 
    ('rt:510/5100', "Zero balance outstanding since 2010-05-10"),
 
    ('rt:510/6100', "-280.00 USD outstanding since 2010-06-10"),
 
    ('rt://ticket/515/attachments/5150', "1,500.00 USD outstanding since 2010-05-15",),
 
])
 
def test_balance_report(accrual_postings, invoice, expected, caplog):
 
    related = accruals_by_meta(accrual_postings, invoice, wrap_type=accrual.AccrualPostings)
 
    output = io.StringIO()
 
    report = accrual.BalanceReport(output)
 
    report.run({invoice: related})
 
    assert not caplog.records
 
    check_output(output, [invoice, expected])
 

	
 
def test_outgoing_report(accrual_postings, caplog):
 
    invoice = 'rt:510/6100'
 
    output = run_outgoing(invoice, accrual_postings)
 
    output = run_outgoing('rt:510', accrual_postings)
 
    rt_url = RTClient.DEFAULT_URL[:-9]
 
    rt_id_url = rf'\b{re.escape(f"{rt_url}Ticket/Display.html?id=510")}\b'
 
    contract_url = rf'\b{re.escape(f"{rt_url}Ticket/Attachment/4000/4000/contract.pdf")}\b'
 
    assert not caplog.records
 
    check_output(output, [
 
        r'^PAYMENT FOR APPROVAL:$',
 
        r'^REQUESTOR: Mx\. 510 <mx510@example\.org>$',
 
        r'^TOTAL TO PAY: \$280\.00$',
 
        fr'^AGREEMENT: {contract_url}',
 
        r'^PAYMENT TO: Hon\. Mx\. 510$',
 
        r'^PAYMENT METHOD: payment method 510$',
 
        r'^BEANCOUNT ENTRIES:$',
 
        # For each transaction, check for the date line, a metadata, and the
 
        # Expenses posting.
 
        r'^\s*2010-06-10\s',
 
        fr'^\s+rt-id: "{rt_id_url}"$',
 
        r'^\s+Expenses:Services:Legal\s+220\.00 USD$',
 
        r'^\s*2010-06-12\s',
 
        fr'^\s+contract: "{contract_url}"$',
 
        r'^\s+Expenses:FilingFees\s+60\.00 USD$',
 
    ])
 

	
 
def test_outgoing_report_custom_field_fallbacks(accrual_postings, caplog):
 
    invoice = 'rt:510/6100'
 
    rt_client = RTClient(want_cfs=False)
 
    output = run_outgoing(invoice, accrual_postings, rt_client)
 
    output = run_outgoing('rt:510', accrual_postings, rt_client)
 
    assert not caplog.records
 
    check_output(output, [
 
        r'^PAYMENT FOR APPROVAL:$',
 
        r'^REQUESTOR: <mx510@example\.org>$',
 
        r'^PAYMENT TO:\s*$',
 
        r'^PAYMENT METHOD:\s*$',
 
    ])
 

	
 
def test_outgoing_report_fx_amounts(accrual_postings, caplog):
 
    invoice = 'rt:520/5200'
 
    output = run_outgoing(invoice, accrual_postings)
 
    output = run_outgoing('rt:520 rt:525', accrual_postings)
 
    assert not caplog.records
 
    check_output(output, [
 
        r'^PAYMENT FOR APPROVAL:$',
 
        r'^REQUESTOR: Mx\. 520 <mx520@example\.org>$',
 
        r'^TOTAL TO PAY: 1,000\.00 EUR \(\$1,100.00\)$',
 
    ])
 

	
 
def test_outgoing_report_multi_invoice(accrual_postings, caplog):
 
    output = run_outgoing('rt:310', accrual_postings)
 
    assert not caplog.records
 
    check_output(output, [
 
        r'^PAYMENT FOR APPROVAL:$',
 
        r'^REQUESTOR: Mx\. 310 <mx310@example\.org>$',
 
        r'^TOTAL TO PAY: \$420.00$',
 
    ])
 

	
 
def test_outgoing_report_without_rt_id(accrual_postings, caplog):
 
    invoice = 'rt://ticket/515/attachments/5150'
 
    output = run_outgoing(invoice, accrual_postings)
 
    related = accruals_by_meta(
 
        accrual_postings, invoice, wrap_type=accrual.AccrualPostings,
 
    )
 
    output = run_outgoing(None, related)
 
    assert caplog.records
 
    log = caplog.records[0]
 
    assert log.message.startswith(
 
        f"can't generate outgoings report for {invoice} because no RT ticket available:",
 
    )
 
    assert not output.getvalue()
 

	
 
def run_aging_report(postings, today=None):
 
    if today is None:
 
        today = datetime.date.today()
 
    postings = (
 
        post for post in postings
 
        if post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
 
    )
 
    groups = {
 
        key: group
 
        for _, related in accrual.AccrualPostings.group_by_meta(postings, 'invoice')
 
        for key, group in related.make_consistent()
 
    }
 
    output = io.BytesIO()
 
    rt_client = RTClient()
 
    report = accrual.AgingReport(rt_client, output, today)
 
    report.run(groups)
 
    return output
 

	
 
def test_aging_report(accrual_postings):
 
    output = run_aging_report(accrual_postings)
 
    check_aging_ods(output)
 

	
 
@pytest.mark.parametrize('date,recv_end,pay_end', [
 
    # Both these dates are chosen for their off-by-one potential:
 
    # the first is exactly 30 days after the 2010-06-10 payable;
 
    # the second is exactly 60 days after the 2010-05-15 receivable.
 
    (datetime.date(2010, 7, 10), 1, 4),
 
    (datetime.date(2010, 7, 14), 2, 4),
 
    (datetime.date(2010, 7, 10), 1, 5),
 
    (datetime.date(2010, 7, 14), 2, 5),
 
])
 
def test_aging_report_date_cutoffs(accrual_postings, date, recv_end, pay_end):
 
    expect_recv = AGING_AR[:recv_end]
 
    expect_pay = AGING_AP[:pay_end]
 
    if 10 <= date.day < 12:
 
        # Take the 60 USD posting out of the invoice 510/6100 payable.
 
        expect_pay[-1] = expect_pay[-1]._replace(
 
            at_cost=testutil.Amount(expect_pay[-1].at_cost.number - 60),
 
        )
 
    output = run_aging_report(accrual_postings, date)
 
    check_aging_ods(output, date, expect_recv, expect_pay)
 

	
 
def test_aging_report_entity_consistency(accrual_postings):
 
    output = run_aging_report((
 
        post for post in accrual_postings
 
        if post.meta.get('rt-id') == 'rt:480'
 
        and post.units.number < 0
 
    ))
 
    check_aging_ods(output, None, [], [
 
        AgingRow.make_simple('2010-04-15', 'MultiPartyA', 125, 'rt:480/4800'),
 
        AgingRow.make_simple('2010-04-15', 'MultiPartyB', 125, 'rt:480/4800'),
 
    ])
 

	
 
def test_aging_report_does_not_include_too_recent_postings(accrual_postings):
 
    # This date is after the Q3 posting, but too soon after for that to be
 
    # included in the aging report.
 
    date = datetime.date(2010, 10, 1)
 
    output = run_aging_report((
 
        post for post in accrual_postings
 
        if post.meta.get('rt-id') == 'rt:470'
 
    ), date)
 
    # Date+amount are both from the Q2 posting only.
 
    check_aging_ods(output, date, [
 
        AgingRow.make_simple('2010-06-15', 'GrantCo', 5500, 'rt:470/4700',
 
                             project='Development Grant'),
 
    ], [])
 

	
 
def run_main(arglist, config=None):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/accruals.beancount'),
 
            rt_client=RTClient(),
 
        )
 
    output = io.StringIO()
 
    errors = io.StringIO()
 
    retcode = accrual.main(arglist, output, errors, config)
 
    return retcode, output, errors
 

	
 
def check_main_fails(arglist, config, error_flags, error_patterns):
 
    retcode, output, errors = run_main(arglist, config)
 
    assert retcode > 16
 
    assert (retcode - 16) & error_flags
 
    check_output(errors, error_patterns)
 
    assert not output.getvalue()
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['--report-type=balance', 'entity=EarlyBird'],
 
    ['--report-type=outgoing', 'entity=EarlyBird'],
 
])
 
def test_output_excludes_payments(arglist):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    output.seek(0)
 
    for line in output:
 
        assert not re.match(r'\brt:4\d\b', line)
 

	
 
@pytest.mark.parametrize('arglist,expect_invoice', [
 
    (['40'], 'rt:40/400'),
 
    (['44/440'], 'rt:44/440'),
 
])
 
def test_output_payments_when_only_match(arglist, expect_invoice):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_output(output, [
 
        rf'^{re.escape(expect_invoice)}:$',
 
        r' outstanding since ',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['510'],
 
    ['510/6100'],
 
    ['entity=Lawyer'],
 
@pytest.mark.parametrize('arglist,expect_amount', [
 
    (['310'], 420),
 
    (['310/3120'], 220),
 
    (['entity=Vendor'], 420),
 
])
 
def test_main_outgoing_report(arglist):
 
def test_main_outgoing_report(arglist, expect_amount):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    rt_url = RTClient.DEFAULT_URL[:-9]
 
    rt_id_url = re.escape(f'<{rt_url}Ticket/Display.html?id=510>')
 
    contract_url = re.escape(f'<{rt_url}Ticket/Attachment/4000/4000/contract.pdf>')
 
    rt_id_url = re.escape(f'<{rt_url}Ticket/Display.html?id=310>')
 
    contract_url = re.escape(f'<{rt_url}Ticket/Attachment/3120/3120/VendorContract.pdf>')
 
    check_output(output, [
 
        r'^REQUESTOR: Mx\. 510 <mx510@example\.org>$',
 
        r'^TOTAL TO PAY: \$280\.00$',
 
        r'^\s*2010-06-12\s',
 
        r'^\s+Expenses:FilingFees\s+60\.00 USD$',
 
        r'^REQUESTOR: Mx\. 310 <mx310@example\.org>$',
 
        rf'^TOTAL TO PAY: \${expect_amount}\.00$',
 
        r'^\s*2010-04-30\s',
 
        r'^\s+Expenses:Travel\s+220 USD$',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['-t', 'balance'],
 
    ['515/5150'],
 
    ['entity=MatchingProgram'],
 
])
 
def test_main_balance_report(arglist):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_output(output, [
 
        r'\brt://ticket/515/attachments/5150:$',
 
        r'^\s+1,500\.00 USD outstanding since 2010-05-15$',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    [],
 
    ['-t', 'aging', 'entity=Lawyer'],
 
])
 
def test_main_aging_report(tmp_path, arglist):
 
    if arglist:
 
        recv_rows = [row for row in AGING_AR if 'Lawyer' in row.entity]
 
        pay_rows = [row for row in AGING_AP if 'Lawyer' in row.entity]
 
    else:
 
        recv_rows = AGING_AR
 
        pay_rows = AGING_AP
 
    output_path = tmp_path / 'AgingReport.ods'
 
    arglist.insert(0, f'--output-file={output_path}')
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    assert not output.getvalue()
 
    with output_path.open('rb') as ods_file:
 
        check_aging_ods(ods_file, None, recv_rows, pay_rows)
 

	
 
def test_main_no_books():
 
    check_main_fails([], testutil.TestConfig(), 1 | 8, [
 
        r':[01]: +no books to load in configuration\b',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['499'],
 
    ['505/99999'],
 
    ['entity=NonExistent'],
 
])
 
def test_main_no_matches(arglist):
 
    check_main_fails(arglist, None, 8, [
 
        r': WARNING: no matching entries found to report$',
 
    ])
 

	
 
def test_main_no_rt():
 
    config = testutil.TestConfig(
 
        books_path=testutil.test_path('books/accruals.beancount'),
 
    )
 
    check_main_fails(['-t', 'out'], config, 4, [
 
        r': ERROR: unable to generate outgoing report: RT client is required\b',
 
    ])
0 comments (0 inline, 0 general)