Changeset - 110e5038e103
[Not reviewed]
0 2 0
Brett Smith - 4 years ago 2020-06-09 13:04:27
brettcsmith@brettcsmith.org
reports: Balance.__init__ better handles multiple amounts of same currency.

This is something that should've happened with 3d704e286
but I didn't think of it at the time.
2 files changed with 23 insertions and 1 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/core.py
Show inline comments
 
"""core.py - Common data classes for reporting functionality"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import abc
 
import collections
 
import datetime
 
import itertools
 
import operator
 
import re
 

	
 
import babel.core  # type:ignore[import]
 
import babel.numbers  # type:ignore[import]
 

	
 
import odf.config  # type:ignore[import]
 
import odf.element  # type:ignore[import]
 
import odf.number  # type:ignore[import]
 
import odf.opendocument  # type:ignore[import]
 
import odf.style  # type:ignore[import]
 
import odf.table  # type:ignore[import]
 
import odf.text  # type:ignore[import]
 

	
 
from decimal import Decimal
 
from pathlib import Path
 

	
 
from beancount.core import amount as bc_amount
 

	
 
from .. import data
 

	
 
from typing import (
 
    cast,
 
    overload,
 
    Any,
 
    BinaryIO,
 
    Callable,
 
    DefaultDict,
 
    Dict,
 
    Generic,
 
    Iterable,
 
    Iterator,
 
    List,
 
    Mapping,
 
    MutableMapping,
 
    Optional,
 
    Sequence,
 
    Set,
 
    Tuple,
 
    Type,
 
    TypeVar,
 
    Union,
 
)
 
from ..beancount_types import (
 
    MetaKey,
 
    MetaValue,
 
)
 

	
 
DecimalCompat = data.DecimalCompat
 
BalanceType = TypeVar('BalanceType', bound='Balance')
 
ElementType = Callable[..., odf.element.Element]
 
LinkType = Union[str, Tuple[str, Optional[str]]]
 
RelatedType = TypeVar('RelatedType', bound='RelatedPostings')
 
RT = TypeVar('RT', bound=Sequence)
 
ST = TypeVar('ST')
 
T = TypeVar('T')
 

	
 
class Balance(Mapping[str, data.Amount]):
 
    """A collection of amounts mapped by currency
 

	
 
    Each key is a Beancount currency string, and each value represents the
 
    balance in that currency.
 
    """
 
    __slots__ = ('_currency_map', 'tolerance')
 
    TOLERANCE = Decimal('0.01')
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Amount]=(),
 
                 tolerance: Optional[Decimal]=None,
 
    ) -> None:
 
        if tolerance is None:
 
            tolerance = self.TOLERANCE
 
        self._currency_map = {amount.currency: amount for amount in source}
 
        self.tolerance = tolerance
 
        self._currency_map: Dict[str, data.Amount] = {}
 
        for amount in source:
 
            self._add_amount(self._currency_map, amount)
 

	
 
    def _add_amount(self,
 
                    currency_map: MutableMapping[str, data.Amount],
 
                    amount: data.Amount,
 
    ) -> None:
 
        code = amount.currency
 
        try:
 
            current_number = currency_map[code].number
 
        except KeyError:
 
            current_number = Decimal(0)
 
        currency_map[code] = data.Amount(current_number + amount.number, code)
 

	
 
    def _add_other(self,
 
                   currency_map: MutableMapping[str, data.Amount],
 
                   other: Union[data.Amount, 'Balance'],
 
    ) -> None:
 
        if isinstance(other, Balance):
 
            for amount in other.values():
 
                self._add_amount(currency_map, amount)
 
        else:
 
            self._add_amount(currency_map, other)
 

	
 
    def __repr__(self) -> str:
 
        values = [repr(amt) for amt in self.values()]
 
        return f"{type(self).__name__}({values!r})"
 

	
 
    def __str__(self) -> str:
 
        return self.format()
 

	
 
    def __abs__(self: BalanceType) -> BalanceType:
 
        return type(self)(bc_amount.abs(amt) for amt in self.values())
 

	
 
    def __add__(self: BalanceType, other: Union[data.Amount, 'Balance']) -> BalanceType:
 
        retval_map = self._currency_map.copy()
 
        self._add_other(retval_map, other)
 
        return type(self)(retval_map.values())
 

	
 
    def __eq__(self, other: Any) -> bool:
 
        if (self.is_zero()
 
            and isinstance(other, Balance)
 
            and other.is_zero()):
 
            return True
 
        else:
 
            return super().__eq__(other)
 

	
 
    def __neg__(self: BalanceType) -> BalanceType:
 
        return type(self)(-amt for amt in self.values())
 

	
 
    def __pos__(self: BalanceType) -> BalanceType:
 
        return self
 

	
 
    def __getitem__(self, key: str) -> data.Amount:
 
        return self._currency_map[key]
 

	
 
    def __iter__(self) -> Iterator[str]:
 
        return iter(self._currency_map)
 

	
 
    def __len__(self) -> int:
 
        return len(self._currency_map)
 

	
 
    def _all_amounts(self,
 
                     op_func: Callable[[DecimalCompat, DecimalCompat], bool],
 
                     operand: DecimalCompat,
 
    ) -> bool:
 
        return all(op_func(amt.number, operand) for amt in self.values())
 

	
 
    @staticmethod
 
    def within_tolerance(dec: DecimalCompat, tolerance: DecimalCompat) -> bool:
 
        dec = cast(Decimal, dec)
 
        return abs(dec) < tolerance
 

	
 
    def eq_zero(self) -> bool:
 
        """Returns true if all amounts in the balance == 0, within tolerance."""
 
        return self._all_amounts(self.within_tolerance, self.tolerance)
 

	
 
    is_zero = eq_zero
 

	
 
    def ge_zero(self) -> bool:
 
        """Returns true if all amounts in the balance >= 0, within tolerance."""
 
        op_func = operator.gt if self.tolerance else operator.ge
 
        return self._all_amounts(op_func, -self.tolerance)
 

	
 
    def le_zero(self) -> bool:
 
        """Returns true if all amounts in the balance <= 0, within tolerance."""
 
        op_func = operator.lt if self.tolerance else operator.le
 
        return self._all_amounts(op_func, self.tolerance)
 

	
 
    def format(self,
 
               fmt: Optional[str]='#,#00.00 ¤¤',
 
               sep: str=', ',
 
               empty: str="Zero balance",
 
    ) -> str:
 
        """Formats the balance as a string with the given parameters
 

	
 
        If the balance is zero, returns ``empty``. Otherwise, returns a string
 
        with each amount in the balance formatted as ``fmt``, separated by
 
        ``sep``.
 

	
 
        If you set ``fmt`` to None, amounts will be formatted according to the
 
        user's locale. The default format is Beancount's input format.
 
        """
 
        amounts = [amount for amount in self.values() if amount.number]
 
        if not amounts:
 
            return empty
 
        amounts.sort(key=lambda amt: abs(amt.number), reverse=True)
 
        return sep.join(
 
            babel.numbers.format_currency(amt.number, amt.currency, fmt)
 
            for amt in amounts
 
        )
 

	
 

	
 
class MutableBalance(Balance):
 
    __slots__ = ()
 

	
 
    def __iadd__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
 
        self._add_other(self._currency_map, other)
 
        return self
 

	
 

	
 
class RelatedPostings(Sequence[data.Posting]):
 
    """Collect and query related postings
 

	
 
    This class provides common functionality for collecting related postings
 
    and running queries on them: iterating over them, tallying their balance,
 
    etc.
 

	
 
    This class doesn't know anything about how the postings are related. That's
 
    entirely up to the caller.
tests/test_reports_balance.py
Show inline comments
 
"""test_reports_balance - Unit tests for reports.core.Balance"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import itertools
 

	
 
from decimal import Decimal
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
import babel.numbers
 

	
 
from conservancy_beancount.reports import core
 

	
 
DEFAULT_STRINGS = [
 
    ({}, "Zero balance"),
 
    ({'JPY': 0, 'BRL': 0}, "Zero balance"),
 
    ({'USD': '20.00'}, "20.00 USD"),
 
    ({'EUR': '50.00', 'GBP': '80.00'}, "80.00 GBP, 50.00 EUR"),
 
    ({'JPY': '-5500.00', 'BRL': '-8500.00'}, "-8,500.00 BRL, -5,500 JPY"),
 
]
 

	
 
def amounts_from_map(currency_map):
 
    for code, number in currency_map.items():
 
        yield testutil.Amount(number, code)
 

	
 
def test_empty_balance():
 
    balance = core.Balance()
 
    assert not balance
 
    assert len(balance) == 0
 
    assert balance.is_zero()
 
    with pytest.raises(KeyError):
 
        balance['USD']
 

	
 
@pytest.mark.parametrize('currencies', [
 
    'USD',
 
    'EUR GBP',
 
    'JPY INR BRL',
 
])
 
def test_zero_balance(currencies):
 
    keys = currencies.split()
 
    balance = core.Balance(testutil.Amount(0, key) for key in keys)
 
    assert balance
 
    assert len(balance) == len(keys)
 
    assert balance.is_zero()
 
    assert all(balance[key].number == 0 for key in keys)
 
    assert all(balance[key].currency == key for key in keys)
 

	
 
@pytest.mark.parametrize('currencies', [
 
    'USD',
 
    'EUR GBP',
 
    'JPY INR BRL',
 
])
 
def test_nonzero_balance(currencies):
 
    amounts = dict(zip(currencies.split(), itertools.count(110, 100)))
 
    balance = core.Balance(amounts_from_map(amounts))
 
    assert balance
 
    assert len(balance) == len(amounts)
 
    assert not balance.is_zero()
 
    assert all(balance[key] == testutil.Amount(amt, key) for key, amt in amounts.items())
 

	
 
def test_mixed_balance():
 
    amounts = {'USD': 0, 'EUR': 120}
 
    balance = core.Balance(amounts_from_map(amounts))
 
    assert balance
 
    assert len(balance) == 2
 
    assert not balance.is_zero()
 
    assert all(balance[key] == testutil.Amount(amt, key) for key, amt in amounts.items())
 

	
 
def test_init_recurring_currency():
 
    balance = core.Balance([
 
        testutil.Amount(20),
 
        testutil.Amount(40),
 
        testutil.Amount(60, 'EUR'),
 
        testutil.Amount(-80),
 
    ])
 
    assert balance
 
    assert balance['EUR'] == testutil.Amount(60, 'EUR')
 
    assert balance['USD'] == testutil.Amount(-20)
 

	
 
def test_init_zeroed_out():
 
    balance = core.Balance([
 
        testutil.Amount(25),
 
        testutil.Amount(40, 'EUR'),
 
        testutil.Amount(-25),
 
        testutil.Amount(-40, 'EUR'),
 
    ])
 
    assert balance.is_zero()
 

	
 
@pytest.mark.parametrize('mapping,expected', [
 
    ({}, True),
 
    ({'USD': 0}, True),
 
    ({'USD': 0, 'EUR': 0}, True),
 
    ({'USD': -10, 'EUR': 0}, False),
 
    ({'EUR': -10}, False),
 
    ({'USD': -10, 'EUR': -20}, False),
 
    ({'USD': 10, 'EUR': -20}, False),
 
    ({'JPY': 10}, False),
 
    ({'JPY': 10, 'BRL': 0}, False),
 
    ({'JPY': 10, 'BRL': 20}, False),
 
    ({'USD': '0.00015'}, True),
 
    ({'EUR': '-0.00052'}, True),
 
])
 
def test_eq_zero(mapping, expected):
 
    balance = core.Balance(amounts_from_map(mapping))
 
    assert balance.eq_zero() == expected
 
    assert balance.is_zero() == expected
 

	
 
@pytest.mark.parametrize('mapping,expected', [
 
    ({}, True),
 
    ({'USD': 0}, True),
 
    ({'USD': 0, 'EUR': 0}, True),
 
    ({'EUR': -10}, False),
 
    ({'USD': 10, 'EUR': -20}, False),
 
    ({'USD': -10, 'EUR': -20}, False),
 
    ({'JPY': 10}, True),
 
    ({'JPY': 10, 'BRL': 0}, True),
 
    ({'JPY': 10, 'BRL': 20}, True),
 
    ({'USD': '0.00015'}, True),
 
    ({'EUR': '-0.00052'}, True),
 
    ({'RUB': core.Balance.TOLERANCE}, True),
 
    ({'RUB': -core.Balance.TOLERANCE}, False),
 
])
 
def test_ge_zero(mapping, expected):
 
    balance = core.Balance(amounts_from_map(mapping))
 
    assert balance.ge_zero() == expected
 

	
 
@pytest.mark.parametrize('mapping,expected', [
 
    ({}, True),
 
    ({'USD': 0}, True),
 
    ({'USD': 0, 'EUR': 0}, True),
 
    ({'EUR': -10}, True),
 
    ({'USD': 10, 'EUR': -20}, False),
 
    ({'USD': -10, 'EUR': -20}, True),
 
    ({'JPY': 10}, False),
 
    ({'JPY': 10, 'BRL': 0}, False),
 
    ({'JPY': 10, 'BRL': 20}, False),
 
    ({'USD': '0.00015'}, True),
 
    ({'EUR': '-0.00052'}, True),
 
    ({'RUB': core.Balance.TOLERANCE}, False),
 
    ({'RUB': -core.Balance.TOLERANCE}, True),
 
])
 
def test_le_zero(mapping, expected):
 
    balance = core.Balance(amounts_from_map(mapping))
 
    assert balance.le_zero() == expected
 

	
 
@pytest.mark.parametrize('mapping', [
 
    {},
 
    {'USD': 0},
 
    {'EUR': 10},
 
    {'JPY': 20, 'BRL': 30},
 
    {'EUR': -15},
 
    {'JPY': -25, 'BRL': -35},
 
    {'JPY': 40, 'USD': 0, 'EUR': -50},
 
])
 
def test_abs(mapping):
 
    actual = abs(core.Balance(amounts_from_map(mapping)))
 
    assert set(actual) == set(mapping)
 
    for key, number in mapping.items():
 
        assert actual[key] == testutil.Amount(abs(number), key)
 

	
 
@pytest.mark.parametrize('mapping', [
 
    {},
 
    {'USD': 0},
 
    {'EUR': 10},
 
    {'JPY': 20, 'BRL': 30},
 
    {'EUR': -15},
 
    {'JPY': -25, 'BRL': -35},
 
    {'JPY': 40, 'USD': 0, 'EUR': -50},
 
])
 
def test_neg(mapping):
 
    actual = -core.Balance(amounts_from_map(mapping))
 
    assert set(actual) == set(mapping)
 
    for key, number in mapping.items():
 
        assert actual[key] == testutil.Amount(-number, key)
 

	
 
@pytest.mark.parametrize('mapping', [
 
    {},
 
    {'USD': 0},
 
    {'EUR': 10},
 
    {'JPY': 20, 'BRL': 30},
 
    {'EUR': -15},
 
    {'JPY': -25, 'BRL': -35},
 
    {'JPY': 40, 'USD': 0, 'EUR': -50},
 
])
 
def test_pos(mapping):
 
    amounts = frozenset(amounts_from_map(mapping))
 
    actual = +core.Balance(amounts)
 
    assert set(actual.values()) == amounts
 

	
 
@pytest.mark.parametrize('map1,map2,expected', [
 
    ({}, {}, True),
 
    ({}, {'USD': 0}, True),
 
    ({}, {'EUR': 1}, False),
 
    ({'USD': 1}, {'EUR': 1}, False),
 
    ({'USD': 1}, {'USD': '1.0'}, True),
 
    ({'USD': 1}, {'USD': '1.0', 'EUR': '2.0'}, False),
 
    ({'USD': 1, 'BRL': '2.0'}, {'USD': '1.0', 'EUR': '2.0'}, False),
 
    ({'USD': 1, 'EUR': 2, 'BRL': '3.0'}, {'USD': '1.0', 'EUR': '2.0'}, False),
 
    ({'USD': 1, 'EUR': 2}, {'USD': '1.0', 'EUR': '2.0'}, True),
 
])
 
def test_eq(map1, map2, expected):
 
    bal1 = core.Balance(amounts_from_map(map1))
 
    bal2 = core.Balance(amounts_from_map(map2))
 
    actual = bal1 == bal2
 
    assert actual == expected
 

	
 
@pytest.mark.parametrize('number,currency', {
 
    (50, 'USD'),
 
    (-50, 'USD'),
 
    (50000, 'BRL'),
 
    (-4000, 'BRL'),
 
})
 
def test_add_amount(number, currency):
 
    start_amount = testutil.Amount(500, 'USD')
 
    start_bal = core.Balance([start_amount])
 
    add_amount = testutil.Amount(number, currency)
0 comments (0 inline, 0 general)