Changeset - f52ad4fbc1cc
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-06-11 17:07:14
brettcsmith@brettcsmith.org
reports: Add RelatedPostings.first_meta_links() method.

Basically moving this from AccrualPostings into the superclass.
3 files changed with 33 insertions and 5 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -172,71 +172,68 @@ class AccrualPostings(core.RelatedPostings):
 
        'account',
 
        'entity',
 
        'invoice',
 
    )
 
    INCONSISTENT = Sentinel()
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Posting]=(),
 
                 *,
 
                 _can_own: bool=False,
 
    ) -> None:
 
        super().__init__(source, _can_own=_can_own)
 
        # The following type declarations tell mypy about values set in the for
 
        # loop that are important enough to be referenced directly elsewhere.
 
        self.account = self._single_item(post.account for post in self)
 
        if isinstance(self.account, Sentinel):
 
            self.accrual_type: Optional[AccrualAccount] = None
 
            norm_func: Callable[[T], T] = lambda x: x
 
            entity_pred: Callable[[data.Posting], bool] = bool
 
        else:
 
            self.accrual_type = AccrualAccount.by_account(self.account)
 
            norm_func = self.accrual_type.normalize_amount
 
            entity_pred = lambda post: norm_func(post.units).number > 0
 
        self.entity = self._single_item(self.entities(entity_pred))
 
        self.invoice = self._single_item(self.first_links('invoice'))
 
        self.invoice = self._single_item(self.first_meta_links('invoice', None))
 
        self.end_balance = norm_func(self.balance_at_cost())
 

	
 
    def _single_item(self, seq: Iterable[T]) -> Union[T, Sentinel]:
 
        items = iter(seq)
 
        try:
 
            item1 = next(items)
 
        except StopIteration:
 
            all_same = False
 
        else:
 
            all_same = all(item == item1 for item in items)
 
        return item1 if all_same else self.INCONSISTENT
 

	
 
    def entities(self, pred: Callable[[data.Posting], bool]=bool) -> Iterator[MetaValue]:
 
        return filters.iter_unique(
 
            post.meta['entity']
 
            for post in self
 
            if pred(post) and 'entity' in post.meta
 
        )
 

	
 
    def first_links(self, key: MetaKey, default: Optional[str]=None) -> Iterator[Optional[str]]:
 
        return (post.meta.first_link(key, default) for post in self)
 

	
 
    def make_consistent(self) -> Iterator[Tuple[MetaValue, 'AccrualPostings']]:
 
        account_ok = isinstance(self.account, str)
 
        entity_ok = isinstance(self.entity, str)
 
        # `'/' in self.invoice` is just our heuristic to ensure that the
 
        # invoice metadata is "unique enough," and not just a placeholder
 
        # value like "FIXME". It can be refined if needed.
 
        invoice_ok = isinstance(self.invoice, str) and '/' in self.invoice
 
        if account_ok and entity_ok and invoice_ok:
 
            yield (self.invoice, self)
 
            return
 
        groups = collections.defaultdict(list)
 
        for post in self:
 
            post_invoice = self.invoice if invoice_ok else (
 
                post.meta.get('invoice') or 'BlankInvoice'
 
            )
 
            post_entity = self.entity if entity_ok else (
 
                post.meta.get('entity') or 'BlankEntity'
 
            )
 
            groups[f'{post.account} {post_invoice} {post_entity}'].append(post)
 
        type_self = type(self)
 
        for group_key, posts in groups.items():
 
            yield group_key, type_self(posts, _can_own=True)
 

	
 
    def is_paid(self, default: Optional[bool]=None) -> Optional[bool]:
...
 
@@ -485,49 +482,49 @@ class AgingReport(BaseReport):
 
             if related.entity is related.INCONSISTENT
 
             else related.entity),
 
        ))
 
        self.ods.write(rows)
 
        self.ods.save_file(self.out_bin)
 

	
 

	
 
class BalanceReport(BaseReport):
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        posts = posts.since_last_nonzero()
 
        date_s = posts[0].meta.date.strftime('%Y-%m-%d')
 
        if index:
 
            yield ""
 
        yield f"{posts.invoice}:"
 
        yield f"  {posts.balance_at_cost()} outstanding since {date_s}"
 

	
 

	
 
class OutgoingReport(BaseReport):
 
    def __init__(self, rt_client: rt.Rt, out_file: TextIO) -> None:
 
        super().__init__(out_file)
 
        self.rt_client = rt_client
 
        self.rt_wrapper = rtutil.RT(rt_client)
 

	
 
    def _primary_rt_id(self, posts: AccrualPostings) -> rtutil.TicketAttachmentIds:
 
        rt_ids = {url for url in posts.first_links('rt-id') if url is not None}
 
        rt_ids = list(posts.first_meta_links('rt-id'))
 
        rt_ids_count = len(rt_ids)
 
        if rt_ids_count != 1:
 
            raise ValueError(f"{rt_ids_count} rt-id links found")
 
        parsed = rtutil.RT.parse(rt_ids.pop())
 
        if parsed is None:
 
            raise ValueError("rt-id is not a valid RT reference")
 
        else:
 
            return parsed
 

	
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        posts = posts.since_last_nonzero()
 
        try:
 
            ticket_id, _ = self._primary_rt_id(posts)
 
            ticket = self.rt_client.get_ticket(ticket_id)
 
            # Note we only use this when ticket is None.
 
            errmsg = f"ticket {ticket_id} not found"
 
        except (ValueError, rt.RtError) as error:
 
            ticket = None
 
            errmsg = error.args[0]
 
        if ticket is None:
 
            self.logger.error(
 
                "can't generate outgoings report for %s because no RT ticket available: %s",
 
                posts.invoice, errmsg,
 
            )
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -283,48 +283,65 @@ class RelatedPostings(Sequence[data.Posting]):
 
    @overload
 
    def __getitem__(self: RelatedType, s: slice) -> RelatedType: ...
 

	
 
    def __getitem__(self: RelatedType,
 
                    index: Union[int, slice],
 
    ) -> Union[data.Posting, RelatedType]:
 
        if isinstance(index, slice):
 
            return type(self)(self._postings[index], _can_own=True)
 
        else:
 
            return self._postings[index]
 

	
 
    def __len__(self) -> int:
 
        return len(self._postings)
 

	
 
    def _all_meta_links(self, key: MetaKey) -> Iterator[str]:
 
        for post in self:
 
            try:
 
                yield from post.meta.get_links(key)
 
            except TypeError:
 
                pass
 

	
 
    def all_meta_links(self, key: MetaKey) -> Iterator[str]:
 
        return filters.iter_unique(self._all_meta_links(key))
 

	
 
    @overload
 
    def first_meta_links(self, key: MetaKey, default: str='') -> Iterator[str]: ...
 

	
 
    @overload
 
    def first_meta_links(self, key: MetaKey, default: None) -> Iterator[Optional[str]]: ...
 

	
 
    def first_meta_links(self,
 
                         key: MetaKey,
 
                         default: Optional[str]='',
 
    ) -> Iterator[Optional[str]]:
 
        retval = filters.iter_unique(
 
            post.meta.first_link(key, default) for post in self
 
        )
 
        if default == '':
 
            retval = (s for s in retval if s)
 
        return retval
 

	
 
    def iter_with_balance(self) -> Iterator[Tuple[data.Posting, Balance]]:
 
        balance = MutableBalance()
 
        for post in self:
 
            balance += post.units
 
            yield post, balance
 

	
 
    def balance(self) -> Balance:
 
        for _, balance in self.iter_with_balance():
 
            pass
 
        try:
 
            return balance
 
        except NameError:
 
            return Balance()
 

	
 
    def balance_at_cost(self) -> Balance:
 
        balance = MutableBalance()
 
        for post in self:
 
            if post.cost is None:
 
                balance += post.units
 
            else:
 
                number = post.units.number * post.cost.number
 
                balance += data.Amount(number, post.cost.currency)
 
        return balance
 

	
tests/test_reports_related_postings.py
Show inline comments
...
 
@@ -254,48 +254,62 @@ def test_all_meta_links_zero(count):
 
    assert next(related.all_meta_links('approval'), None) is None
 

	
 
def test_all_meta_links_singletons():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, statement=value, _meta_type=data.Metadata,
 
    ) for value in itertools.chain(
 
        testutil.NON_LINK_METADATA_STRINGS,
 
        testutil.LINK_METADATA_STRINGS,
 
        testutil.NON_STRING_METADATA_VALUES,
 
    ))
 
    assert set(related.all_meta_links('statement')) == testutil.LINK_METADATA_STRINGS
 

	
 
def test_all_meta_links_multiples():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, approval=' '.join(value), _meta_type=data.Metadata,
 
    ) for value in itertools.permutations(testutil.LINK_METADATA_STRINGS, 2))
 
    assert set(related.all_meta_links('approval')) == testutil.LINK_METADATA_STRINGS
 

	
 
def test_all_meta_links_preserves_order():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, approval=c, _meta_type=data.Metadata,
 
    ) for c in '121323')
 
    assert list(related.all_meta_links('approval')) == list('123')
 

	
 
def test_first_meta_links():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Assets:Cash', 10, contract=value, _meta_type=data.Metadata,
 
    ) for value in ['1 2', '', '1 3', testutil.PAST_DATE, '2 3', None])
 
    del related[-1].meta['contract']
 
    assert list(related.first_meta_links('contract')) == list('12')
 

	
 
def test_first_meta_links_fallback():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Assets:Cash', 10, contract=value, _meta_type=data.Metadata,
 
    ) for value in ['1 2', testutil.PAST_DATE, '1 3', None, '2 3'])
 
    del related[-2].meta['contract']
 
    assert list(related.first_meta_links('contract', None)) == ['1', None, '2']
 

	
 
def test_group_by_meta_zero():
 
    assert not list(core.RelatedPostings.group_by_meta([], 'metacurrency'))
 

	
 
def test_group_by_meta_one(credit_card_cycle):
 
    posting = next(post for post in data.Posting.from_entries(credit_card_cycle)
 
                   if post.account.is_credit_card())
 
    actual = core.RelatedPostings.group_by_meta([posting], 'metacurrency')
 
    assert set(key for key, _ in actual) == {'USD'}
 

	
 
def test_group_by_meta_many(two_accruals_three_payments):
 
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
 
                if post.account == 'Assets:Receivable:Accounts']
 
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metacurrency'))
 
    assert set(actual) == {'USD', 'EUR'}
 
    for key, group in actual.items():
 
        assert 2 <= len(group) <= 3
 
        assert group.balance().is_zero()
 

	
 
def test_group_by_meta_many_single_posts(two_accruals_three_payments):
 
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
 
                if post.account == 'Assets:Receivable:Accounts']
 
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metanumber'))
 
    assert set(actual) == {post.units.number for post in postings}
 
    assert len(actual) == len(postings)
0 comments (0 inline, 0 general)