Changeset - 0581525c9824
[Not reviewed]
0 2 0
Brett Smith - 4 years ago 2020-06-09 13:04:27
brettcsmith@brettcsmith.org
reports: Add Balance.__pos__() method.

I did this while I was working on normalize_amount_func.
It turns out it's not immediately needed, but it's still nice to have.
2 files changed with 17 insertions and 0 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -47,192 +47,195 @@ from typing import (
 
    Callable,
 
    DefaultDict,
 
    Dict,
 
    Generic,
 
    Iterable,
 
    Iterator,
 
    List,
 
    Mapping,
 
    MutableMapping,
 
    Optional,
 
    Sequence,
 
    Set,
 
    Tuple,
 
    Type,
 
    TypeVar,
 
    Union,
 
)
 
from ..beancount_types import (
 
    MetaKey,
 
    MetaValue,
 
)
 

	
 
DecimalCompat = data.DecimalCompat
 
BalanceType = TypeVar('BalanceType', bound='Balance')
 
ElementType = Callable[..., odf.element.Element]
 
LinkType = Union[str, Tuple[str, Optional[str]]]
 
RelatedType = TypeVar('RelatedType', bound='RelatedPostings')
 
RT = TypeVar('RT', bound=Sequence)
 
ST = TypeVar('ST')
 
T = TypeVar('T')
 

	
 
class Balance(Mapping[str, data.Amount]):
 
    """A collection of amounts mapped by currency
 

	
 
    Each key is a Beancount currency string, and each value represents the
 
    balance in that currency.
 
    """
 
    __slots__ = ('_currency_map', 'tolerance')
 
    TOLERANCE = Decimal('0.01')
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Amount]=(),
 
                 tolerance: Optional[Decimal]=None,
 
    ) -> None:
 
        if tolerance is None:
 
            tolerance = self.TOLERANCE
 
        self._currency_map = {amount.currency: amount for amount in source}
 
        self.tolerance = tolerance
 

	
 
    def _add_amount(self,
 
                    currency_map: MutableMapping[str, data.Amount],
 
                    amount: data.Amount,
 
    ) -> None:
 
        code = amount.currency
 
        try:
 
            current_number = currency_map[code].number
 
        except KeyError:
 
            current_number = Decimal(0)
 
        currency_map[code] = data.Amount(current_number + amount.number, code)
 

	
 
    def _add_other(self,
 
                   currency_map: MutableMapping[str, data.Amount],
 
                   other: Union[data.Amount, 'Balance'],
 
    ) -> None:
 
        if isinstance(other, Balance):
 
            for amount in other.values():
 
                self._add_amount(currency_map, amount)
 
        else:
 
            self._add_amount(currency_map, other)
 

	
 
    def __repr__(self) -> str:
 
        values = [repr(amt) for amt in self.values()]
 
        return f"{type(self).__name__}({values!r})"
 

	
 
    def __str__(self) -> str:
 
        return self.format()
 

	
 
    def __abs__(self: BalanceType) -> BalanceType:
 
        return type(self)(bc_amount.abs(amt) for amt in self.values())
 

	
 
    def __add__(self: BalanceType, other: Union[data.Amount, 'Balance']) -> BalanceType:
 
        retval_map = self._currency_map.copy()
 
        self._add_other(retval_map, other)
 
        return type(self)(retval_map.values())
 

	
 
    def __eq__(self, other: Any) -> bool:
 
        if (self.is_zero()
 
            and isinstance(other, Balance)
 
            and other.is_zero()):
 
            return True
 
        else:
 
            return super().__eq__(other)
 

	
 
    def __neg__(self: BalanceType) -> BalanceType:
 
        return type(self)(-amt for amt in self.values())
 

	
 
    def __pos__(self: BalanceType) -> BalanceType:
 
        return self
 

	
 
    def __getitem__(self, key: str) -> data.Amount:
 
        return self._currency_map[key]
 

	
 
    def __iter__(self) -> Iterator[str]:
 
        return iter(self._currency_map)
 

	
 
    def __len__(self) -> int:
 
        return len(self._currency_map)
 

	
 
    def _all_amounts(self,
 
                     op_func: Callable[[DecimalCompat, DecimalCompat], bool],
 
                     operand: DecimalCompat,
 
    ) -> bool:
 
        return all(op_func(amt.number, operand) for amt in self.values())
 

	
 
    @staticmethod
 
    def within_tolerance(dec: DecimalCompat, tolerance: DecimalCompat) -> bool:
 
        dec = cast(Decimal, dec)
 
        return abs(dec) < tolerance
 

	
 
    def eq_zero(self) -> bool:
 
        """Returns true if all amounts in the balance == 0, within tolerance."""
 
        return self._all_amounts(self.within_tolerance, self.tolerance)
 

	
 
    is_zero = eq_zero
 

	
 
    def ge_zero(self) -> bool:
 
        """Returns true if all amounts in the balance >= 0, within tolerance."""
 
        op_func = operator.gt if self.tolerance else operator.ge
 
        return self._all_amounts(op_func, -self.tolerance)
 

	
 
    def le_zero(self) -> bool:
 
        """Returns true if all amounts in the balance <= 0, within tolerance."""
 
        op_func = operator.lt if self.tolerance else operator.le
 
        return self._all_amounts(op_func, self.tolerance)
 

	
 
    def format(self,
 
               fmt: Optional[str]='#,#00.00 ¤¤',
 
               sep: str=', ',
 
               empty: str="Zero balance",
 
    ) -> str:
 
        """Formats the balance as a string with the given parameters
 

	
 
        If the balance is zero, returns ``empty``. Otherwise, returns a string
 
        with each amount in the balance formatted as ``fmt``, separated by
 
        ``sep``.
 

	
 
        If you set ``fmt`` to None, amounts will be formatted according to the
 
        user's locale. The default format is Beancount's input format.
 
        """
 
        amounts = [amount for amount in self.values() if amount.number]
 
        if not amounts:
 
            return empty
 
        amounts.sort(key=lambda amt: abs(amt.number), reverse=True)
 
        return sep.join(
 
            babel.numbers.format_currency(amt.number, amt.currency, fmt)
 
            for amt in amounts
 
        )
 

	
 

	
 
class MutableBalance(Balance):
 
    __slots__ = ()
 

	
 
    def __iadd__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
 
        self._add_other(self._currency_map, other)
 
        return self
 

	
 

	
 
class RelatedPostings(Sequence[data.Posting]):
 
    """Collect and query related postings
 

	
 
    This class provides common functionality for collecting related postings
 
    and running queries on them: iterating over them, tallying their balance,
 
    etc.
 

	
 
    This class doesn't know anything about how the postings are related. That's
 
    entirely up to the caller.
 

	
 
    A common pattern is to use this class with collections.defaultdict
 
    to organize postings based on some key. See the group_by_meta classmethod
 
    for an example.
 
    """
 
    __slots__ = ('_postings',)
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Posting]=(),
 
                 *,
 
                 _can_own: bool=False,
 
    ) -> None:
 
        self._postings: List[data.Posting]
 
        if _can_own and isinstance(source, list):
 
            self._postings = source
 
        else:
 
            self._postings = list(source)
 

	
 
    @classmethod
tests/test_reports_balance.py
Show inline comments
...
 
@@ -75,192 +75,206 @@ def test_nonzero_balance(currencies):
 

	
 
def test_mixed_balance():
 
    amounts = {'USD': 0, 'EUR': 120}
 
    balance = core.Balance(amounts_from_map(amounts))
 
    assert balance
 
    assert len(balance) == 2
 
    assert not balance.is_zero()
 
    assert all(balance[key] == testutil.Amount(amt, key) for key, amt in amounts.items())
 

	
 
@pytest.mark.parametrize('mapping,expected', [
 
    ({}, True),
 
    ({'USD': 0}, True),
 
    ({'USD': 0, 'EUR': 0}, True),
 
    ({'USD': -10, 'EUR': 0}, False),
 
    ({'EUR': -10}, False),
 
    ({'USD': -10, 'EUR': -20}, False),
 
    ({'USD': 10, 'EUR': -20}, False),
 
    ({'JPY': 10}, False),
 
    ({'JPY': 10, 'BRL': 0}, False),
 
    ({'JPY': 10, 'BRL': 20}, False),
 
    ({'USD': '0.00015'}, True),
 
    ({'EUR': '-0.00052'}, True),
 
])
 
def test_eq_zero(mapping, expected):
 
    balance = core.Balance(amounts_from_map(mapping))
 
    assert balance.eq_zero() == expected
 
    assert balance.is_zero() == expected
 

	
 
@pytest.mark.parametrize('mapping,expected', [
 
    ({}, True),
 
    ({'USD': 0}, True),
 
    ({'USD': 0, 'EUR': 0}, True),
 
    ({'EUR': -10}, False),
 
    ({'USD': 10, 'EUR': -20}, False),
 
    ({'USD': -10, 'EUR': -20}, False),
 
    ({'JPY': 10}, True),
 
    ({'JPY': 10, 'BRL': 0}, True),
 
    ({'JPY': 10, 'BRL': 20}, True),
 
    ({'USD': '0.00015'}, True),
 
    ({'EUR': '-0.00052'}, True),
 
    ({'RUB': core.Balance.TOLERANCE}, True),
 
    ({'RUB': -core.Balance.TOLERANCE}, False),
 
])
 
def test_ge_zero(mapping, expected):
 
    balance = core.Balance(amounts_from_map(mapping))
 
    assert balance.ge_zero() == expected
 

	
 
@pytest.mark.parametrize('mapping,expected', [
 
    ({}, True),
 
    ({'USD': 0}, True),
 
    ({'USD': 0, 'EUR': 0}, True),
 
    ({'EUR': -10}, True),
 
    ({'USD': 10, 'EUR': -20}, False),
 
    ({'USD': -10, 'EUR': -20}, True),
 
    ({'JPY': 10}, False),
 
    ({'JPY': 10, 'BRL': 0}, False),
 
    ({'JPY': 10, 'BRL': 20}, False),
 
    ({'USD': '0.00015'}, True),
 
    ({'EUR': '-0.00052'}, True),
 
    ({'RUB': core.Balance.TOLERANCE}, False),
 
    ({'RUB': -core.Balance.TOLERANCE}, True),
 
])
 
def test_le_zero(mapping, expected):
 
    balance = core.Balance(amounts_from_map(mapping))
 
    assert balance.le_zero() == expected
 

	
 
@pytest.mark.parametrize('mapping', [
 
    {},
 
    {'USD': 0},
 
    {'EUR': 10},
 
    {'JPY': 20, 'BRL': 30},
 
    {'EUR': -15},
 
    {'JPY': -25, 'BRL': -35},
 
    {'JPY': 40, 'USD': 0, 'EUR': -50},
 
])
 
def test_abs(mapping):
 
    actual = abs(core.Balance(amounts_from_map(mapping)))
 
    assert set(actual) == set(mapping)
 
    for key, number in mapping.items():
 
        assert actual[key] == testutil.Amount(abs(number), key)
 

	
 
@pytest.mark.parametrize('mapping', [
 
    {},
 
    {'USD': 0},
 
    {'EUR': 10},
 
    {'JPY': 20, 'BRL': 30},
 
    {'EUR': -15},
 
    {'JPY': -25, 'BRL': -35},
 
    {'JPY': 40, 'USD': 0, 'EUR': -50},
 
])
 
def test_neg(mapping):
 
    actual = -core.Balance(amounts_from_map(mapping))
 
    assert set(actual) == set(mapping)
 
    for key, number in mapping.items():
 
        assert actual[key] == testutil.Amount(-number, key)
 

	
 
@pytest.mark.parametrize('mapping', [
 
    {},
 
    {'USD': 0},
 
    {'EUR': 10},
 
    {'JPY': 20, 'BRL': 30},
 
    {'EUR': -15},
 
    {'JPY': -25, 'BRL': -35},
 
    {'JPY': 40, 'USD': 0, 'EUR': -50},
 
])
 
def test_pos(mapping):
 
    amounts = frozenset(amounts_from_map(mapping))
 
    actual = +core.Balance(amounts)
 
    assert set(actual.values()) == amounts
 

	
 
@pytest.mark.parametrize('map1,map2,expected', [
 
    ({}, {}, True),
 
    ({}, {'USD': 0}, True),
 
    ({}, {'EUR': 1}, False),
 
    ({'USD': 1}, {'EUR': 1}, False),
 
    ({'USD': 1}, {'USD': '1.0'}, True),
 
    ({'USD': 1}, {'USD': '1.0', 'EUR': '2.0'}, False),
 
    ({'USD': 1, 'BRL': '2.0'}, {'USD': '1.0', 'EUR': '2.0'}, False),
 
    ({'USD': 1, 'EUR': 2, 'BRL': '3.0'}, {'USD': '1.0', 'EUR': '2.0'}, False),
 
    ({'USD': 1, 'EUR': 2}, {'USD': '1.0', 'EUR': '2.0'}, True),
 
])
 
def test_eq(map1, map2, expected):
 
    bal1 = core.Balance(amounts_from_map(map1))
 
    bal2 = core.Balance(amounts_from_map(map2))
 
    actual = bal1 == bal2
 
    assert actual == expected
 

	
 
@pytest.mark.parametrize('number,currency', {
 
    (50, 'USD'),
 
    (-50, 'USD'),
 
    (50000, 'BRL'),
 
    (-4000, 'BRL'),
 
})
 
def test_add_amount(number, currency):
 
    start_amount = testutil.Amount(500, 'USD')
 
    start_bal = core.Balance([start_amount])
 
    add_amount = testutil.Amount(number, currency)
 
    actual = start_bal + add_amount
 
    if currency == 'USD':
 
        assert len(actual) == 1
 
        assert actual['USD'] == testutil.Amount(500 + number)
 
    else:
 
        assert len(actual) == 2
 
        assert actual['USD'] == start_amount
 
        assert actual[currency] == add_amount
 
    assert start_bal == {'USD': start_amount}
 

	
 
@pytest.mark.parametrize('number,currency', {
 
    (50, 'USD'),
 
    (-50, 'USD'),
 
    (50000, 'BRL'),
 
    (-4000, 'BRL'),
 
})
 
def test_iadd_amount(number, currency):
 
    balance = core.MutableBalance([testutil.Amount(500, 'USD')])
 
    add_amount = testutil.Amount(number, currency)
 
    balance += add_amount
 
    if currency == 'USD':
 
        assert len(balance) == 1
 
        assert balance['USD'] == testutil.Amount(500 + number)
 
    else:
 
        assert len(balance) == 2
 
        assert balance['USD'] == testutil.Amount(500)
 
        assert balance[currency] == add_amount
 

	
 
@pytest.mark.parametrize('mapping', [
 
    {},
 
    {'USD': 0},
 
    {'EUR': 10},
 
    {'JPY': 20, 'BRL': 30},
 
    {'EUR': -15},
 
    {'JPY': -25, 'BRL': -35},
 
    {'JPY': 40, 'USD': 0, 'EUR': -50},
 
])
 
def test_add_balance(mapping):
 
    expect_numbers = {'USD': 500, 'BRL': 40000}
 
    start_bal = core.Balance(amounts_from_map(expect_numbers))
 
    for code, number in mapping.items():
 
        expect_numbers[code] = expect_numbers.get(code, 0) + number
 
    add_bal = core.Balance(amounts_from_map(mapping))
 
    actual = start_bal + add_bal
 
    expected = core.Balance(amounts_from_map(expect_numbers))
 
    assert actual == expected
 

	
 
@pytest.mark.parametrize('mapping', [
 
    {},
 
    {'USD': 0},
 
    {'EUR': 10},
 
    {'JPY': 20, 'BRL': 30},
 
    {'EUR': -15},
 
    {'JPY': -25, 'BRL': -35},
 
    {'JPY': 40, 'USD': 0, 'EUR': -50},
 
])
 
def test_iadd_balance(mapping):
 
    expect_numbers = {'USD': 500, 'BRL': 40000}
 
    balance = core.MutableBalance(amounts_from_map(expect_numbers))
 
    for code, number in mapping.items():
 
        expect_numbers[code] = expect_numbers.get(code, 0) + number
 
    balance += core.Balance(amounts_from_map(mapping))
 
    expected = core.Balance(amounts_from_map(expect_numbers))
 
    assert balance == expected
 

	
 
@pytest.mark.parametrize('mapping,expected', DEFAULT_STRINGS)
 
def test_str(mapping, expected):
 
    balance = core.Balance(amounts_from_map(mapping))
 
    assert str(balance) == expected
0 comments (0 inline, 0 general)