Changeset - 1124842ea7e0
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-06-11 20:29:18
brettcsmith@brettcsmith.org
accrual: Actually use RT caching as intended.

Basically none of the reports were reading or writing the RT link cache
because they didn't instantiate an rtutil.RT properly to do that.
3 files changed with 18 insertions and 18 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -168,585 +168,584 @@ class AccrualAccount(enum.Enum):
 
class AccrualPostings(core.RelatedPostings):
 
    __slots__ = (
 
        'accrual_type',
 
        'end_balance',
 
        'account',
 
        'entity',
 
        'invoice',
 
    )
 
    INCONSISTENT = Sentinel()
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Posting]=(),
 
                 *,
 
                 _can_own: bool=False,
 
    ) -> None:
 
        super().__init__(source, _can_own=_can_own)
 
        # The following type declarations tell mypy about values set in the for
 
        # loop that are important enough to be referenced directly elsewhere.
 
        self.account = self._single_item(post.account for post in self)
 
        if isinstance(self.account, Sentinel):
 
            self.accrual_type: Optional[AccrualAccount] = None
 
            norm_func: Callable[[T], T] = lambda x: x
 
            entity_pred: Callable[[data.Posting], bool] = bool
 
        else:
 
            self.accrual_type = AccrualAccount.by_account(self.account)
 
            norm_func = self.accrual_type.normalize_amount
 
            entity_pred = lambda post: norm_func(post.units).number > 0
 
        self.entity = self._single_item(self.entities(entity_pred))
 
        self.invoice = self._single_item(self.first_meta_links('invoice', None))
 
        self.end_balance = norm_func(self.balance_at_cost())
 

	
 
    def _single_item(self, seq: Iterable[T]) -> Union[T, Sentinel]:
 
        items = iter(seq)
 
        try:
 
            item1 = next(items)
 
        except StopIteration:
 
            all_same = False
 
        else:
 
            all_same = all(item == item1 for item in items)
 
        return item1 if all_same else self.INCONSISTENT
 

	
 
    def entities(self, pred: Callable[[data.Posting], bool]=bool) -> Iterator[MetaValue]:
 
        return filters.iter_unique(
 
            post.meta['entity']
 
            for post in self
 
            if pred(post) and 'entity' in post.meta
 
        )
 

	
 
    def make_consistent(self) -> Iterator[Tuple[MetaValue, 'AccrualPostings']]:
 
        account_ok = isinstance(self.account, str)
 
        entity_ok = isinstance(self.entity, str)
 
        # `'/' in self.invoice` is just our heuristic to ensure that the
 
        # invoice metadata is "unique enough," and not just a placeholder
 
        # value like "FIXME". It can be refined if needed.
 
        invoice_ok = isinstance(self.invoice, str) and '/' in self.invoice
 
        if account_ok and entity_ok and invoice_ok:
 
            yield (self.invoice, self)
 
            return
 
        groups = collections.defaultdict(list)
 
        for post in self:
 
            post_invoice = self.invoice if invoice_ok else (
 
                post.meta.get('invoice') or 'BlankInvoice'
 
            )
 
            post_entity = self.entity if entity_ok else (
 
                post.meta.get('entity') or 'BlankEntity'
 
            )
 
            groups[f'{post.account} {post_invoice} {post_entity}'].append(post)
 
        type_self = type(self)
 
        for group_key, posts in groups.items():
 
            yield group_key, type_self(posts, _can_own=True)
 

	
 
    def is_paid(self, default: Optional[bool]=None) -> Optional[bool]:
 
        if self.accrual_type is None:
 
            return default
 
        else:
 
            return self.end_balance.le_zero()
 

	
 
    def is_zero(self, default: Optional[bool]=None) -> Optional[bool]:
 
        if self.accrual_type is None:
 
            return default
 
        else:
 
            return self.end_balance.is_zero()
 

	
 
    def since_last_nonzero(self) -> 'AccrualPostings':
 
        for index, (post, balance) in enumerate(self.iter_with_balance()):
 
            if balance.is_zero():
 
                start_index = index
 
        try:
 
            empty = start_index == index
 
        except NameError:
 
            empty = True
 
        return self if empty else self[start_index + 1:]
 

	
 
    @property
 
    def rt_id(self) -> Union[str, None, Sentinel]:
 
        return self._single_item(self.first_meta_links('rt-id', None))
 

	
 

	
 
class BaseReport:
 
    def __init__(self, out_file: TextIO) -> None:
 
        self.out_file = out_file
 
        self.logger = logger.getChild(type(self).__name__)
 

	
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        raise NotImplementedError("BaseReport._report")
 

	
 
    def run(self, groups: PostGroups) -> None:
 
        for index, invoice in enumerate(groups):
 
            for line in self._report(groups[invoice], index):
 
                print(line, file=self.out_file)
 

	
 

	
 
class AgingODS(core.BaseODS[AccrualPostings, Optional[data.Account]]):
 
    COLUMNS = [
 
        'Date',
 
        'Entity',
 
        'Invoice Amount',
 
        'Booked Amount',
 
        'Project',
 
        'Ticket',
 
        'Invoice',
 
        'Approval',
 
        'Contract',
 
        'Purchase Order',
 
    ]
 
    COL_COUNT = len(COLUMNS)
 

	
 
    def __init__(self,
 
                 rt_client: rt.Rt,
 
                 rt_wrapper: rtutil.RT,
 
                 date: datetime.date,
 
                 logger: logging.Logger,
 
    ) -> None:
 
        super().__init__()
 
        self.rt_client = rt_client
 
        self.rt_wrapper = rtutil.RT(self.rt_client)
 
        self.rt_wrapper = rt_wrapper
 
        self.date = date
 
        self.logger = logger
 

	
 
    def init_styles(self) -> None:
 
        super().init_styles()
 
        self.style_widecol = self.replace_child(
 
            self.document.automaticstyles,
 
            odf.style.Style,
 
            name='WideCol',
 
        )
 
        self.style_widecol.setAttribute('family', 'table-column')
 
        self.style_widecol.addElement(odf.style.TableColumnProperties(
 
            columnwidth='1.25in',
 
        ))
 

	
 
    def section_key(self, row: AccrualPostings) -> Optional[data.Account]:
 
        if isinstance(row.account, str):
 
            return row.account
 
        else:
 
            return None
 

	
 
    def start_spreadsheet(self) -> None:
 
        for accrual_type in AccrualAccount:
 
            self.use_sheet(accrual_type.name.title())
 
            for index in range(self.COL_COUNT):
 
                stylename = self.style_widecol if index else ''
 
                self.sheet.addElement(odf.table.TableColumn(stylename=stylename))
 
            self.add_row(*(
 
                self.string_cell(name, stylename=self.style_bold)
 
                for name in self.COLUMNS
 
            ))
 
            self.lock_first_row()
 

	
 
    def start_section(self, key: Optional[data.Account]) -> None:
 
        if key is None:
 
            return
 
        self.age_thresholds = list(AccrualAccount.by_account(key).value.aging_thresholds)
 
        self.age_balances = [core.MutableBalance() for _ in self.age_thresholds]
 
        accrual_date = self.date - datetime.timedelta(days=self.age_thresholds[-1])
 
        acct_parts = key.slice_parts()
 
        self.use_sheet(acct_parts[1])
 
        self.add_row()
 
        self.add_row(self.string_cell(
 
            f"{' '.join(acct_parts[2:])} {acct_parts[1]} Aging Report"
 
            f" Accrued by {accrual_date.isoformat()} Unpaid by {self.date.isoformat()}",
 
            stylename=self.merge_styles(self.style_bold, self.style_centertext),
 
            numbercolumnsspanned=self.COL_COUNT,
 
        ))
 
        self.add_row()
 

	
 
    def end_section(self, key: Optional[data.Account]) -> None:
 
        if key is None:
 
            return
 
        total_balance = core.MutableBalance()
 
        text_style = self.merge_styles(self.style_bold, self.style_endtext)
 
        text_span = 4
 
        last_age_text: Optional[str] = None
 
        self.add_row()
 
        for threshold, balance in zip(self.age_thresholds, self.age_balances):
 
            years, days = divmod(threshold, 365)
 
            years_text = f"{years} {'Year' if years == 1 else 'Years'}"
 
            days_text = f"{days} Days"
 
            if years and days:
 
                age_text = f"{years_text} {days_text}"
 
            elif years:
 
                age_text = years_text
 
            else:
 
                age_text = days_text
 
            if last_age_text is None:
 
                age_range = f"Over {age_text}"
 
            else:
 
                age_range = f"{age_text}–{last_age_text}"
 
            self.add_row(
 
                self.string_cell(
 
                    f"Total Aged {age_range}: ",
 
                    stylename=text_style,
 
                    numbercolumnsspanned=text_span,
 
                ),
 
                *(odf.table.TableCell() for _ in range(1, text_span)),
 
                self.balance_cell(balance),
 
            )
 
            last_age_text = age_text
 
            total_balance += balance
 
        self.add_row(
 
            self.string_cell(
 
                "Total Unpaid: ",
 
                stylename=text_style,
 
                numbercolumnsspanned=text_span,
 
            ),
 
            *(odf.table.TableCell() for _ in range(1, text_span)),
 
            self.balance_cell(total_balance),
 
        )
 

	
 
    def _link_seq(self, row: AccrualPostings, key: MetaKey) -> Iterator[Tuple[str, str]]:
 
        for href in row.all_meta_links(key):
 
            text: Optional[str] = None
 
            rt_ids = self.rt_wrapper.parse(href)
 
            if rt_ids is not None:
 
                ticket_id, attachment_id = rt_ids
 
                if attachment_id is None:
 
                    text = f'RT#{ticket_id}'
 
                href = self.rt_wrapper.url(ticket_id, attachment_id) or href
 
            else:
 
                # '..' pops the ODS filename off the link path. In other words,
 
                # make the link relative to the directory the ODS is in.
 
                href = f'../{href}'
 
            if text is None:
 
                href_path = Path(urlparse.urlparse(href).path)
 
                text = urlparse.unquote(href_path.name)
 
            yield (href, text)
 

	
 
    def write_row(self, row: AccrualPostings) -> None:
 
        age = (self.date - row[0].meta.date).days
 
        if row.end_balance.ge_zero():
 
            for index, threshold in enumerate(self.age_thresholds):
 
                if age >= threshold:
 
                    self.age_balances[index] += row.end_balance
 
                    break
 
            else:
 
                return
 
        raw_balance = row.balance()
 
        if row.accrual_type is not None:
 
            raw_balance = row.accrual_type.normalize_amount(raw_balance)
 
        if raw_balance == row.end_balance:
 
            amount_cell = odf.table.TableCell()
 
        else:
 
            amount_cell = self.balance_cell(raw_balance)
 
        projects = {post.meta.get('project') or None for post in row}
 
        projects.discard(None)
 
        self.add_row(
 
            self.date_cell(row[0].meta.date),
 
            self.multiline_cell(row.entities()),
 
            amount_cell,
 
            self.balance_cell(row.end_balance),
 
            self.multiline_cell(sorted(projects)),
 
            self.multilink_cell(self._link_seq(row, 'rt-id')),
 
            self.multilink_cell(self._link_seq(row, 'invoice')),
 
            self.multilink_cell(self._link_seq(row, 'approval')),
 
            self.multilink_cell(self._link_seq(row, 'contract')),
 
            self.multilink_cell(self._link_seq(row, 'purchase-order')),
 
        )
 

	
 

	
 
class AgingReport(BaseReport):
 
    def __init__(self,
 
                 rt_client: rt.Rt,
 
                 rt_wrapper: rtutil.RT,
 
                 out_file: BinaryIO,
 
                 date: Optional[datetime.date]=None,
 
    ) -> None:
 
        if date is None:
 
            date = datetime.date.today()
 
        self.out_bin = out_file
 
        self.logger = logger.getChild(type(self).__name__)
 
        self.ods = AgingODS(rt_client, date, self.logger)
 
        self.ods = AgingODS(rt_wrapper, date, self.logger)
 

	
 
    def run(self, groups: PostGroups) -> None:
 
        rows: List[AccrualPostings] = []
 
        for group in groups.values():
 
            if group.is_zero():
 
                # Cheap optimization: don't slice and dice groups we're not
 
                # going to report anyway.
 
                continue
 
            elif group.accrual_type is None:
 
                group = group.since_last_nonzero()
 
            else:
 
                # Filter out new accruals after the report date.
 
                # e.g., cover the case that the same invoices has multiple
 
                # postings over time, and we don't want to report too-recent
 
                # ones.
 
                cutoff_date = self.ods.date - datetime.timedelta(
 
                    days=group.accrual_type.value.aging_thresholds[-1],
 
                )
 
                group = AccrualPostings(
 
                    post for post in group.since_last_nonzero()
 
                    if post.meta.date <= cutoff_date
 
                    or group.accrual_type.normalize_amount(post.units.number) < 0
 
                )
 
            if group and not group.is_zero():
 
                rows.append(group)
 
        rows.sort(key=lambda related: (
 
            related.account,
 
            related[0].meta.date,
 
            ('\0'.join(related.entities())
 
             if related.entity is related.INCONSISTENT
 
             else related.entity),
 
        ))
 
        self.ods.write(rows)
 
        self.ods.save_file(self.out_bin)
 

	
 

	
 
class BalanceReport(BaseReport):
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        posts = posts.since_last_nonzero()
 
        date_s = posts[0].meta.date.strftime('%Y-%m-%d')
 
        if index:
 
            yield ""
 
        yield f"{posts.invoice}:"
 
        yield f"  {posts.balance_at_cost()} outstanding since {date_s}"
 

	
 

	
 
class OutgoingReport(BaseReport):
 
    def __init__(self, rt_client: rt.Rt, out_file: TextIO) -> None:
 
    def __init__(self, rt_wrapper: rtutil.RT, out_file: TextIO) -> None:
 
        super().__init__(out_file)
 
        self.rt_client = rt_client
 
        self.rt_wrapper = rtutil.RT(rt_client)
 
        self.rt_wrapper = rt_wrapper
 
        self.rt_client = rt_wrapper.rt
 

	
 
    def _primary_rt_id(self, posts: AccrualPostings) -> rtutil.TicketAttachmentIds:
 
        rt_id = posts.rt_id
 
        if rt_id is None:
 
            raise ValueError("no rt-id links found")
 
        elif isinstance(rt_id, Sentinel):
 
            raise ValueError("multiple rt-id links found")
 
        parsed = rtutil.RT.parse(rt_id)
 
        if parsed is None:
 
            raise ValueError("rt-id is not a valid RT reference")
 
        else:
 
            return parsed
 

	
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        posts = posts.since_last_nonzero()
 
        try:
 
            ticket_id, _ = self._primary_rt_id(posts)
 
            ticket = self.rt_client.get_ticket(ticket_id)
 
            # Note we only use this when ticket is None.
 
            errmsg = f"ticket {ticket_id} not found"
 
        except (ValueError, rt.RtError) as error:
 
            ticket = None
 
            errmsg = error.args[0]
 
        if ticket is None:
 
            self.logger.error(
 
                "can't generate outgoings report for %s because no RT ticket available: %s",
 
                posts.invoice, errmsg,
 
            )
 
            return
 

	
 
        try:
 
            rt_requestor = self.rt_client.get_user(ticket['Requestors'][0])
 
        except (IndexError, rt.RtError):
 
            rt_requestor = None
 
        if rt_requestor is None:
 
            requestor = ''
 
            requestor_name = ''
 
        else:
 
            requestor_name = (
 
                rt_requestor.get('RealName')
 
                or ticket.get('CF.{payment-to}')
 
                or ''
 
            )
 
            requestor = f'{requestor_name} <{rt_requestor["EmailAddress"]}>'.strip()
 

	
 
        balance_s = posts.end_balance.format(None)
 
        raw_balance = -posts.balance()
 
        if raw_balance != posts.end_balance:
 
            balance_s = f'{raw_balance} ({balance_s})'
 

	
 
        contract_links = list(posts.all_meta_links('contract'))
 
        if contract_links:
 
            contract_s = ' , '.join(self.rt_wrapper.iter_urls(
 
                contract_links, missing_fmt='<BROKEN RT LINK: {}>',
 
            ))
 
        else:
 
            contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
 
        projects = [v for v in posts.meta_values('project')
 
                    if isinstance(v, str)]
 

	
 
        yield "PAYMENT FOR APPROVAL:"
 
        yield f"REQUESTOR: {requestor}"
 
        yield f"TOTAL TO PAY: {balance_s}"
 
        yield f"AGREEMENT: {contract_s}"
 
        yield f"PAYMENT TO: {ticket.get('CF.{payment-to}') or requestor_name}"
 
        yield f"PAYMENT METHOD: {ticket.get('CF.{payment-method}', '')}"
 
        yield f"PROJECT: {', '.join(projects)}"
 
        yield "\nBEANCOUNT ENTRIES:\n"
 

	
 
        last_txn: Optional[Transaction] = None
 
        for post in posts:
 
            txn = post.meta.txn
 
            if txn is not last_txn:
 
                last_txn = txn
 
                txn = self.rt_wrapper.txn_with_urls(txn, '{}')
 
                yield bc_printer.format_entry(txn)
 

	
 

	
 
class ReportType(enum.Enum):
 
    AGING = AgingReport
 
    BALANCE = BalanceReport
 
    OUTGOING = OutgoingReport
 
    AGE = AGING
 
    BAL = BALANCE
 
    OUT = OUTGOING
 
    OUTGOINGS = OUTGOING
 

	
 
    @classmethod
 
    def by_name(cls, name: str) -> 'ReportType':
 
        try:
 
            return cls[name.upper()]
 
        except KeyError:
 
            raise ValueError(f"unknown report type {name!r}") from None
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    LOAD_ERRORS = 1
 
    # 2 was used in the past, it can probably be reclaimed.
 
    REPORT_ERRORS = 4
 
    NOTHING_TO_REPORT = 8
 

	
 

	
 
def filter_search(postings: Iterable[data.Posting],
 
                  search_terms: Iterable[cliutil.SearchTerm],
 
) -> Iterable[data.Posting]:
 
    accounts = tuple(AccrualAccount.account_names())
 
    postings = (post for post in postings if post.account.is_under(*accounts))
 
    for query in search_terms:
 
        postings = query.filter_postings(postings)
 
    return postings
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='NAME',
 
        type=ReportType.by_name,
 
        help="""The type of report to generate, one of `aging`, `balance`, or
 
`outgoing`. If not specified, the default is `aging` when no search terms are
 
given, `outgoing` for search terms that return a single outstanding payable,
 
and `balance` any other time.
 
""")
 
    parser.add_argument(
 
        '--since',
 
        metavar='YEAR',
 
        type=int,
 
        default=-1,
 
        help="""How far back to search the books for related transactions.
 
You can either specify a fiscal year, or a negative offset from the current
 
fiscal year, to start loading entries from. The default is -1 (start from the
 
previous fiscal year).
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is stdout for the balance and outgoing reports, and a generated
 
filename for other reports.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('invoice', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on accruals that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other link, including an RT attachment link in
 
`TIK/ATT` format, is a shortcut for `invoice=LINK`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.report_type is None and not args.search_terms:
 
        args.report_type = ReportType.AGING
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    if cliutil.is_main_script(PROGNAME):
 
        global logger
 
        logger = logging.getLogger(PROGNAME)
 
        sys.excepthook = cliutil.ExceptHook(logger)
 
    args = parse_arguments(arglist)
 
    cliutil.setup_logger(logger, args.loglevel, stderr)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
    elif args.report_type is ReportType.AGING:
 
        entries, load_errors, _ = books_loader.load_all()
 
    else:
 
        entries, load_errors, _ = books_loader.load_all(args.since)
 
    filters.remove_opening_balance_txn(entries)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
        returncode |= ReturnFlag.LOAD_ERRORS
 

	
 
    postings = list(filter_search(
 
        data.Posting.from_entries(entries), args.search_terms,
 
    ))
 
    if not postings:
 
        logger.warning("no matching entries found to report")
 
        returncode |= ReturnFlag.NOTHING_TO_REPORT
 
    groups: PostGroups
 
    if args.report_type is None or args.report_type is ReportType.OUTGOING:
 
        groups = dict(AccrualPostings.group_by_first_meta_link(postings, 'rt-id'))
 
        if (args.report_type is None
 
            and len(groups) == 1
 
            and all(g.accrual_type is AccrualAccount.PAYABLE and not g.is_paid()
 
                    for g in groups.values())
 
        ):
 
            args.report_type = ReportType.OUTGOING
 
    if args.report_type is not ReportType.OUTGOING:
 
        groups = {
 
            key: group
 
            for _, source in AccrualPostings.group_by_first_meta_link(postings, 'invoice')
 
            for key, group in source.make_consistent()
 
        }
 
    if args.report_type is not ReportType.AGING:
 
        groups = {
 
            key: posts for key, posts in groups.items() if not posts.is_paid()
 
        } or groups
 
    del postings
 

	
 
    report: Optional[BaseReport] = None
 
    output_path: Optional[Path] = None
 
    if args.report_type is ReportType.AGING:
 
        rt_client = config.rt_client()
 
        if rt_client is None:
 
        rt_wrapper = config.rt_wrapper()
 
        if rt_wrapper is None:
 
            logger.error("unable to generate aging report: RT client is required")
 
        else:
 
            now = datetime.datetime.now()
 
            if args.output_file is None:
 
                args.output_file = Path(now.strftime('AgingReport_%Y-%m-%d_%H:%M.ods'))
 
                logger.info("Writing report to %s", args.output_file)
 
            out_bin = cliutil.bytes_output(args.output_file, stdout)
 
            report = AgingReport(rt_client, out_bin)
 
            report = AgingReport(rt_wrapper, out_bin)
 
    elif args.report_type is ReportType.OUTGOING:
 
        rt_client = config.rt_client()
 
        if rt_client is None:
 
        rt_wrapper = config.rt_wrapper()
 
        if rt_wrapper is None:
 
            logger.error("unable to generate outgoing report: RT client is required")
 
        else:
 
            out_file = cliutil.text_output(args.output_file, stdout)
 
            report = OutgoingReport(rt_client, out_file)
 
            report = OutgoingReport(rt_wrapper, out_file)
 
    else:
 
        out_file = cliutil.text_output(args.output_file, stdout)
 
        report = BalanceReport(out_file)
 

	
 
    if report is None:
 
        returncode |= ReturnFlag.REPORT_ERRORS
 
    else:
 
        report.run(groups)
 
    return 0 if returncode == 0 else 16 + returncode
 

	
 
if __name__ == '__main__':
 
    exit(main())
setup.py
Show inline comments
 
#!/usr/bin/env python3
 

	
 
from setuptools import setup
 

	
 
setup(
 
    name='conservancy_beancount',
 
    description="Plugin, library, and reports for reading Conservancy's books",
 
    version='1.1.9',
 
    version='1.1.10',
 
    author='Software Freedom Conservancy',
 
    author_email='info@sfconservancy.org',
 
    license='GNU AGPLv3+',
 

	
 
    install_requires=[
 
        'babel>=2.6',  # Debian:python3-babel
 
        'beancount>=2.2',  # Debian:beancount
 
        # 1.4.1 crashes when trying to save some documents.
 
        'odfpy>=1.4.0,!=1.4.1',  # Debian:python3-odf
 
        'PyYAML>=3.0',  # Debian:python3-yaml
 
        'regex',  # Debian:python3-regex
 
        'rt>=2.0',
 
    ],
 
    setup_requires=[
 
        'pytest-mypy',
 
        'pytest-runner',  # Debian:python3-pytest-runner
 
    ],
 
    tests_require=[
 
        'mypy>=0.770',  # Debian:python3-mypy
 
        'pytest',  # Debian:python3-pytest
 
    ],
 

	
 
    packages=[
 
        'conservancy_beancount',
 
        'conservancy_beancount.plugin',
 
        'conservancy_beancount.reports',
 
    ],
 
    entry_points={
 
        'console_scripts': [
 
            'accrual-report = conservancy_beancount.reports.accrual:main',
 
        ],
 
    },
 
)
tests/test_reports_accrual.py
Show inline comments
...
 
@@ -300,358 +300,359 @@ def test_accrual_postings_entities_no_duplicates():
 
        (ACCOUNTS[0], -10, {'entity': 'Other'}),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    actual = related.entities()
 
    assert next(actual, None) == 'Accruee'
 
    assert next(actual, None) == 'Other'
 
    assert next(actual, None) is None
 

	
 
def test_accrual_postings_inconsistent_account():
 
    meta = {'invoice': 'invoice.pdf'}
 
    txn = testutil.Transaction(postings=[
 
        (acct_name, index, meta)
 
        for index, acct_name in enumerate(ACCOUNTS)
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    assert related.account is related.INCONSISTENT
 

	
 
def test_accrual_postings_rt_id():
 
    txn = testutil.Transaction(postings=[
 
        (ACCOUNTS[0], 10, {'rt-id': 'rt:90'}),
 
        (ACCOUNTS[0], 10, {'rt-id': 'rt:90 rt:92'}),
 
        (ACCOUNTS[0], 10, {'rt-id': 'rt:90 rt:94 rt:92'}),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    assert related.rt_id == 'rt:90'
 

	
 
def test_accrual_postings_rt_id_inconsistent():
 
    txn = testutil.Transaction(postings=[
 
        (ACCOUNTS[0], 10, {'rt-id': 'rt:96'}),
 
        (ACCOUNTS[0], 10, {'rt-id': 'rt:98 rt:96'}),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    assert related.rt_id is related.INCONSISTENT
 

	
 
def test_accrual_postings_rt_id_none():
 
    txn = testutil.Transaction(postings=[
 
        (ACCOUNTS[0], 10),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    assert related.rt_id is None
 

	
 
@pytest.mark.parametrize('acct_name,invoice,day', testutil.combine_values(
 
    ACCOUNTS,
 
    ['FIXME', '', None, *testutil.NON_STRING_METADATA_VALUES],
 
    itertools.count(1),
 
))
 
def test_make_consistent_bad_invoice(acct_name, invoice, day):
 
    txn = testutil.Transaction(date=datetime.date(2019, 1, day), postings=[
 
        (acct_name, index * 10, {'invoice': invoice, 'entity': f'BadInvoice{day}'})
 
        for index in range(1, 4)
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    consistent = dict(related.make_consistent())
 
    assert len(consistent) == 1
 
    key = next(iter(consistent))
 
    assert acct_name in key
 
    if invoice:
 
        assert str(invoice) in key
 
    actual = consistent[key]
 
    assert actual
 
    assert len(actual) == 3
 
    for act_post, exp_post in zip(actual, txn.postings):
 
        assert act_post.units == exp_post.units
 
        assert act_post.meta.get('invoice') == invoice
 

	
 
def test_make_consistent_across_accounts():
 
    invoice = 'Invoices/CrossAccount.pdf'
 
    txn = testutil.Transaction(date=datetime.date(2019, 2, 1), postings=[
 
        (acct_name, 100, {'invoice': invoice, 'entity': 'CrossAccount'})
 
        for acct_name in ACCOUNTS
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    consistent = dict(related.make_consistent())
 
    assert len(consistent) == len(ACCOUNTS)
 
    for key, posts in consistent.items():
 
        assert len(posts) == 1
 
        assert posts.account in key
 

	
 
def test_make_consistent_both_invoice_and_account():
 
    txn = testutil.Transaction(date=datetime.date(2019, 2, 2), postings=[
 
        (acct_name, 150) for acct_name in ACCOUNTS
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    consistent = dict(related.make_consistent())
 
    assert len(consistent) == len(ACCOUNTS)
 
    for key, posts in consistent.items():
 
        assert len(posts) == 1
 
        assert posts.account in key
 

	
 
@pytest.mark.parametrize('acct_name', ACCOUNTS)
 
def test_make_consistent_across_entity(acct_name):
 
    amt_sign = operator.pos if acct_name.startswith('Assets') else operator.neg
 
    txn = testutil.Transaction(postings=[
 
        (acct_name, amt_sign(n), {'invoice': 'Inv/1.pdf', 'entity': f'Entity{n}'})
 
        for n in range(1, 4)
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    consistent = dict(related.make_consistent())
 
    assert len(consistent) == 3
 
    for key, posts in consistent.items():
 
        assert len(posts) == 1
 
        entities = posts.entities()
 
        assert next(entities, None) == posts.entity
 
        assert next(entities, None) is None
 
        assert posts.entity in key
 

	
 
@pytest.mark.parametrize('acct_name', ACCOUNTS)
 
def test_make_consistent_entity_differs_accrual_payment(acct_name):
 
    invoice = 'Invoices/DifferPay.pdf'
 
    txn = testutil.Transaction(postings=[
 
        # Depending on the account, the order of the accrual and payment might
 
        # be swapped here, but that shouldn't matter.
 
        (acct_name, 125, {'invoice': invoice, 'entity': 'Positive'}),
 
        (acct_name, -125, {'invoice': invoice, 'entity': 'Negative'}),
 
    ])
 
    related = accrual.AccrualPostings(data.Posting.from_txn(txn))
 
    consistent = related.make_consistent()
 
    _, actual = next(consistent)
 
    assert actual is related
 
    assert next(consistent, None) is None
 

	
 
def check_output(output, expect_patterns):
 
    output.seek(0)
 
    testutil.check_lines_match(iter(output), expect_patterns)
 

	
 
def run_outgoing(rt_id, postings, rt_client=None):
 
    if rt_client is None:
 
        rt_client = RTClient()
 
    rt_wrapper = rtutil.RT(rt_client)
 
    if not isinstance(postings, core.RelatedPostings):
 
        postings = accruals_by_meta(postings, rt_id, 'rt-id', wrap_type=accrual.AccrualPostings)
 
    output = io.StringIO()
 
    report = accrual.OutgoingReport(rt_client, output)
 
    report = accrual.OutgoingReport(rt_wrapper, output)
 
    report.run({rt_id: postings})
 
    return output
 

	
 
@pytest.mark.parametrize('invoice,expected', [
 
    ('rt:505/5050', "Zero balance outstanding since 2010-05-05"),
 
    ('rt:510/5100', "Zero balance outstanding since 2010-05-10"),
 
    ('rt:510/6100', "-280.00 USD outstanding since 2010-06-10"),
 
    ('rt://ticket/515/attachments/5150', "1,500.00 USD outstanding since 2010-05-15",),
 
])
 
def test_balance_report(accrual_postings, invoice, expected, caplog):
 
    related = accruals_by_meta(accrual_postings, invoice, wrap_type=accrual.AccrualPostings)
 
    output = io.StringIO()
 
    report = accrual.BalanceReport(output)
 
    report.run({invoice: related})
 
    assert not caplog.records
 
    check_output(output, [invoice, expected])
 

	
 
def test_outgoing_report(accrual_postings, caplog):
 
    output = run_outgoing('rt:510', accrual_postings)
 
    rt_url = RTClient.DEFAULT_URL[:-9]
 
    rt_id_url = rf'\b{re.escape(f"{rt_url}Ticket/Display.html?id=510")}\b'
 
    contract_url = rf'\b{re.escape(f"{rt_url}Ticket/Attachment/4000/4000/contract.pdf")}\b'
 
    assert not caplog.records
 
    check_output(output, [
 
        r'^PAYMENT FOR APPROVAL:$',
 
        r'^REQUESTOR: Mx\. 510 <mx510@example\.org>$',
 
        r'^TOTAL TO PAY: \$280\.00$',
 
        fr'^AGREEMENT: {contract_url}',
 
        r'^PAYMENT TO: Hon\. Mx\. 510$',
 
        r'^PAYMENT METHOD: payment method 510$',
 
        r'^BEANCOUNT ENTRIES:$',
 
        # For each transaction, check for the date line, a metadata, and the
 
        # Expenses posting.
 
        r'^\s*2010-06-10\s',
 
        fr'^\s+rt-id: "{rt_id_url}"$',
 
        r'^\s+Expenses:Services:Legal\s+220\.00 USD$',
 
        r'^\s*2010-06-12\s',
 
        fr'^\s+contract: "{contract_url}"$',
 
        r'^\s+Expenses:FilingFees\s+60\.00 USD$',
 
    ])
 

	
 
def test_outgoing_report_custom_field_fallbacks(accrual_postings, caplog):
 
    rt_client = RTClient(want_cfs=False)
 
    output = run_outgoing('rt:510', accrual_postings, rt_client)
 
    assert not caplog.records
 
    check_output(output, [
 
        r'^PAYMENT FOR APPROVAL:$',
 
        r'^REQUESTOR: <mx510@example\.org>$',
 
        r'^PAYMENT TO:\s*$',
 
        r'^PAYMENT METHOD:\s*$',
 
    ])
 

	
 
def test_outgoing_report_fx_amounts(accrual_postings, caplog):
 
    output = run_outgoing('rt:520 rt:525', accrual_postings)
 
    assert not caplog.records
 
    check_output(output, [
 
        r'^PAYMENT FOR APPROVAL:$',
 
        r'^REQUESTOR: Mx\. 520 <mx520@example\.org>$',
 
        r'^TOTAL TO PAY: 1,000\.00 EUR \(\$1,100.00\)$',
 
    ])
 

	
 
def test_outgoing_report_multi_invoice(accrual_postings, caplog):
 
    output = run_outgoing('rt:310', accrual_postings)
 
    assert not caplog.records
 
    check_output(output, [
 
        r'^PAYMENT FOR APPROVAL:$',
 
        r'^REQUESTOR: Mx\. 310 <mx310@example\.org>$',
 
        r'^TOTAL TO PAY: \$420.00$',
 
    ])
 

	
 
def test_outgoing_report_without_rt_id(accrual_postings, caplog):
 
    invoice = 'rt://ticket/515/attachments/5150'
 
    related = accruals_by_meta(
 
        accrual_postings, invoice, wrap_type=accrual.AccrualPostings,
 
    )
 
    output = run_outgoing(None, related)
 
    assert caplog.records
 
    log = caplog.records[0]
 
    assert log.message.startswith(
 
        f"can't generate outgoings report for {invoice} because no RT ticket available:",
 
    )
 
    assert not output.getvalue()
 

	
 
def run_aging_report(postings, today=None):
 
    if today is None:
 
        today = datetime.date.today()
 
    postings = (
 
        post for post in postings
 
        if post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
 
    )
 
    groups = {
 
        key: group
 
        for _, related in accrual.AccrualPostings.group_by_meta(postings, 'invoice')
 
        for key, group in related.make_consistent()
 
    }
 
    output = io.BytesIO()
 
    rt_client = RTClient()
 
    report = accrual.AgingReport(rt_client, output, today)
 
    rt_wrapper = rtutil.RT(RTClient())
 
    report = accrual.AgingReport(rt_wrapper, output, today)
 
    report.run(groups)
 
    return output
 

	
 
def test_aging_report(accrual_postings):
 
    output = run_aging_report(accrual_postings)
 
    check_aging_ods(output)
 

	
 
@pytest.mark.parametrize('date,recv_end,pay_end', [
 
    # Both these dates are chosen for their off-by-one potential:
 
    # the first is exactly 30 days after the 2010-06-10 payable;
 
    # the second is exactly 60 days after the 2010-05-15 receivable.
 
    (datetime.date(2010, 7, 10), 1, 5),
 
    (datetime.date(2010, 7, 14), 2, 5),
 
])
 
def test_aging_report_date_cutoffs(accrual_postings, date, recv_end, pay_end):
 
    expect_recv = AGING_AR[:recv_end]
 
    expect_pay = AGING_AP[:pay_end]
 
    if 10 <= date.day < 12:
 
        # Take the 60 USD posting out of the invoice 510/6100 payable.
 
        expect_pay[-1] = expect_pay[-1]._replace(
 
            at_cost=testutil.Amount(expect_pay[-1].at_cost.number - 60),
 
        )
 
    output = run_aging_report(accrual_postings, date)
 
    check_aging_ods(output, date, expect_recv, expect_pay)
 

	
 
def test_aging_report_entity_consistency(accrual_postings):
 
    output = run_aging_report((
 
        post for post in accrual_postings
 
        if post.meta.get('rt-id') == 'rt:480'
 
        and post.units.number < 0
 
    ))
 
    check_aging_ods(output, None, [], [
 
        AgingRow.make_simple('2010-04-15', 'MultiPartyA', 125, 'rt:480/4800'),
 
        AgingRow.make_simple('2010-04-15', 'MultiPartyB', 125, 'rt:480/4800'),
 
    ])
 

	
 
def test_aging_report_does_not_include_too_recent_postings(accrual_postings):
 
    # This date is after the Q3 posting, but too soon after for that to be
 
    # included in the aging report.
 
    date = datetime.date(2010, 10, 1)
 
    output = run_aging_report((
 
        post for post in accrual_postings
 
        if post.meta.get('rt-id') == 'rt:470'
 
    ), date)
 
    # Date+amount are both from the Q2 posting only.
 
    check_aging_ods(output, date, [
 
        AgingRow.make_simple('2010-06-15', 'GrantCo', 5500, 'rt:470/4700',
 
                             project='Development Grant'),
 
    ], [])
 

	
 
def run_main(arglist, config=None):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/accruals.beancount'),
 
            rt_client=RTClient(),
 
        )
 
    output = io.StringIO()
 
    errors = io.StringIO()
 
    retcode = accrual.main(arglist, output, errors, config)
 
    return retcode, output, errors
 

	
 
def check_main_fails(arglist, config, error_flags, error_patterns):
 
    retcode, output, errors = run_main(arglist, config)
 
    assert retcode > 16
 
    assert (retcode - 16) & error_flags
 
    check_output(errors, error_patterns)
 
    assert not output.getvalue()
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['--report-type=balance', 'entity=EarlyBird'],
 
    ['--report-type=outgoing', 'entity=EarlyBird'],
 
])
 
def test_output_excludes_payments(arglist):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    output.seek(0)
 
    for line in output:
 
        assert not re.match(r'\brt:4\d\b', line)
 

	
 
@pytest.mark.parametrize('arglist,expect_invoice', [
 
    (['40'], 'rt:40/400'),
 
    (['44/440'], 'rt:44/440'),
 
])
 
def test_output_payments_when_only_match(arglist, expect_invoice):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_output(output, [
 
        rf'^{re.escape(expect_invoice)}:$',
 
        r' outstanding since ',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist,expect_amount', [
 
    (['310'], 420),
 
    (['310/3120'], 220),
 
    (['entity=Vendor'], 420),
 
])
 
def test_main_outgoing_report(arglist, expect_amount):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    rt_url = RTClient.DEFAULT_URL[:-9]
 
    rt_id_url = re.escape(f'<{rt_url}Ticket/Display.html?id=310>')
 
    contract_url = re.escape(f'<{rt_url}Ticket/Attachment/3120/3120/VendorContract.pdf>')
 
    check_output(output, [
 
        r'^REQUESTOR: Mx\. 310 <mx310@example\.org>$',
 
        rf'^TOTAL TO PAY: \${expect_amount}\.00$',
 
        r'^\s*2010-04-30\s',
 
        r'^\s+Expenses:Travel\s+220 USD$',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['-t', 'balance'],
 
    ['515/5150'],
 
    ['entity=MatchingProgram'],
 
])
 
def test_main_balance_report(arglist):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_output(output, [
 
        r'\brt://ticket/515/attachments/5150:$',
 
        r'^\s+1,500\.00 USD outstanding since 2010-05-15$',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    [],
0 comments (0 inline, 0 general)