Changeset - 8333ed887646
[Not reviewed]
0 2 0
Brett Smith - 4 years ago 2020-06-15 15:20:30
reports: Add RelatedPostings.group_by_account() classmethod.
2 files changed with 24 insertions and 0 deletions:
0 comments (0 inline, 0 general)
Show inline comments
@@ -85,384 +85,390 @@ class Balance(Mapping[str, data.Amount]):
    __slots__ = ('_currency_map', 'tolerance')
    TOLERANCE = Decimal('0.01')

    def __init__(self,
                 source: Iterable[data.Amount]=(),
                 tolerance: Optional[Decimal]=None,
    ) -> None:
        if tolerance is None:
            tolerance = self.TOLERANCE
        self.tolerance = tolerance
        self._currency_map: Dict[str, data.Amount] = {}
        for amount in source:
            self._add_amount(self._currency_map, amount)

    def _add_amount(self,
                    currency_map: MutableMapping[str, data.Amount],
                    amount: data.Amount,
    ) -> None:
        code = amount.currency
            current_number = currency_map[code].number
        except KeyError:
            current_number = Decimal(0)
        currency_map[code] = data.Amount(current_number + amount.number, code)

    def _add_other(self,
                   currency_map: MutableMapping[str, data.Amount],
                   other: Union[data.Amount, 'Balance'],
    ) -> None:
        if isinstance(other, Balance):
            for amount in other.values():
                self._add_amount(currency_map, amount)
            self._add_amount(currency_map, other)

    def __repr__(self) -> str:
        values = [repr(amt) for amt in self.values()]
        return f"{type(self).__name__}({values!r})"

    def __str__(self) -> str:
        return self.format()

    def __abs__(self: BalanceType) -> BalanceType:
        return type(self)(bc_amount.abs(amt) for amt in self.values())

    def __add__(self: BalanceType, other: Union[data.Amount, 'Balance']) -> BalanceType:
        retval_map = self._currency_map.copy()
        self._add_other(retval_map, other)
        return type(self)(retval_map.values())

    def __sub__(self: BalanceType, other: Union[data.Amount, 'Balance']) -> BalanceType:
        return self.__add__(-other)

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, Balance):
            clean_self = self.clean_copy()
            clean_other = other.clean_copy()
            return len(clean_self) == len(clean_other) and all(
                clean_self[key] == clean_other.get(key) for key in clean_self
            return super().__eq__(other)

    def __neg__(self: BalanceType) -> BalanceType:
        return type(self)(-amt for amt in self.values())

    def __pos__(self: BalanceType) -> BalanceType:
        return self

    def __getitem__(self, key: str) -> data.Amount:
        return self._currency_map[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._currency_map)

    def __len__(self) -> int:
        return len(self._currency_map)

    def _all_amounts(self,
                     op_func: Callable[[DecimalCompat, DecimalCompat], bool],
                     operand: DecimalCompat,
    ) -> bool:
        return all(op_func(amt.number, operand) for amt in self.values())

    def copy(self: BalanceType) -> BalanceType:
        return type(self)(self.values())

    def clean_copy(self: BalanceType, tolerance: Optional[Decimal]=None) -> BalanceType:
        if tolerance is None:
            tolerance = self.tolerance
        return type(self)(
            amount for amount in self.values()
            if abs(amount.number) >= tolerance

    def within_tolerance(dec: DecimalCompat, tolerance: DecimalCompat) -> bool:
        dec = cast(Decimal, dec)
        return abs(dec) < tolerance

    def eq_zero(self) -> bool:
        """Returns true if all amounts in the balance == 0, within tolerance."""
        return self._all_amounts(self.within_tolerance, self.tolerance)

    is_zero = eq_zero

    def ge_zero(self) -> bool:
        """Returns true if all amounts in the balance >= 0, within tolerance."""
        op_func = if self.tolerance else
        return self._all_amounts(op_func, -self.tolerance)

    def le_zero(self) -> bool:
        """Returns true if all amounts in the balance <= 0, within tolerance."""
        op_func = if self.tolerance else operator.le
        return self._all_amounts(op_func, self.tolerance)

    def format(self,
               fmt: Optional[str]='#,#00.00 ¤¤',
               sep: str=', ',
               empty: str="Zero balance",
               tolerance: Optional[Decimal]=None,
    ) -> str:
        """Formats the balance as a string with the given parameters

        If the balance is zero (within tolerance), returns ``empty``.
        Otherwise, returns a string with each amount in the balance formatted
        as ``fmt``, separated by ``sep``.

        If you set ``fmt`` to None, amounts will be formatted according to the
        user's locale. The default format is Beancount's input format.
        amounts = list(self.clean_copy(tolerance).values())
        if not amounts:
            return empty
        amounts.sort(key=lambda amt: abs(amt.number), reverse=True)
        return sep.join(
            babel.numbers.format_currency(amt.number, amt.currency, fmt)
            for amt in amounts


class MutableBalance(Balance):
    __slots__ = ()

    def __iadd__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
        self._add_other(self._currency_map, other)
        return self

    def __isub__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
        self._add_other(self._currency_map, -other)
        return self


class RelatedPostings(Sequence[data.Posting]):
    """Collect and query related postings

    This class provides common functionality for collecting related postings
    and running queries on them: iterating over them, tallying their balance,

    This class doesn't know anything about how the postings are related. That's
    entirely up to the caller.

    A common pattern is to use this class with collections.defaultdict
    to organize postings based on some key. See the group_by_meta classmethod
    for an example.
    __slots__ = ('_postings',)

    def __init__(self,
                 source: Iterable[data.Posting]=(),
                 _can_own: bool=False,
    ) -> None:
        self._postings: List[data.Posting]
        if _can_own and isinstance(source, list):
            self._postings = source
            self._postings = list(source)

    def _group_by(cls: Type[RelatedType],
                  postings: Iterable[data.Posting],
                  key: Callable[[data.Posting], T],
    ) -> Iterator[Tuple[T, RelatedType]]:
        mapping: Dict[T, List[data.Posting]] = collections.defaultdict(list)
        for post in postings:
        for value, posts in mapping.items():
            yield value, cls(posts, _can_own=True)

    def group_by_account(cls: Type[RelatedType],
                         postings: Iterable[data.Posting],
    ) -> Iterator[Tuple[data.Account, RelatedType]]:
        return cls._group_by(postings, operator.attrgetter('account'))

    def group_by_meta(cls: Type[RelatedType],
                      postings: Iterable[data.Posting],
                      key: MetaKey,
                      default: Optional[MetaValue]=None,
    ) -> Iterator[Tuple[Optional[MetaValue], RelatedType]]:
        """Relate postings by metadata value

        This method takes an iterable of postings and returns a mapping.
        The keys of the mapping are the values of post.meta.get(key, default).
        The values are RelatedPostings instances that contain all the postings
        that had that same metadata value.
        def key_func(post: data.Posting) -> Optional[MetaValue]:
            return post.meta.get(key, default)
        return cls._group_by(postings, key_func)

    def group_by_first_meta_link(
            cls: Type[RelatedType],
            postings: Iterable[data.Posting],
            key: MetaKey,
    ) -> Iterator[Tuple[Optional[str], RelatedType]]:
        """Relate postings by the first link in metadata

        This method takes an iterable of postings and returns a mapping.
        The keys of the mapping are the values of
        post.meta.first_link(key, None).
        The values are RelatedPostings instances that contain all the postings
        that had that same first metadata link.
        def key_func(post: data.Posting) -> Optional[MetaValue]:
            return post.meta.first_link(key, None)
        return cls._group_by(postings, key_func)

    def __repr__(self) -> str:
        return f'<{type(self).__name__} {self._postings!r}>'

    def __getitem__(self: RelatedType, index: int) -> data.Posting: ...

    def __getitem__(self: RelatedType, s: slice) -> RelatedType: ...

    def __getitem__(self: RelatedType,
                    index: Union[int, slice],
    ) -> Union[data.Posting, RelatedType]:
        if isinstance(index, slice):
            return type(self)(self._postings[index], _can_own=True)
            return self._postings[index]

    def __len__(self) -> int:
        return len(self._postings)

    def _all_meta_links(self, key: MetaKey) -> Iterator[str]:
        for post in self:
                yield from post.meta.get_links(key)
            except TypeError:

    def all_meta_links(self, key: MetaKey) -> Iterator[str]:
        return filters.iter_unique(self._all_meta_links(key))

    def first_meta_links(self, key: MetaKey, default: str='') -> Iterator[str]: ...

    def first_meta_links(self, key: MetaKey, default: None) -> Iterator[Optional[str]]: ...

    def first_meta_links(self,
                         key: MetaKey,
                         default: Optional[str]='',
    ) -> Iterator[Optional[str]]:
        retval = filters.iter_unique(
            post.meta.first_link(key, default) for post in self
        if default == '':
            retval = (s for s in retval if s)
        return retval

    def iter_with_balance(self) -> Iterator[Tuple[data.Posting, Balance]]:
        balance = MutableBalance()
        for post in self:
            balance += post.units
            yield post, balance

    def balance(self) -> Balance:
        return Balance(post.units for post in self)

    def balance_at_cost(self) -> Balance:
        return Balance(post.at_cost() for post in self)

    def balance_at_cost_by_date(self, date: -> Balance:
        for index, post in enumerate(self):
            if >= date:
            index += 1
        return Balance(post.at_cost() for post in self._postings[:index])

    def meta_values(self,
                    key: MetaKey,
                    default: Optional[MetaValue]=None,
    ) -> Set[Optional[MetaValue]]:
        return {post.meta.get(key, default) for post in self}


class BaseSpreadsheet(Generic[RT, ST], metaclass=abc.ABCMeta):
    """Abstract base class to help write spreadsheets

    This class provides the very core logic to write an arbitrary set of data
    rows to arbitrary output. It calls hooks when it starts writing the
    spreadsheet, starts a new "section" of rows, ends a section, and ends the

    RT is the type of the input data rows. ST is the type of the section
    identifier that you create from each row. If you don't want to use the
    section logic at all, set ST to None and define section_key to return None.

    def section_key(self, row: RT) -> ST:
        """Return the section a row belongs to

        Given a data row, this method should return some identifier for the
        "section" the row belongs to. The write method uses this to
        determine when to call start_section and end_section.

        If your spreadsheet doesn't need sections, define this to return None.

    def write_row(self, row: RT) -> None:
        """Write a data row to the output spreadsheet

        This method is called once for each data row in the input.

    # The next four methods are all called by the write method when the name
    # says. You may override them to output headers or sums, record
    # state, etc. The default implementations are all noops.

    def start_spreadsheet(self) -> None:

    def start_section(self, key: ST) -> None:

    def end_section(self, key: ST) -> None:

    def end_spreadsheet(self) -> None:

    def write(self, rows: Iterable[RT]) -> None:
        prev_section: Optional[ST] = None
        for row in rows:
            section = self.section_key(row)
            if section != prev_section:
                if prev_section is not None:
                prev_section = section
            should_end = section is not None
        except NameError:
            should_end = False
        if should_end:


class BaseODS(BaseSpreadsheet[RT, ST], metaclass=abc.ABCMeta):
    """Abstract base class to help write OpenDocument spreadsheets

    This class provides the very core logic to write an arbitrary set of data
    rows to an OpenDocument spreadsheet. It provides helper methods for
    building sheets, rows, and cells.

    See also the BaseSpreadsheet base class for additional documentation about
    methods you must and can define, the definition of RT and ST, etc.
    def __init__(self, rt_wrapper: Optional[rtutil.RT]=None) -> None:
        self.rt_wrapper = rt_wrapper
        self.locale = babel.core.Locale.default('LC_MONETARY')
        self.currency_fmt_key = 'accounting'
Show inline comments
@@ -209,192 +209,210 @@ def test_balance_at_cost_by_date(date, expected):
    dates = testutil.date_seq()
    jpy_cost = ('0.01', 'USD')
    entries = [
        testutil.Transaction(date=next(dates), postings=[
            ('Assets:Cash', 1000, 'JPY', jpy_cost),
            ('Assets:Cash', 15),
        testutil.Transaction(date=next(dates), postings=[
            ('Assets:Cash', 2000, 'JPY', jpy_cost),
            ('Assets:Cash', 25),
        testutil.Transaction(date=next(dates), postings=[
            ('Assets:Cash', 3000, 'JPY', jpy_cost),
            ('Assets:Cash', 35),
    related = core.RelatedPostings(data.Posting.from_entries(entries))
    actual = related.balance_at_cost_by_date(date)
    if not expected:
        assert actual.is_zero()
        assert actual == {'USD': testutil.Amount(expected)}

def test_meta_values_empty():
    related = core.RelatedPostings()
    assert related.meta_values('key') == set()

def test_meta_values_no_match():
    related = core.RelatedPostings([
        testutil.Posting('Income:Donations', -1, metakey='metavalue'),
    assert related.meta_values('key') == {None}

def test_meta_values_no_match_default_given():
    related = core.RelatedPostings([
        testutil.Posting('Income:Donations', -1, metakey='metavalue'),
    assert related.meta_values('key', '') == {''}

def test_meta_values_one_match():
    related = core.RelatedPostings([
        testutil.Posting('Income:Donations', -1, key='metavalue'),
    assert related.meta_values('key') == {'metavalue'}

def test_meta_values_some_match():
    related = core.RelatedPostings([
        testutil.Posting('Income:Donations', -1, key='1'),
        testutil.Posting('Income:Donations', -2, metakey='2'),
    assert related.meta_values('key') == {'1', None}

def test_meta_values_some_match_default_given():
    related = core.RelatedPostings([
        testutil.Posting('Income:Donations', -1, key='1'),
        testutil.Posting('Income:Donations', -2, metakey='2'),
    assert related.meta_values('key', '') == {'1', ''}

def test_meta_values_all_match():
    related = core.RelatedPostings([
        testutil.Posting('Income:Donations', -1, key='1'),
        testutil.Posting('Income:Donations', -2, key='2'),
    assert related.meta_values('key') == {'1', '2'}

def test_meta_values_all_match_one_value():
    related = core.RelatedPostings([
        testutil.Posting('Income:Donations', -1, key='1'),
        testutil.Posting('Income:Donations', -2, key='1'),
    assert related.meta_values('key') == {'1'}

def test_meta_values_all_match_default_given():
    related = core.RelatedPostings([
        testutil.Posting('Income:Donations', -1, key='1'),
        testutil.Posting('Income:Donations', -2, key='2'),
    assert related.meta_values('key', '') == {'1', '2'}

def test_meta_values_many_types():
    expected = {
, 4, 1),
    related = core.RelatedPostings(
        testutil.Posting('Income:Donations', -index, key=value)
        for index, value in enumerate(expected)
    assert related.meta_values('key') == expected

@pytest.mark.parametrize('count', range(3))
def test_all_meta_links_zero(count):
    related = core.RelatedPostings(testutil.Posting(
        'Income:Donations', -n, testkey=str(n), _meta_type=data.Metadata,
    ) for n in range(count))
    assert next(related.all_meta_links('approval'), None) is None

def test_all_meta_links_singletons():
    related = core.RelatedPostings(testutil.Posting(
        'Income:Donations', -10, statement=value, _meta_type=data.Metadata,
    ) for value in itertools.chain(
    assert set(related.all_meta_links('statement')) == testutil.LINK_METADATA_STRINGS

def test_all_meta_links_multiples():
    related = core.RelatedPostings(testutil.Posting(
        'Income:Donations', -10, approval=' '.join(value), _meta_type=data.Metadata,
    ) for value in itertools.permutations(testutil.LINK_METADATA_STRINGS, 2))
    assert set(related.all_meta_links('approval')) == testutil.LINK_METADATA_STRINGS

def test_all_meta_links_preserves_order():
    related = core.RelatedPostings(testutil.Posting(
        'Income:Donations', -10, approval=c, _meta_type=data.Metadata,
    ) for c in '121323')
    assert list(related.all_meta_links('approval')) == list('123')

def test_first_meta_links():
    related = core.RelatedPostings(testutil.Posting(
        'Assets:Cash', 10, contract=value, _meta_type=data.Metadata,
    ) for value in ['1 2', '', '1 3', testutil.PAST_DATE, '2 3', None])
    del related[-1].meta['contract']
    assert list(related.first_meta_links('contract')) == list('12')

def test_first_meta_links_fallback():
    related = core.RelatedPostings(testutil.Posting(
        'Assets:Cash', 10, contract=value, _meta_type=data.Metadata,
    ) for value in ['1 2', testutil.PAST_DATE, '1 3', None, '2 3'])
    del related[-2].meta['contract']
    assert list(related.first_meta_links('contract', None)) == ['1', None, '2']

def test_group_by_meta_zero():
    assert not list(core.RelatedPostings.group_by_meta([], 'metacurrency'))

def test_group_by_meta_one(credit_card_cycle):
    posting = next(post for post in data.Posting.from_entries(credit_card_cycle)
                   if post.account.is_credit_card())
    actual = core.RelatedPostings.group_by_meta([posting], 'metacurrency')
    assert set(key for key, _ in actual) == {'USD'}

def test_group_by_meta_many(two_accruals_three_payments):
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
                if post.account == 'Assets:Receivable:Accounts']
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metacurrency'))
    assert set(actual) == {'USD', 'EUR'}
    for key, group in actual.items():
        assert 2 <= len(group) <= 3
        assert group.balance().is_zero()

def test_group_by_meta_many_single_posts(two_accruals_three_payments):
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
                if post.account == 'Assets:Receivable:Accounts']
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metanumber'))
    assert set(actual) == {post.units.number for post in postings}
    assert len(actual) == len(postings)

def test_group_by_first_meta_link_zero():
    assert not list(core.RelatedPostings.group_by_first_meta_link([], 'foo'))

def test_group_by_first_meta_link_no_key(link_swap_posts):
    actual = dict(core.RelatedPostings.group_by_first_meta_link(
        iter(link_swap_posts), 'Nonexistent',
    assert len(actual) == 1
    assert list(actual[None]) == link_swap_posts

def test_group_by_first_meta_link_bad_type(link_swap_posts):
    assert all(post.meta.get('metanum') for post in link_swap_posts), \
        "did not find metadata required by test"
    actual = dict(core.RelatedPostings.group_by_first_meta_link(
        iter(link_swap_posts), 'metanum',
    assert len(actual) == 1
    assert list(actual[None]) == link_swap_posts

def test_group_by_first_meta_link(link_swap_posts):
    actual_all = dict(core.RelatedPostings.group_by_first_meta_link(
        iter(link_swap_posts), 'rt-id',
    assert len(actual_all) == 2
    for key, expect_account in [
            ('rt:12', 'Assets:Receivable:Accounts'),
            ('rt:16', 'Liabilities:Payable:Accounts'),
        actual = actual_all.get(key, '')
        assert len(actual) == 2
        assert all(post.account == expect_account for post in actual)

def test_group_by_account():
    entries = [
            ('Income:Donations', -10),
            ('Assets:Cash', 10),
            ('Income:Donations', -20),
            ('Assets:Cash', 20),
    postings = data.Posting.from_entries(entries)
    actual = dict(core.RelatedPostings.group_by_account(postings))
    assert len(actual) == 2
    for key, related in actual.items():
        assert len(related) == 2
        assert all(post.account == key for post in related)
0 comments (0 inline, 0 general)