Changeset - 8333ed887646
[Not reviewed]
0 2 0
Brett Smith - 4 years ago 2020-06-15 15:20:30
brettcsmith@brettcsmith.org
reports: Add RelatedPostings.group_by_account() classmethod.
2 files changed with 24 insertions and 0 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -229,96 +229,102 @@ class MutableBalance(Balance):
 

	
 
    def __iadd__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
 
        self._add_other(self._currency_map, other)
 
        return self
 

	
 
    def __isub__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
 
        self._add_other(self._currency_map, -other)
 
        return self
 

	
 

	
 
class RelatedPostings(Sequence[data.Posting]):
 
    """Collect and query related postings
 

	
 
    This class provides common functionality for collecting related postings
 
    and running queries on them: iterating over them, tallying their balance,
 
    etc.
 

	
 
    This class doesn't know anything about how the postings are related. That's
 
    entirely up to the caller.
 

	
 
    A common pattern is to use this class with collections.defaultdict
 
    to organize postings based on some key. See the group_by_meta classmethod
 
    for an example.
 
    """
 
    __slots__ = ('_postings',)
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Posting]=(),
 
                 *,
 
                 _can_own: bool=False,
 
    ) -> None:
 
        self._postings: List[data.Posting]
 
        if _can_own and isinstance(source, list):
 
            self._postings = source
 
        else:
 
            self._postings = list(source)
 

	
 
    @classmethod
 
    def _group_by(cls: Type[RelatedType],
 
                  postings: Iterable[data.Posting],
 
                  key: Callable[[data.Posting], T],
 
    ) -> Iterator[Tuple[T, RelatedType]]:
 
        mapping: Dict[T, List[data.Posting]] = collections.defaultdict(list)
 
        for post in postings:
 
            mapping[key(post)].append(post)
 
        for value, posts in mapping.items():
 
            yield value, cls(posts, _can_own=True)
 

	
 
    @classmethod
 
    def group_by_account(cls: Type[RelatedType],
 
                         postings: Iterable[data.Posting],
 
    ) -> Iterator[Tuple[data.Account, RelatedType]]:
 
        return cls._group_by(postings, operator.attrgetter('account'))
 

	
 
    @classmethod
 
    def group_by_meta(cls: Type[RelatedType],
 
                      postings: Iterable[data.Posting],
 
                      key: MetaKey,
 
                      default: Optional[MetaValue]=None,
 
    ) -> Iterator[Tuple[Optional[MetaValue], RelatedType]]:
 
        """Relate postings by metadata value
 

	
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of post.meta.get(key, default).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same metadata value.
 
        """
 
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 
            return post.meta.get(key, default)
 
        return cls._group_by(postings, key_func)
 

	
 
    @classmethod
 
    def group_by_first_meta_link(
 
            cls: Type[RelatedType],
 
            postings: Iterable[data.Posting],
 
            key: MetaKey,
 
    ) -> Iterator[Tuple[Optional[str], RelatedType]]:
 
        """Relate postings by the first link in metadata
 

	
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of
 
        post.meta.first_link(key, None).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same first metadata link.
 
        """
 
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 
            return post.meta.first_link(key, None)
 
        return cls._group_by(postings, key_func)
 

	
 
    def __repr__(self) -> str:
 
        return f'<{type(self).__name__} {self._postings!r}>'
 

	
 
    @overload
 
    def __getitem__(self: RelatedType, index: int) -> data.Posting: ...
 

	
 
    @overload
 
    def __getitem__(self: RelatedType, s: slice) -> RelatedType: ...
 

	
 
    def __getitem__(self: RelatedType,
 
                    index: Union[int, slice],
 
    ) -> Union[data.Posting, RelatedType]:
 
        if isinstance(index, slice):
tests/test_reports_related_postings.py
Show inline comments
...
 
@@ -353,48 +353,66 @@ def test_group_by_meta_one(credit_card_cycle):
 

	
 
def test_group_by_meta_many(two_accruals_three_payments):
 
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
 
                if post.account == 'Assets:Receivable:Accounts']
 
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metacurrency'))
 
    assert set(actual) == {'USD', 'EUR'}
 
    for key, group in actual.items():
 
        assert 2 <= len(group) <= 3
 
        assert group.balance().is_zero()
 

	
 
def test_group_by_meta_many_single_posts(two_accruals_three_payments):
 
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
 
                if post.account == 'Assets:Receivable:Accounts']
 
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metanumber'))
 
    assert set(actual) == {post.units.number for post in postings}
 
    assert len(actual) == len(postings)
 

	
 
def test_group_by_first_meta_link_zero():
 
    assert not list(core.RelatedPostings.group_by_first_meta_link([], 'foo'))
 

	
 
def test_group_by_first_meta_link_no_key(link_swap_posts):
 
    actual = dict(core.RelatedPostings.group_by_first_meta_link(
 
        iter(link_swap_posts), 'Nonexistent',
 
    ))
 
    assert len(actual) == 1
 
    assert list(actual[None]) == link_swap_posts
 

	
 
def test_group_by_first_meta_link_bad_type(link_swap_posts):
 
    assert all(post.meta.get('metanum') for post in link_swap_posts), \
 
        "did not find metadata required by test"
 
    actual = dict(core.RelatedPostings.group_by_first_meta_link(
 
        iter(link_swap_posts), 'metanum',
 
    ))
 
    assert len(actual) == 1
 
    assert list(actual[None]) == link_swap_posts
 

	
 
def test_group_by_first_meta_link(link_swap_posts):
 
    actual_all = dict(core.RelatedPostings.group_by_first_meta_link(
 
        iter(link_swap_posts), 'rt-id',
 
    ))
 
    assert len(actual_all) == 2
 
    for key, expect_account in [
 
            ('rt:12', 'Assets:Receivable:Accounts'),
 
            ('rt:16', 'Liabilities:Payable:Accounts'),
 
    ]:
 
        actual = actual_all.get(key, '')
 
        assert len(actual) == 2
 
        assert all(post.account == expect_account for post in actual)
 

	
 
def test_group_by_account():
 
    entries = [
 
        testutil.Transaction(postings=[
 
            ('Income:Donations', -10),
 
            ('Assets:Cash', 10),
 
        ]),
 
        testutil.Transaction(postings=[
 
            ('Income:Donations', -20),
 
            ('Assets:Cash', 20),
 
        ]),
 
    ]
 
    postings = data.Posting.from_entries(entries)
 
    actual = dict(core.RelatedPostings.group_by_account(postings))
 
    assert len(actual) == 2
 
    for key, related in actual.items():
 
        assert len(related) == 2
 
        assert all(post.account == key for post in related)
0 comments (0 inline, 0 general)