Changeset - 638bc8ccbb77
[Not reviewed]
0 1 0
Brett Smith - 4 years ago 2020-07-02 17:07:59
brettcsmith@brettcsmith.org
accrual: Make comment not an actual type:ignore.
1 file changed with 1 insertions and 1 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -281,385 +281,385 @@ class AgingODS(core.BaseODS[AccrualPostings, data.Account]):
 
                if index == 0:
 
                    style: Union[str, odf.style.Style] = ''
 
                elif index < 6:
 
                    style = self.column_style(1.2)
 
                else:
 
                    style = self.column_style(1.5)
 
                self.sheet.addElement(odf.table.TableColumn(stylename=style))
 
            self.add_row(*(
 
                self.string_cell(name, stylename=self.style_bold)
 
                for name in self.COLUMNS
 
            ))
 
            self.lock_first_row()
 

	
 
    def start_section(self, key: data.Account) -> None:
 
        accrual_type = AccrualAccount.by_account(key)
 
        self.norm_func = accrual_type.normalize_amount
 
        self.age_thresholds = list(accrual_type.value.aging_thresholds)
 
        self.age_thresholds.append(-sys.maxsize)
 
        self.age_balances = [core.MutableBalance() for _ in self.age_thresholds]
 
        self.age_styles = [
 
            self.merge_styles(self.style_date, self.border_style(
 
                core.Border.LEFT, '10pt', 'solid', color,
 
            )) for color in self.AGE_COLORS
 
        ]
 
        acct_parts = key.slice_parts()
 
        self.use_sheet(acct_parts[1])
 
        self.add_row()
 
        self.add_row(self.string_cell(
 
            f"{' '.join(acct_parts[2:])} {acct_parts[1]} Aging Report"
 
            f" for {self.date.isoformat()}",
 
            stylename=self.merge_styles(self.style_bold, self.style_centertext),
 
            numbercolumnsspanned=self.COL_COUNT,
 
        ))
 
        self.add_row()
 

	
 
    def end_section(self, key: data.Account) -> None:
 
        total_balance = core.MutableBalance()
 
        text_span = 4
 
        last_age_text: Optional[str] = None
 
        self.add_row()
 
        for threshold, balance, style in zip(
 
                self.age_thresholds, self.age_balances, self.age_styles,
 
        ):
 
            years, days = divmod(threshold, 365)
 
            years_text = f"{years} {'Year' if years == 1 else 'Years'}"
 
            days_text = f"{days} Days"
 
            if years and days:
 
                age_text = f"{years_text} {days_text}"
 
            elif years:
 
                age_text = years_text
 
            else:
 
                age_text = days_text
 
            if last_age_text is None:
 
                age_range = f"Over {age_text}"
 
            elif threshold < 0:
 
                self.add_row(
 
                    self.string_cell(
 
                        f"Total Unpaid Over {last_age_text}: ",
 
                        stylename=self.merge_styles(self.style_bold, self.style_endtext),
 
                        numbercolumnsspanned=text_span,
 
                    ),
 
                    *(odf.table.TableCell() for _ in range(1, text_span)),
 
                    self.balance_cell(total_balance),
 
                )
 
                age_range = f"Under {last_age_text}"
 
            else:
 
                age_range = f"{age_text}–{last_age_text}"
 
            self.add_row(
 
                self.string_cell(
 
                    f"Total Aged {age_range}: ",
 
                    stylename=self.merge_styles(self.style_bold, self.style_endtext, style),
 
                    numbercolumnsspanned=text_span,
 
                ),
 
                *(odf.table.TableCell() for _ in range(1, text_span)),
 
                self.balance_cell(balance),
 
            )
 
            last_age_text = age_text
 
            total_balance += balance
 
        self.add_row(
 
            self.string_cell(
 
                "Total Unpaid: ",
 
                stylename=self.merge_styles(self.style_bold, self.style_endtext),
 
                numbercolumnsspanned=text_span,
 
            ),
 
            *(odf.table.TableCell() for _ in range(1, text_span)),
 
            self.balance_cell(total_balance),
 
        )
 

	
 
    def write_row(self, row: AccrualPostings) -> None:
 
        row_date = row[0].meta.date
 
        row_balance = self.norm_func(row.balance_at_cost())
 
        age = (self.date - row_date).days
 
        for index, threshold in enumerate(self.age_thresholds):
 
            if age >= threshold:
 
                if row_balance.ge_zero():
 
                    self.age_balances[index] += row_balance
 
                break
 
        else:
 
            return
 
        raw_balance = self.norm_func(row.balance())
 
        if raw_balance == row_balance:
 
            amount_cell = odf.table.TableCell()
 
        else:
 
            amount_cell = self.balance_cell(raw_balance)
 
        entities = row.meta_values('entity')
 
        entities.discard(None)
 
        projects = row.meta_values('project')
 
        projects.discard(None)
 
        self.add_row(
 
            self.date_cell(row_date, stylename=self.age_styles[index]),
 
            self.multiline_cell(sorted(entities)),
 
            amount_cell,
 
            self.balance_cell(row_balance),
 
            self.multiline_cell(sorted(projects)),
 
            *(self.meta_links_cell(row.all_meta_links(key))
 
              for key in self.DOC_COLUMNS),
 
        )
 

	
 

	
 
class AgingReport(BaseReport):
 
    def __init__(self,
 
                 rt_wrapper: rtutil.RT,
 
                 out_file: BinaryIO,
 
                 date: Optional[datetime.date]=None,
 
    ) -> None:
 
        if date is None:
 
            date = datetime.date.today()
 
        self.out_bin = out_file
 
        self.logger = logger.getChild(type(self).__name__)
 
        self.ods = AgingODS(rt_wrapper, date, self.logger)
 

	
 
    def run(self, groups: PostGroups) -> None:
 
        rows = [group for group in groups.values()
 
                if not group.balance_at_cost().is_zero()]
 
        rows.sort(key=lambda group: (
 
            group[0].account,
 
            group[0].meta.date,
 
            abs(sum(amt.number for amt in group.balance_at_cost().values())),
 
        ))
 
        self.ods.write(rows)
 
        self.ods.save_file(self.out_bin)
 

	
 

	
 
class BalanceReport(BaseReport):
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        meta = posts[0].meta
 
        date_s = meta.date.strftime('%Y-%m-%d')
 
        entity_s = meta.get('entity', '<no entity>')
 
        invoice_s = meta.get('invoice', '<no invoice>')
 
        balance_s = posts.balance_at_cost().format(zero="Zero balance")
 
        if index:
 
            yield ""
 
        yield f"{entity_s} {invoice_s}:"
 
        yield f"  {balance_s} outstanding since {date_s}"
 

	
 

	
 
class OutgoingReport(BaseReport):
 
    class PaymentMethods(enum.Enum):
 
        ach = 'ACH'
 
        check = 'Check'
 
        creditcard = 'Credit Card'
 
        debitcard = 'Debit Card'
 
        paypal = 'PayPal'
 
        vendorportal = 'Vendor Portal'
 
        wire = 'Wire'
 
        fxwire = wire
 
        uswire = wire
 

	
 

	
 
    def __init__(self, rt_wrapper: rtutil.RT, out_file: TextIO) -> None:
 
        super().__init__(out_file)
 
        self.rt_wrapper = rt_wrapper
 
        self.rt_client = rt_wrapper.rt
 

	
 
    def _primary_rt_id(self, posts: AccrualPostings) -> rtutil.TicketAttachmentIds:
 
        rt_ids = posts.first_meta_links('rt-id')
 
        rt_id = next(rt_ids, None)
 
        rt_id2 = next(rt_ids, None)
 
        if rt_id is None:
 
            raise ValueError("no rt-id links found")
 
        elif rt_id2 is not None:
 
            raise ValueError("multiple rt-id links found")
 
        parsed = rtutil.RT.parse(rt_id)
 
        if parsed is None:
 
            raise ValueError("rt-id is not a valid RT reference")
 
        else:
 
            return parsed
 

	
 
    def _get_payment_method(self, posts: AccrualPostings, ticket_id: str) -> Optional[str]:
 
        payment_methods = posts.meta_values('payment-method')
 
        payment_methods.discard(None)
 
        if all(isinstance(s, str) for s in payment_methods):
 
            # type:ignore for <https://github.com/python/mypy/issues/7853>
 
            # type ignore for <https://github.com/python/mypy/issues/7853>
 
            payment_methods = {s.strip().lower() for s in payment_methods}  # type:ignore[union-attr]
 
        log_prefix = f"cannot set payment-method for rt:{ticket_id}:"
 
        payment_method_count = len(payment_methods)
 
        if payment_method_count != 1:
 
            self.logger.warning("%s %s metadata values found",
 
                                log_prefix, payment_method_count)
 
            return None
 
        payment_method = payment_methods.pop()
 
        if not isinstance(payment_method, str):
 
            self.logger.warning("%s %r is not a string value",
 
                                log_prefix, payment_method)
 
            return None
 
        try:
 
            currency, method_key = payment_method.split(None, 1)
 
        except ValueError:
 
            self.logger.warning("%s no method specified in %r",
 
                                log_prefix, payment_method)
 
            return None
 
        curr_match = re.fullmatch(r'[a-z]{3}', currency)
 
        if curr_match is None:
 
            self.logger.warning("%s invalid currency %r",
 
                                log_prefix, currency)
 
        try:
 
            method_enum = self.PaymentMethods[method_key]
 
        except KeyError:
 
            self.logger.warning("%s invalid method %r",
 
                                log_prefix, method_key)
 
            curr_match = None
 
        if curr_match is None:
 
            return None
 
        else:
 
            return f'{currency.upper()} {method_enum.value}'
 

	
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        try:
 
            ticket_id, _ = self._primary_rt_id(posts)
 
            ticket = self.rt_client.get_ticket(ticket_id)
 
            # Note we only use this when ticket is None.
 
            errmsg = f"ticket {ticket_id} not found"
 
        except (ValueError, rt.RtError) as error:
 
            ticket = None
 
            errmsg = error.args[0]
 
        if ticket is None:
 
            meta = posts[0].meta
 
            self.logger.error(
 
                "can't generate outgoings report for %s %s %s because no RT ticket available: %s",
 
                meta.date.isoformat(),
 
                meta.get('entity', '<no entity>'),
 
                meta.get('invoice', '<no invoice>'),
 
                errmsg,
 
            )
 
            return
 

	
 
        try:
 
            rt_requestor = self.rt_client.get_user(ticket['Requestors'][0])
 
        except (IndexError, rt.RtError):
 
            rt_requestor = None
 
        if rt_requestor is None:
 
            requestor = ''
 
            requestor_name = ''
 
        else:
 
            requestor_name = (
 
                rt_requestor.get('RealName')
 
                or ticket.get('CF.{payment-to}')
 
                or ''
 
            )
 
            requestor = f'{requestor_name} <{rt_requestor["EmailAddress"]}>'.strip()
 

	
 
        balance = -posts.balance_at_cost()
 
        balance_s = balance.format(None)
 
        raw_balance = -posts.balance()
 
        payment_amount = raw_balance.format('¤¤ #,##0.00')
 
        if raw_balance != balance:
 
            payment_amount += f' ({balance_s})'
 
            balance_s = f'{raw_balance} ({balance_s})'
 

	
 
        payment_to = ticket.get('CF.{payment-to}') or requestor_name
 
        contract_links = list(posts.all_meta_links('contract'))
 
        if contract_links:
 
            contract_s = ' , '.join(self.rt_wrapper.iter_urls(
 
                contract_links, missing_fmt='<BROKEN RT LINK: {}>',
 
            ))
 
        else:
 
            contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
 
        projects = [v for v in posts.meta_values('project')
 
                    if isinstance(v, str)]
 

	
 
        yield "PAYMENT FOR APPROVAL:"
 
        yield f"REQUESTOR: {requestor}"
 
        yield f"PAYMENT TO: {payment_to}"
 
        yield f"TOTAL TO PAY: {balance_s}"
 
        yield f"AGREEMENT: {contract_s}"
 
        yield f"PROJECT: {', '.join(projects)}"
 
        yield "\nBEANCOUNT ENTRIES:\n"
 

	
 
        last_txn: Optional[Transaction] = None
 
        for post in posts:
 
            txn = post.meta.txn
 
            if txn is not last_txn:
 
                last_txn = txn
 
                txn = self.rt_wrapper.txn_with_urls(txn, '{}')
 
                # Suppress payment-method metadata from the report.
 
                txn.meta.pop('payment-method', None)
 
                for txn_post in txn.postings:
 
                    if txn_post.meta:
 
                        txn_post.meta.pop('payment-method', None)
 
                yield bc_printer.format_entry(txn)
 

	
 
        cf_targets = {
 
            'payment-amount': payment_amount,
 
            'payment-method': (self._get_payment_method(posts, ticket_id)
 
                               or ticket.get('CF.{payment-method}')),
 
            'payment-to': payment_to,
 
        }
 

	
 
        cf_updates = {
 
            f'CF_{key}': value
 
            for key, value in cf_targets.items()
 
            if ticket.get(f'CF.{{{key}}}') != value
 
        }
 
        if cf_updates:
 
            try:
 
                ok = self.rt_client.edit_ticket(ticket_id, **cf_updates)
 
            except rt.RtError:
 
                self.logger.debug("RT exception on edit_ticket", exc_info=True)
 
                ok = False
 
            if not ok:
 
                self.logger.warning("failed to set custom fields for rt:%s", ticket_id)
 

	
 

	
 
class ReportType(enum.Enum):
 
    AGING = AgingReport
 
    BALANCE = BalanceReport
 
    OUTGOING = OutgoingReport
 
    AGE = AGING
 
    BAL = BALANCE
 
    OUT = OUTGOING
 
    OUTGOINGS = OUTGOING
 

	
 
    @classmethod
 
    def by_name(cls, name: str) -> 'ReportType':
 
        try:
 
            return cls[name.upper()]
 
        except KeyError:
 
            raise ValueError(f"unknown report type {name!r}") from None
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    LOAD_ERRORS = 1
 
    # 2 was used in the past, it can probably be reclaimed.
 
    REPORT_ERRORS = 4
 
    NOTHING_TO_REPORT = 8
 

	
 

	
 
def filter_search(postings: Iterable[data.Posting],
 
                  search_terms: Iterable[cliutil.SearchTerm],
 
) -> Iterable[data.Posting]:
 
    accounts = tuple(AccrualAccount.account_names())
 
    postings = (post for post in postings if post.account.is_under(*accounts))
 
    for query in search_terms:
 
        postings = query.filter_postings(postings)
 
    return postings
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='NAME',
 
        type=ReportType.by_name,
 
        help="""The type of report to generate, one of `aging`, `balance`, or
 
`outgoing`. If not specified, the default is `aging` when no search terms are
 
given, `outgoing` for search terms that return a single outstanding payable,
 
and `balance` any other time.
 
""")
 
    parser.add_argument(
 
        '--since',
 
        metavar='YEAR',
 
        type=int,
 
        default=0,
 
        help="""How far back to search the books for related transactions.
 
You can either specify a fiscal year, or a negative offset from the current
 
fiscal year, to start loading entries from. The default is to load the current,
 
unaudited books.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is stdout for the balance and outgoing reports, and a generated
 
filename for other reports.
0 comments (0 inline, 0 general)