Changeset - f52ad4fbc1cc
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-06-11 17:07:14
reports: Add RelatedPostings.first_meta_links() method.

Basically moving this from AccrualPostings into the superclass.
3 files changed with 33 insertions and 5 deletions:
0 comments (0 inline, 0 general)
Show inline comments
@@ -148,119 +148,116 @@ class AccrualAccount(enum.Enum):
    def by_account(cls, name: data.Account) -> 'AccrualAccount':
        for account in cls:
            if name.is_under(
                return account
        raise ValueError(f"unrecognized account {name!r}")

    def classify(cls, related: core.RelatedPostings) -> 'AccrualAccount':
        for account in cls:
            account_name =
            if all(post.account.is_under(account_name) for post in related):
                return account
        raise ValueError("unrecognized account set in related postings")

    def normalize_amount(self) -> Callable[[T], T]:
        return core.normalize_amount_func(


class AccrualPostings(core.RelatedPostings):
    __slots__ = (
    INCONSISTENT = Sentinel()

    def __init__(self,
                 source: Iterable[data.Posting]=(),
                 _can_own: bool=False,
    ) -> None:
        super().__init__(source, _can_own=_can_own)
        # The following type declarations tell mypy about values set in the for
        # loop that are important enough to be referenced directly elsewhere.
        self.account = self._single_item(post.account for post in self)
        if isinstance(self.account, Sentinel):
            self.accrual_type: Optional[AccrualAccount] = None
            norm_func: Callable[[T], T] = lambda x: x
            entity_pred: Callable[[data.Posting], bool] = bool
            self.accrual_type = AccrualAccount.by_account(self.account)
            norm_func = self.accrual_type.normalize_amount
            entity_pred = lambda post: norm_func(post.units).number > 0
        self.entity = self._single_item(self.entities(entity_pred))
        self.invoice = self._single_item(self.first_links('invoice'))
        self.invoice = self._single_item(self.first_meta_links('invoice', None))
        self.end_balance = norm_func(self.balance_at_cost())

    def _single_item(self, seq: Iterable[T]) -> Union[T, Sentinel]:
        items = iter(seq)
            item1 = next(items)
        except StopIteration:
            all_same = False
            all_same = all(item == item1 for item in items)
        return item1 if all_same else self.INCONSISTENT

    def entities(self, pred: Callable[[data.Posting], bool]=bool) -> Iterator[MetaValue]:
        return filters.iter_unique(
            for post in self
            if pred(post) and 'entity' in post.meta

    def first_links(self, key: MetaKey, default: Optional[str]=None) -> Iterator[Optional[str]]:
        return (post.meta.first_link(key, default) for post in self)

    def make_consistent(self) -> Iterator[Tuple[MetaValue, 'AccrualPostings']]:
        account_ok = isinstance(self.account, str)
        entity_ok = isinstance(self.entity, str)
        # `'/' in self.invoice` is just our heuristic to ensure that the
        # invoice metadata is "unique enough," and not just a placeholder
        # value like "FIXME". It can be refined if needed.
        invoice_ok = isinstance(self.invoice, str) and '/' in self.invoice
        if account_ok and entity_ok and invoice_ok:
            yield (self.invoice, self)
        groups = collections.defaultdict(list)
        for post in self:
            post_invoice = self.invoice if invoice_ok else (
                post.meta.get('invoice') or 'BlankInvoice'
            post_entity = self.entity if entity_ok else (
                post.meta.get('entity') or 'BlankEntity'
            groups[f'{post.account} {post_invoice} {post_entity}'].append(post)
        type_self = type(self)
        for group_key, posts in groups.items():
            yield group_key, type_self(posts, _can_own=True)

    def is_paid(self, default: Optional[bool]=None) -> Optional[bool]:
        if self.accrual_type is None:
            return default
            return self.end_balance.le_zero()

    def is_zero(self, default: Optional[bool]=None) -> Optional[bool]:
        if self.accrual_type is None:
            return default
            return self.end_balance.is_zero()

    def since_last_nonzero(self) -> 'AccrualPostings':
        for index, (post, balance) in enumerate(self.iter_with_balance()):
            if balance.is_zero():
                start_index = index
            empty = start_index == index
        except NameError:
            empty = True
        return self if empty else self[start_index + 1:]


class BaseReport:
    def __init__(self, out_file: TextIO) -> None:
@@ -461,97 +458,97 @@ class AgingReport(BaseReport):
                # Cheap optimization: don't slice and dice groups we're not
                # going to report anyway.
            elif group.accrual_type is None:
                group = group.since_last_nonzero()
                # Filter out new accruals after the report date.
                # e.g., cover the case that the same invoices has multiple
                # postings over time, and we don't want to report too-recent
                # ones.
                cutoff_date = - datetime.timedelta(
                group = AccrualPostings(
                    post for post in group.since_last_nonzero()
                    if <= cutoff_date
                    or group.accrual_type.normalize_amount(post.units.number) < 0
            if group and not group.is_zero():
        rows.sort(key=lambda related: (
             if related.entity is related.INCONSISTENT
             else related.entity),


class BalanceReport(BaseReport):
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
        posts = posts.since_last_nonzero()
        date_s = posts[0]'%Y-%m-%d')
        if index:
            yield ""
        yield f"{posts.invoice}:"
        yield f"  {posts.balance_at_cost()} outstanding since {date_s}"


class OutgoingReport(BaseReport):
    def __init__(self, rt_client: rt.Rt, out_file: TextIO) -> None:
        self.rt_client = rt_client
        self.rt_wrapper = rtutil.RT(rt_client)

    def _primary_rt_id(self, posts: AccrualPostings) -> rtutil.TicketAttachmentIds:
        rt_ids = {url for url in posts.first_links('rt-id') if url is not None}
        rt_ids = list(posts.first_meta_links('rt-id'))
        rt_ids_count = len(rt_ids)
        if rt_ids_count != 1:
            raise ValueError(f"{rt_ids_count} rt-id links found")
        parsed = rtutil.RT.parse(rt_ids.pop())
        if parsed is None:
            raise ValueError("rt-id is not a valid RT reference")
            return parsed

    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
        posts = posts.since_last_nonzero()
            ticket_id, _ = self._primary_rt_id(posts)
            ticket = self.rt_client.get_ticket(ticket_id)
            # Note we only use this when ticket is None.
            errmsg = f"ticket {ticket_id} not found"
        except (ValueError, rt.RtError) as error:
            ticket = None
            errmsg = error.args[0]
        if ticket is None:
                "can't generate outgoings report for %s because no RT ticket available: %s",
                posts.invoice, errmsg,

            rt_requestor = self.rt_client.get_user(ticket['Requestors'][0])
        except (IndexError, rt.RtError):
            rt_requestor = None
        if rt_requestor is None:
            requestor = ''
            requestor_name = ''
            requestor_name = (
                or ticket.get('CF.{payment-to}')
                or ''
            requestor = f'{requestor_name} <{rt_requestor["EmailAddress"]}>'.strip()

        balance_s = posts.end_balance.format(None)
        raw_balance = -posts.balance()
        if raw_balance != posts.end_balance:
            balance_s = f'{raw_balance} ({balance_s})'

        contract_links = list(posts.all_meta_links('contract'))
        if contract_links:
Show inline comments
@@ -259,96 +259,113 @@ class RelatedPostings(Sequence[data.Posting]):
    def group_by_meta(cls: Type[RelatedType],
                      postings: Iterable[data.Posting],
                      key: MetaKey,
                      default: Optional[MetaValue]=None,
    ) -> Iterator[Tuple[Optional[MetaValue], RelatedType]]:
        """Relate postings by metadata value

        This method takes an iterable of postings and returns a mapping.
        The keys of the mapping are the values of post.meta.get(key, default).
        The values are RelatedPostings instances that contain all the postings
        that had that same metadata value.
        mapping: DefaultDict[Optional[MetaValue], List[data.Posting]] = collections.defaultdict(list)
        for post in postings:
            mapping[post.meta.get(key, default)].append(post)
        for value, posts in mapping.items():
            yield value, cls(posts, _can_own=True)

    def __repr__(self) -> str:
        return f'<{type(self).__name__} {self._postings!r}>'

    def __getitem__(self: RelatedType, index: int) -> data.Posting: ...

    def __getitem__(self: RelatedType, s: slice) -> RelatedType: ...

    def __getitem__(self: RelatedType,
                    index: Union[int, slice],
    ) -> Union[data.Posting, RelatedType]:
        if isinstance(index, slice):
            return type(self)(self._postings[index], _can_own=True)
            return self._postings[index]

    def __len__(self) -> int:
        return len(self._postings)

    def _all_meta_links(self, key: MetaKey) -> Iterator[str]:
        for post in self:
                yield from post.meta.get_links(key)
            except TypeError:

    def all_meta_links(self, key: MetaKey) -> Iterator[str]:
        return filters.iter_unique(self._all_meta_links(key))

    def first_meta_links(self, key: MetaKey, default: str='') -> Iterator[str]: ...

    def first_meta_links(self, key: MetaKey, default: None) -> Iterator[Optional[str]]: ...

    def first_meta_links(self,
                         key: MetaKey,
                         default: Optional[str]='',
    ) -> Iterator[Optional[str]]:
        retval = filters.iter_unique(
            post.meta.first_link(key, default) for post in self
        if default == '':
            retval = (s for s in retval if s)
        return retval

    def iter_with_balance(self) -> Iterator[Tuple[data.Posting, Balance]]:
        balance = MutableBalance()
        for post in self:
            balance += post.units
            yield post, balance

    def balance(self) -> Balance:
        for _, balance in self.iter_with_balance():
            return balance
        except NameError:
            return Balance()

    def balance_at_cost(self) -> Balance:
        balance = MutableBalance()
        for post in self:
            if post.cost is None:
                balance += post.units
                number = post.units.number * post.cost.number
                balance += data.Amount(number, post.cost.currency)
        return balance

    def meta_values(self,
                    key: MetaKey,
                    default: Optional[MetaValue]=None,
    ) -> Set[Optional[MetaValue]]:
        return {post.meta.get(key, default) for post in self}


class BaseSpreadsheet(Generic[RT, ST], metaclass=abc.ABCMeta):
    """Abstract base class to help write spreadsheets

    This class provides the very core logic to write an arbitrary set of data
    rows to arbitrary output. It calls hooks when it starts writing the
    spreadsheet, starts a new "section" of rows, ends a section, and ends the

    RT is the type of the input data rows. ST is the type of the section
    identifier that you create from each row. If you don't want to use the
    section logic at all, set ST to None and define section_key to return None.

    def section_key(self, row: RT) -> ST:
        """Return the section a row belongs to

Show inline comments
@@ -230,72 +230,86 @@ def test_meta_values_all_match_default_given():
    related = core.RelatedPostings([
        testutil.Posting('Income:Donations', -1, key='1'),
        testutil.Posting('Income:Donations', -2, key='2'),
    assert related.meta_values('key', '') == {'1', '2'}

def test_meta_values_many_types():
    expected = {
, 4, 1),
    related = core.RelatedPostings(
        testutil.Posting('Income:Donations', -index, key=value)
        for index, value in enumerate(expected)
    assert related.meta_values('key') == expected

@pytest.mark.parametrize('count', range(3))
def test_all_meta_links_zero(count):
    related = core.RelatedPostings(testutil.Posting(
        'Income:Donations', -n, testkey=str(n), _meta_type=data.Metadata,
    ) for n in range(count))
    assert next(related.all_meta_links('approval'), None) is None

def test_all_meta_links_singletons():
    related = core.RelatedPostings(testutil.Posting(
        'Income:Donations', -10, statement=value, _meta_type=data.Metadata,
    ) for value in itertools.chain(
    assert set(related.all_meta_links('statement')) == testutil.LINK_METADATA_STRINGS

def test_all_meta_links_multiples():
    related = core.RelatedPostings(testutil.Posting(
        'Income:Donations', -10, approval=' '.join(value), _meta_type=data.Metadata,
    ) for value in itertools.permutations(testutil.LINK_METADATA_STRINGS, 2))
    assert set(related.all_meta_links('approval')) == testutil.LINK_METADATA_STRINGS

def test_all_meta_links_preserves_order():
    related = core.RelatedPostings(testutil.Posting(
        'Income:Donations', -10, approval=c, _meta_type=data.Metadata,
    ) for c in '121323')
    assert list(related.all_meta_links('approval')) == list('123')

def test_first_meta_links():
    related = core.RelatedPostings(testutil.Posting(
        'Assets:Cash', 10, contract=value, _meta_type=data.Metadata,
    ) for value in ['1 2', '', '1 3', testutil.PAST_DATE, '2 3', None])
    del related[-1].meta['contract']
    assert list(related.first_meta_links('contract')) == list('12')

def test_first_meta_links_fallback():
    related = core.RelatedPostings(testutil.Posting(
        'Assets:Cash', 10, contract=value, _meta_type=data.Metadata,
    ) for value in ['1 2', testutil.PAST_DATE, '1 3', None, '2 3'])
    del related[-2].meta['contract']
    assert list(related.first_meta_links('contract', None)) == ['1', None, '2']

def test_group_by_meta_zero():
    assert not list(core.RelatedPostings.group_by_meta([], 'metacurrency'))

def test_group_by_meta_one(credit_card_cycle):
    posting = next(post for post in data.Posting.from_entries(credit_card_cycle)
                   if post.account.is_credit_card())
    actual = core.RelatedPostings.group_by_meta([posting], 'metacurrency')
    assert set(key for key, _ in actual) == {'USD'}

def test_group_by_meta_many(two_accruals_three_payments):
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
                if post.account == 'Assets:Receivable:Accounts']
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metacurrency'))
    assert set(actual) == {'USD', 'EUR'}
    for key, group in actual.items():
        assert 2 <= len(group) <= 3
        assert group.balance().is_zero()

def test_group_by_meta_many_single_posts(two_accruals_three_payments):
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
                if post.account == 'Assets:Receivable:Accounts']
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metanumber'))
    assert set(actual) == {post.units.number for post in postings}
    assert len(actual) == len(postings)
0 comments (0 inline, 0 general)