Changeset - f52ad4fbc1cc
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-06-11 17:07:14
brettcsmith@brettcsmith.org
reports: Add RelatedPostings.first_meta_links() method.

Basically moving this from AccrualPostings into the superclass.
3 files changed with 33 insertions and 5 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -148,119 +148,116 @@ class AccrualAccount(enum.Enum):
 
    @classmethod
 
    def by_account(cls, name: data.Account) -> 'AccrualAccount':
 
        for account in cls:
 
            if name.is_under(account.value.name):
 
                return account
 
        raise ValueError(f"unrecognized account {name!r}")
 

	
 
    @classmethod
 
    def classify(cls, related: core.RelatedPostings) -> 'AccrualAccount':
 
        for account in cls:
 
            account_name = account.value.name
 
            if all(post.account.is_under(account_name) for post in related):
 
                return account
 
        raise ValueError("unrecognized account set in related postings")
 

	
 
    @property
 
    def normalize_amount(self) -> Callable[[T], T]:
 
        return core.normalize_amount_func(self.value.name)
 

	
 

	
 
class AccrualPostings(core.RelatedPostings):
 
    __slots__ = (
 
        'accrual_type',
 
        'end_balance',
 
        'account',
 
        'entity',
 
        'invoice',
 
    )
 
    INCONSISTENT = Sentinel()
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Posting]=(),
 
                 *,
 
                 _can_own: bool=False,
 
    ) -> None:
 
        super().__init__(source, _can_own=_can_own)
 
        # The following type declarations tell mypy about values set in the for
 
        # loop that are important enough to be referenced directly elsewhere.
 
        self.account = self._single_item(post.account for post in self)
 
        if isinstance(self.account, Sentinel):
 
            self.accrual_type: Optional[AccrualAccount] = None
 
            norm_func: Callable[[T], T] = lambda x: x
 
            entity_pred: Callable[[data.Posting], bool] = bool
 
        else:
 
            self.accrual_type = AccrualAccount.by_account(self.account)
 
            norm_func = self.accrual_type.normalize_amount
 
            entity_pred = lambda post: norm_func(post.units).number > 0
 
        self.entity = self._single_item(self.entities(entity_pred))
 
        self.invoice = self._single_item(self.first_links('invoice'))
 
        self.invoice = self._single_item(self.first_meta_links('invoice', None))
 
        self.end_balance = norm_func(self.balance_at_cost())
 

	
 
    def _single_item(self, seq: Iterable[T]) -> Union[T, Sentinel]:
 
        items = iter(seq)
 
        try:
 
            item1 = next(items)
 
        except StopIteration:
 
            all_same = False
 
        else:
 
            all_same = all(item == item1 for item in items)
 
        return item1 if all_same else self.INCONSISTENT
 

	
 
    def entities(self, pred: Callable[[data.Posting], bool]=bool) -> Iterator[MetaValue]:
 
        return filters.iter_unique(
 
            post.meta['entity']
 
            for post in self
 
            if pred(post) and 'entity' in post.meta
 
        )
 

	
 
    def first_links(self, key: MetaKey, default: Optional[str]=None) -> Iterator[Optional[str]]:
 
        return (post.meta.first_link(key, default) for post in self)
 

	
 
    def make_consistent(self) -> Iterator[Tuple[MetaValue, 'AccrualPostings']]:
 
        account_ok = isinstance(self.account, str)
 
        entity_ok = isinstance(self.entity, str)
 
        # `'/' in self.invoice` is just our heuristic to ensure that the
 
        # invoice metadata is "unique enough," and not just a placeholder
 
        # value like "FIXME". It can be refined if needed.
 
        invoice_ok = isinstance(self.invoice, str) and '/' in self.invoice
 
        if account_ok and entity_ok and invoice_ok:
 
            yield (self.invoice, self)
 
            return
 
        groups = collections.defaultdict(list)
 
        for post in self:
 
            post_invoice = self.invoice if invoice_ok else (
 
                post.meta.get('invoice') or 'BlankInvoice'
 
            )
 
            post_entity = self.entity if entity_ok else (
 
                post.meta.get('entity') or 'BlankEntity'
 
            )
 
            groups[f'{post.account} {post_invoice} {post_entity}'].append(post)
 
        type_self = type(self)
 
        for group_key, posts in groups.items():
 
            yield group_key, type_self(posts, _can_own=True)
 

	
 
    def is_paid(self, default: Optional[bool]=None) -> Optional[bool]:
 
        if self.accrual_type is None:
 
            return default
 
        else:
 
            return self.end_balance.le_zero()
 

	
 
    def is_zero(self, default: Optional[bool]=None) -> Optional[bool]:
 
        if self.accrual_type is None:
 
            return default
 
        else:
 
            return self.end_balance.is_zero()
 

	
 
    def since_last_nonzero(self) -> 'AccrualPostings':
 
        for index, (post, balance) in enumerate(self.iter_with_balance()):
 
            if balance.is_zero():
 
                start_index = index
 
        try:
 
            empty = start_index == index
 
        except NameError:
 
            empty = True
 
        return self if empty else self[start_index + 1:]
 

	
 

	
 
class BaseReport:
 
    def __init__(self, out_file: TextIO) -> None:
...
 
@@ -461,97 +458,97 @@ class AgingReport(BaseReport):
 
                # Cheap optimization: don't slice and dice groups we're not
 
                # going to report anyway.
 
                continue
 
            elif group.accrual_type is None:
 
                group = group.since_last_nonzero()
 
            else:
 
                # Filter out new accruals after the report date.
 
                # e.g., cover the case that the same invoices has multiple
 
                # postings over time, and we don't want to report too-recent
 
                # ones.
 
                cutoff_date = self.ods.date - datetime.timedelta(
 
                    days=group.accrual_type.value.aging_thresholds[-1],
 
                )
 
                group = AccrualPostings(
 
                    post for post in group.since_last_nonzero()
 
                    if post.meta.date <= cutoff_date
 
                    or group.accrual_type.normalize_amount(post.units.number) < 0
 
                )
 
            if group and not group.is_zero():
 
                rows.append(group)
 
        rows.sort(key=lambda related: (
 
            related.account,
 
            related[0].meta.date,
 
            ('\0'.join(related.entities())
 
             if related.entity is related.INCONSISTENT
 
             else related.entity),
 
        ))
 
        self.ods.write(rows)
 
        self.ods.save_file(self.out_bin)
 

	
 

	
 
class BalanceReport(BaseReport):
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        posts = posts.since_last_nonzero()
 
        date_s = posts[0].meta.date.strftime('%Y-%m-%d')
 
        if index:
 
            yield ""
 
        yield f"{posts.invoice}:"
 
        yield f"  {posts.balance_at_cost()} outstanding since {date_s}"
 

	
 

	
 
class OutgoingReport(BaseReport):
 
    def __init__(self, rt_client: rt.Rt, out_file: TextIO) -> None:
 
        super().__init__(out_file)
 
        self.rt_client = rt_client
 
        self.rt_wrapper = rtutil.RT(rt_client)
 

	
 
    def _primary_rt_id(self, posts: AccrualPostings) -> rtutil.TicketAttachmentIds:
 
        rt_ids = {url for url in posts.first_links('rt-id') if url is not None}
 
        rt_ids = list(posts.first_meta_links('rt-id'))
 
        rt_ids_count = len(rt_ids)
 
        if rt_ids_count != 1:
 
            raise ValueError(f"{rt_ids_count} rt-id links found")
 
        parsed = rtutil.RT.parse(rt_ids.pop())
 
        if parsed is None:
 
            raise ValueError("rt-id is not a valid RT reference")
 
        else:
 
            return parsed
 

	
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        posts = posts.since_last_nonzero()
 
        try:
 
            ticket_id, _ = self._primary_rt_id(posts)
 
            ticket = self.rt_client.get_ticket(ticket_id)
 
            # Note we only use this when ticket is None.
 
            errmsg = f"ticket {ticket_id} not found"
 
        except (ValueError, rt.RtError) as error:
 
            ticket = None
 
            errmsg = error.args[0]
 
        if ticket is None:
 
            self.logger.error(
 
                "can't generate outgoings report for %s because no RT ticket available: %s",
 
                posts.invoice, errmsg,
 
            )
 
            return
 

	
 
        try:
 
            rt_requestor = self.rt_client.get_user(ticket['Requestors'][0])
 
        except (IndexError, rt.RtError):
 
            rt_requestor = None
 
        if rt_requestor is None:
 
            requestor = ''
 
            requestor_name = ''
 
        else:
 
            requestor_name = (
 
                rt_requestor.get('RealName')
 
                or ticket.get('CF.{payment-to}')
 
                or ''
 
            )
 
            requestor = f'{requestor_name} <{rt_requestor["EmailAddress"]}>'.strip()
 

	
 
        balance_s = posts.end_balance.format(None)
 
        raw_balance = -posts.balance()
 
        if raw_balance != posts.end_balance:
 
            balance_s = f'{raw_balance} ({balance_s})'
 

	
 
        contract_links = list(posts.all_meta_links('contract'))
 
        if contract_links:
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -259,96 +259,113 @@ class RelatedPostings(Sequence[data.Posting]):
 
    def group_by_meta(cls: Type[RelatedType],
 
                      postings: Iterable[data.Posting],
 
                      key: MetaKey,
 
                      default: Optional[MetaValue]=None,
 
    ) -> Iterator[Tuple[Optional[MetaValue], RelatedType]]:
 
        """Relate postings by metadata value
 

	
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of post.meta.get(key, default).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same metadata value.
 
        """
 
        mapping: DefaultDict[Optional[MetaValue], List[data.Posting]] = collections.defaultdict(list)
 
        for post in postings:
 
            mapping[post.meta.get(key, default)].append(post)
 
        for value, posts in mapping.items():
 
            yield value, cls(posts, _can_own=True)
 

	
 
    def __repr__(self) -> str:
 
        return f'<{type(self).__name__} {self._postings!r}>'
 

	
 
    @overload
 
    def __getitem__(self: RelatedType, index: int) -> data.Posting: ...
 

	
 
    @overload
 
    def __getitem__(self: RelatedType, s: slice) -> RelatedType: ...
 

	
 
    def __getitem__(self: RelatedType,
 
                    index: Union[int, slice],
 
    ) -> Union[data.Posting, RelatedType]:
 
        if isinstance(index, slice):
 
            return type(self)(self._postings[index], _can_own=True)
 
        else:
 
            return self._postings[index]
 

	
 
    def __len__(self) -> int:
 
        return len(self._postings)
 

	
 
    def _all_meta_links(self, key: MetaKey) -> Iterator[str]:
 
        for post in self:
 
            try:
 
                yield from post.meta.get_links(key)
 
            except TypeError:
 
                pass
 

	
 
    def all_meta_links(self, key: MetaKey) -> Iterator[str]:
 
        return filters.iter_unique(self._all_meta_links(key))
 

	
 
    @overload
 
    def first_meta_links(self, key: MetaKey, default: str='') -> Iterator[str]: ...
 

	
 
    @overload
 
    def first_meta_links(self, key: MetaKey, default: None) -> Iterator[Optional[str]]: ...
 

	
 
    def first_meta_links(self,
 
                         key: MetaKey,
 
                         default: Optional[str]='',
 
    ) -> Iterator[Optional[str]]:
 
        retval = filters.iter_unique(
 
            post.meta.first_link(key, default) for post in self
 
        )
 
        if default == '':
 
            retval = (s for s in retval if s)
 
        return retval
 

	
 
    def iter_with_balance(self) -> Iterator[Tuple[data.Posting, Balance]]:
 
        balance = MutableBalance()
 
        for post in self:
 
            balance += post.units
 
            yield post, balance
 

	
 
    def balance(self) -> Balance:
 
        for _, balance in self.iter_with_balance():
 
            pass
 
        try:
 
            return balance
 
        except NameError:
 
            return Balance()
 

	
 
    def balance_at_cost(self) -> Balance:
 
        balance = MutableBalance()
 
        for post in self:
 
            if post.cost is None:
 
                balance += post.units
 
            else:
 
                number = post.units.number * post.cost.number
 
                balance += data.Amount(number, post.cost.currency)
 
        return balance
 

	
 
    def meta_values(self,
 
                    key: MetaKey,
 
                    default: Optional[MetaValue]=None,
 
    ) -> Set[Optional[MetaValue]]:
 
        return {post.meta.get(key, default) for post in self}
 

	
 

	
 
class BaseSpreadsheet(Generic[RT, ST], metaclass=abc.ABCMeta):
 
    """Abstract base class to help write spreadsheets
 

	
 
    This class provides the very core logic to write an arbitrary set of data
 
    rows to arbitrary output. It calls hooks when it starts writing the
 
    spreadsheet, starts a new "section" of rows, ends a section, and ends the
 
    spreadsheet.
 

	
 
    RT is the type of the input data rows. ST is the type of the section
 
    identifier that you create from each row. If you don't want to use the
 
    section logic at all, set ST to None and define section_key to return None.
 
    """
 

	
 
    @abc.abstractmethod
 
    def section_key(self, row: RT) -> ST:
 
        """Return the section a row belongs to
 

	
tests/test_reports_related_postings.py
Show inline comments
...
 
@@ -230,72 +230,86 @@ def test_meta_values_all_match_default_given():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, key='1'),
 
        testutil.Posting('Income:Donations', -2, key='2'),
 
    ])
 
    assert related.meta_values('key', '') == {'1', '2'}
 

	
 
def test_meta_values_many_types():
 
    expected = {
 
        datetime.date(2020, 4, 1),
 
        Decimal(42),
 
        testutil.Amount(5),
 
        'rt:42',
 
    }
 
    related = core.RelatedPostings(
 
        testutil.Posting('Income:Donations', -index, key=value)
 
        for index, value in enumerate(expected)
 
    )
 
    assert related.meta_values('key') == expected
 

	
 
@pytest.mark.parametrize('count', range(3))
 
def test_all_meta_links_zero(count):
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -n, testkey=str(n), _meta_type=data.Metadata,
 
    ) for n in range(count))
 
    assert next(related.all_meta_links('approval'), None) is None
 

	
 
def test_all_meta_links_singletons():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, statement=value, _meta_type=data.Metadata,
 
    ) for value in itertools.chain(
 
        testutil.NON_LINK_METADATA_STRINGS,
 
        testutil.LINK_METADATA_STRINGS,
 
        testutil.NON_STRING_METADATA_VALUES,
 
    ))
 
    assert set(related.all_meta_links('statement')) == testutil.LINK_METADATA_STRINGS
 

	
 
def test_all_meta_links_multiples():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, approval=' '.join(value), _meta_type=data.Metadata,
 
    ) for value in itertools.permutations(testutil.LINK_METADATA_STRINGS, 2))
 
    assert set(related.all_meta_links('approval')) == testutil.LINK_METADATA_STRINGS
 

	
 
def test_all_meta_links_preserves_order():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, approval=c, _meta_type=data.Metadata,
 
    ) for c in '121323')
 
    assert list(related.all_meta_links('approval')) == list('123')
 

	
 
def test_first_meta_links():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Assets:Cash', 10, contract=value, _meta_type=data.Metadata,
 
    ) for value in ['1 2', '', '1 3', testutil.PAST_DATE, '2 3', None])
 
    del related[-1].meta['contract']
 
    assert list(related.first_meta_links('contract')) == list('12')
 

	
 
def test_first_meta_links_fallback():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Assets:Cash', 10, contract=value, _meta_type=data.Metadata,
 
    ) for value in ['1 2', testutil.PAST_DATE, '1 3', None, '2 3'])
 
    del related[-2].meta['contract']
 
    assert list(related.first_meta_links('contract', None)) == ['1', None, '2']
 

	
 
def test_group_by_meta_zero():
 
    assert not list(core.RelatedPostings.group_by_meta([], 'metacurrency'))
 

	
 
def test_group_by_meta_one(credit_card_cycle):
 
    posting = next(post for post in data.Posting.from_entries(credit_card_cycle)
 
                   if post.account.is_credit_card())
 
    actual = core.RelatedPostings.group_by_meta([posting], 'metacurrency')
 
    assert set(key for key, _ in actual) == {'USD'}
 

	
 
def test_group_by_meta_many(two_accruals_three_payments):
 
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
 
                if post.account == 'Assets:Receivable:Accounts']
 
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metacurrency'))
 
    assert set(actual) == {'USD', 'EUR'}
 
    for key, group in actual.items():
 
        assert 2 <= len(group) <= 3
 
        assert group.balance().is_zero()
 

	
 
def test_group_by_meta_many_single_posts(two_accruals_three_payments):
 
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
 
                if post.account == 'Assets:Receivable:Accounts']
 
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metanumber'))
 
    assert set(actual) == {post.units.number for post in postings}
 
    assert len(actual) == len(postings)
0 comments (0 inline, 0 general)