Changeset - 8333ed887646
[Not reviewed]
0 2 0
Brett Smith - 4 years ago 2020-06-15 15:20:30
brettcsmith@brettcsmith.org
reports: Add RelatedPostings.group_by_account() classmethod.
2 files changed with 24 insertions and 0 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -181,192 +181,198 @@ class Balance(Mapping[str, data.Amount]):
 
    @staticmethod
 
    def within_tolerance(dec: DecimalCompat, tolerance: DecimalCompat) -> bool:
 
        dec = cast(Decimal, dec)
 
        return abs(dec) < tolerance
 

	
 
    def eq_zero(self) -> bool:
 
        """Returns true if all amounts in the balance == 0, within tolerance."""
 
        return self._all_amounts(self.within_tolerance, self.tolerance)
 

	
 
    is_zero = eq_zero
 

	
 
    def ge_zero(self) -> bool:
 
        """Returns true if all amounts in the balance >= 0, within tolerance."""
 
        op_func = operator.gt if self.tolerance else operator.ge
 
        return self._all_amounts(op_func, -self.tolerance)
 

	
 
    def le_zero(self) -> bool:
 
        """Returns true if all amounts in the balance <= 0, within tolerance."""
 
        op_func = operator.lt if self.tolerance else operator.le
 
        return self._all_amounts(op_func, self.tolerance)
 

	
 
    def format(self,
 
               fmt: Optional[str]='#,#00.00 ¤¤',
 
               sep: str=', ',
 
               empty: str="Zero balance",
 
               tolerance: Optional[Decimal]=None,
 
    ) -> str:
 
        """Formats the balance as a string with the given parameters
 

	
 
        If the balance is zero (within tolerance), returns ``empty``.
 
        Otherwise, returns a string with each amount in the balance formatted
 
        as ``fmt``, separated by ``sep``.
 

	
 
        If you set ``fmt`` to None, amounts will be formatted according to the
 
        user's locale. The default format is Beancount's input format.
 
        """
 
        amounts = list(self.clean_copy(tolerance).values())
 
        if not amounts:
 
            return empty
 
        amounts.sort(key=lambda amt: abs(amt.number), reverse=True)
 
        return sep.join(
 
            babel.numbers.format_currency(amt.number, amt.currency, fmt)
 
            for amt in amounts
 
        )
 

	
 

	
 
class MutableBalance(Balance):
 
    __slots__ = ()
 

	
 
    def __iadd__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
 
        self._add_other(self._currency_map, other)
 
        return self
 

	
 
    def __isub__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
 
        self._add_other(self._currency_map, -other)
 
        return self
 

	
 

	
 
class RelatedPostings(Sequence[data.Posting]):
 
    """Collect and query related postings
 

	
 
    This class provides common functionality for collecting related postings
 
    and running queries on them: iterating over them, tallying their balance,
 
    etc.
 

	
 
    This class doesn't know anything about how the postings are related. That's
 
    entirely up to the caller.
 

	
 
    A common pattern is to use this class with collections.defaultdict
 
    to organize postings based on some key. See the group_by_meta classmethod
 
    for an example.
 
    """
 
    __slots__ = ('_postings',)
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Posting]=(),
 
                 *,
 
                 _can_own: bool=False,
 
    ) -> None:
 
        self._postings: List[data.Posting]
 
        if _can_own and isinstance(source, list):
 
            self._postings = source
 
        else:
 
            self._postings = list(source)
 

	
 
    @classmethod
 
    def _group_by(cls: Type[RelatedType],
 
                  postings: Iterable[data.Posting],
 
                  key: Callable[[data.Posting], T],
 
    ) -> Iterator[Tuple[T, RelatedType]]:
 
        mapping: Dict[T, List[data.Posting]] = collections.defaultdict(list)
 
        for post in postings:
 
            mapping[key(post)].append(post)
 
        for value, posts in mapping.items():
 
            yield value, cls(posts, _can_own=True)
 

	
 
    @classmethod
 
    def group_by_account(cls: Type[RelatedType],
 
                         postings: Iterable[data.Posting],
 
    ) -> Iterator[Tuple[data.Account, RelatedType]]:
 
        return cls._group_by(postings, operator.attrgetter('account'))
 

	
 
    @classmethod
 
    def group_by_meta(cls: Type[RelatedType],
 
                      postings: Iterable[data.Posting],
 
                      key: MetaKey,
 
                      default: Optional[MetaValue]=None,
 
    ) -> Iterator[Tuple[Optional[MetaValue], RelatedType]]:
 
        """Relate postings by metadata value
 

	
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of post.meta.get(key, default).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same metadata value.
 
        """
 
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 
            return post.meta.get(key, default)
 
        return cls._group_by(postings, key_func)
 

	
 
    @classmethod
 
    def group_by_first_meta_link(
 
            cls: Type[RelatedType],
 
            postings: Iterable[data.Posting],
 
            key: MetaKey,
 
    ) -> Iterator[Tuple[Optional[str], RelatedType]]:
 
        """Relate postings by the first link in metadata
 

	
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of
 
        post.meta.first_link(key, None).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same first metadata link.
 
        """
 
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 
            return post.meta.first_link(key, None)
 
        return cls._group_by(postings, key_func)
 

	
 
    def __repr__(self) -> str:
 
        return f'<{type(self).__name__} {self._postings!r}>'
 

	
 
    @overload
 
    def __getitem__(self: RelatedType, index: int) -> data.Posting: ...
 

	
 
    @overload
 
    def __getitem__(self: RelatedType, s: slice) -> RelatedType: ...
 

	
 
    def __getitem__(self: RelatedType,
 
                    index: Union[int, slice],
 
    ) -> Union[data.Posting, RelatedType]:
 
        if isinstance(index, slice):
 
            return type(self)(self._postings[index], _can_own=True)
 
        else:
 
            return self._postings[index]
 

	
 
    def __len__(self) -> int:
 
        return len(self._postings)
 

	
 
    def _all_meta_links(self, key: MetaKey) -> Iterator[str]:
 
        for post in self:
 
            try:
 
                yield from post.meta.get_links(key)
 
            except TypeError:
 
                pass
 

	
 
    def all_meta_links(self, key: MetaKey) -> Iterator[str]:
 
        return filters.iter_unique(self._all_meta_links(key))
 

	
 
    @overload
 
    def first_meta_links(self, key: MetaKey, default: str='') -> Iterator[str]: ...
 

	
 
    @overload
 
    def first_meta_links(self, key: MetaKey, default: None) -> Iterator[Optional[str]]: ...
 

	
 
    def first_meta_links(self,
 
                         key: MetaKey,
 
                         default: Optional[str]='',
 
    ) -> Iterator[Optional[str]]:
 
        retval = filters.iter_unique(
 
            post.meta.first_link(key, default) for post in self
 
        )
 
        if default == '':
 
            retval = (s for s in retval if s)
 
        return retval
 

	
 
    def iter_with_balance(self) -> Iterator[Tuple[data.Posting, Balance]]:
 
        balance = MutableBalance()
 
        for post in self:
 
            balance += post.units
 
            yield post, balance
 

	
 
    def balance(self) -> Balance:
 
        return Balance(post.units for post in self)
 

	
 
    def balance_at_cost(self) -> Balance:
 
        return Balance(post.at_cost() for post in self)
 

	
 
    def balance_at_cost_by_date(self, date: datetime.date) -> Balance:
 
        for index, post in enumerate(self):
tests/test_reports_related_postings.py
Show inline comments
...
 
@@ -305,96 +305,114 @@ def test_all_meta_links_zero(count):
 
        'Income:Donations', -n, testkey=str(n), _meta_type=data.Metadata,
 
    ) for n in range(count))
 
    assert next(related.all_meta_links('approval'), None) is None
 

	
 
def test_all_meta_links_singletons():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, statement=value, _meta_type=data.Metadata,
 
    ) for value in itertools.chain(
 
        testutil.NON_LINK_METADATA_STRINGS,
 
        testutil.LINK_METADATA_STRINGS,
 
        testutil.NON_STRING_METADATA_VALUES,
 
    ))
 
    assert set(related.all_meta_links('statement')) == testutil.LINK_METADATA_STRINGS
 

	
 
def test_all_meta_links_multiples():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, approval=' '.join(value), _meta_type=data.Metadata,
 
    ) for value in itertools.permutations(testutil.LINK_METADATA_STRINGS, 2))
 
    assert set(related.all_meta_links('approval')) == testutil.LINK_METADATA_STRINGS
 

	
 
def test_all_meta_links_preserves_order():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, approval=c, _meta_type=data.Metadata,
 
    ) for c in '121323')
 
    assert list(related.all_meta_links('approval')) == list('123')
 

	
 
def test_first_meta_links():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Assets:Cash', 10, contract=value, _meta_type=data.Metadata,
 
    ) for value in ['1 2', '', '1 3', testutil.PAST_DATE, '2 3', None])
 
    del related[-1].meta['contract']
 
    assert list(related.first_meta_links('contract')) == list('12')
 

	
 
def test_first_meta_links_fallback():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Assets:Cash', 10, contract=value, _meta_type=data.Metadata,
 
    ) for value in ['1 2', testutil.PAST_DATE, '1 3', None, '2 3'])
 
    del related[-2].meta['contract']
 
    assert list(related.first_meta_links('contract', None)) == ['1', None, '2']
 

	
 
def test_group_by_meta_zero():
 
    assert not list(core.RelatedPostings.group_by_meta([], 'metacurrency'))
 

	
 
def test_group_by_meta_one(credit_card_cycle):
 
    posting = next(post for post in data.Posting.from_entries(credit_card_cycle)
 
                   if post.account.is_credit_card())
 
    actual = core.RelatedPostings.group_by_meta([posting], 'metacurrency')
 
    assert set(key for key, _ in actual) == {'USD'}
 

	
 
def test_group_by_meta_many(two_accruals_three_payments):
 
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
 
                if post.account == 'Assets:Receivable:Accounts']
 
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metacurrency'))
 
    assert set(actual) == {'USD', 'EUR'}
 
    for key, group in actual.items():
 
        assert 2 <= len(group) <= 3
 
        assert group.balance().is_zero()
 

	
 
def test_group_by_meta_many_single_posts(two_accruals_three_payments):
 
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
 
                if post.account == 'Assets:Receivable:Accounts']
 
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metanumber'))
 
    assert set(actual) == {post.units.number for post in postings}
 
    assert len(actual) == len(postings)
 

	
 
def test_group_by_first_meta_link_zero():
 
    assert not list(core.RelatedPostings.group_by_first_meta_link([], 'foo'))
 

	
 
def test_group_by_first_meta_link_no_key(link_swap_posts):
 
    actual = dict(core.RelatedPostings.group_by_first_meta_link(
 
        iter(link_swap_posts), 'Nonexistent',
 
    ))
 
    assert len(actual) == 1
 
    assert list(actual[None]) == link_swap_posts
 

	
 
def test_group_by_first_meta_link_bad_type(link_swap_posts):
 
    assert all(post.meta.get('metanum') for post in link_swap_posts), \
 
        "did not find metadata required by test"
 
    actual = dict(core.RelatedPostings.group_by_first_meta_link(
 
        iter(link_swap_posts), 'metanum',
 
    ))
 
    assert len(actual) == 1
 
    assert list(actual[None]) == link_swap_posts
 

	
 
def test_group_by_first_meta_link(link_swap_posts):
 
    actual_all = dict(core.RelatedPostings.group_by_first_meta_link(
 
        iter(link_swap_posts), 'rt-id',
 
    ))
 
    assert len(actual_all) == 2
 
    for key, expect_account in [
 
            ('rt:12', 'Assets:Receivable:Accounts'),
 
            ('rt:16', 'Liabilities:Payable:Accounts'),
 
    ]:
 
        actual = actual_all.get(key, '')
 
        assert len(actual) == 2
 
        assert all(post.account == expect_account for post in actual)
 

	
 
def test_group_by_account():
 
    entries = [
 
        testutil.Transaction(postings=[
 
            ('Income:Donations', -10),
 
            ('Assets:Cash', 10),
 
        ]),
 
        testutil.Transaction(postings=[
 
            ('Income:Donations', -20),
 
            ('Assets:Cash', 20),
 
        ]),
 
    ]
 
    postings = data.Posting.from_entries(entries)
 
    actual = dict(core.RelatedPostings.group_by_account(postings))
 
    assert len(actual) == 2
 
    for key, related in actual.items():
 
        assert len(related) == 2
 
        assert all(post.account == key for post in related)
0 comments (0 inline, 0 general)