Changeset - 7f3a26b5557d
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-06-18 18:07:44
brettcsmith@brettcsmith.org
reports: Balance.format() accepts zero argument.

This change has the same motivation as the recent change to
BaseODS.balance_cell(): try to preserve currency information when it's
available.
3 files changed with 47 insertions and 27 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -370,196 +370,197 @@ class AgingODS(core.BaseODS[AccrualPostings, Optional[data.Account]]):
 
                ),
 
                *(odf.table.TableCell() for _ in range(1, text_span)),
 
                self.balance_cell(balance),
 
            )
 
            last_age_text = age_text
 
            total_balance += balance
 
        self.add_row(
 
            self.string_cell(
 
                "Total Unpaid: ",
 
                stylename=text_style,
 
                numbercolumnsspanned=text_span,
 
            ),
 
            *(odf.table.TableCell() for _ in range(1, text_span)),
 
            self.balance_cell(total_balance),
 
        )
 

	
 
    def write_row(self, row: AccrualPostings) -> None:
 
        age = (self.date - row[0].meta.date).days
 
        if row.end_balance.ge_zero():
 
            for index, threshold in enumerate(self.age_thresholds):
 
                if age >= threshold:
 
                    self.age_balances[index] += row.end_balance
 
                    break
 
            else:
 
                return
 
        raw_balance = row.balance()
 
        if row.accrual_type is not None:
 
            raw_balance = row.accrual_type.normalize_amount(raw_balance)
 
        if raw_balance == row.end_balance:
 
            amount_cell = odf.table.TableCell()
 
        else:
 
            amount_cell = self.balance_cell(raw_balance)
 
        projects = {post.meta.get('project') or None for post in row}
 
        projects.discard(None)
 
        self.add_row(
 
            self.date_cell(row[0].meta.date),
 
            self.multiline_cell(row.entities()),
 
            amount_cell,
 
            self.balance_cell(row.end_balance),
 
            self.multiline_cell(sorted(projects)),
 
            *(self.meta_links_cell(row.all_meta_links(key))
 
              for key in self.DOC_COLUMNS),
 
        )
 

	
 

	
 
class AgingReport(BaseReport):
 
    def __init__(self,
 
                 rt_wrapper: rtutil.RT,
 
                 out_file: BinaryIO,
 
                 date: Optional[datetime.date]=None,
 
    ) -> None:
 
        if date is None:
 
            date = datetime.date.today()
 
        self.out_bin = out_file
 
        self.logger = logger.getChild(type(self).__name__)
 
        self.ods = AgingODS(rt_wrapper, date, self.logger)
 

	
 
    def run(self, groups: PostGroups) -> None:
 
        rows: List[AccrualPostings] = []
 
        for group in groups.values():
 
            if group.is_zero():
 
                # Cheap optimization: don't slice and dice groups we're not
 
                # going to report anyway.
 
                continue
 
            elif group.accrual_type is None:
 
                group = group.since_last_nonzero()
 
            else:
 
                # Filter out new accruals after the report date.
 
                # e.g., cover the case that the same invoices has multiple
 
                # postings over time, and we don't want to report too-recent
 
                # ones.
 
                cutoff_date = self.ods.date - datetime.timedelta(
 
                    days=group.accrual_type.value.aging_thresholds[-1],
 
                )
 
                group = AccrualPostings(
 
                    post for post in group.since_last_nonzero()
 
                    if post.meta.date <= cutoff_date
 
                    or group.accrual_type.normalize_amount(post.units.number) < 0
 
                )
 
            if group and not group.is_zero():
 
                rows.append(group)
 
        rows.sort(key=lambda related: (
 
            related.account,
 
            related[0].meta.date,
 
            ('\0'.join(related.entities())
 
             if related.entity is related.INCONSISTENT
 
             else related.entity),
 
        ))
 
        self.ods.write(rows)
 
        self.ods.save_file(self.out_bin)
 

	
 

	
 
class BalanceReport(BaseReport):
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        posts = posts.since_last_nonzero()
 
        date_s = posts[0].meta.date.strftime('%Y-%m-%d')
 
        balance_s = posts.balance_at_cost().format(zero="Zero balance")
 
        if index:
 
            yield ""
 
        yield f"{posts.invoice}:"
 
        yield f"  {posts.balance_at_cost()} outstanding since {date_s}"
 
        yield f"  {balance_s} outstanding since {date_s}"
 

	
 

	
 
class OutgoingReport(BaseReport):
 
    def __init__(self, rt_wrapper: rtutil.RT, out_file: TextIO) -> None:
 
        super().__init__(out_file)
 
        self.rt_wrapper = rt_wrapper
 
        self.rt_client = rt_wrapper.rt
 

	
 
    def _primary_rt_id(self, posts: AccrualPostings) -> rtutil.TicketAttachmentIds:
 
        rt_id = posts.rt_id
 
        if rt_id is None:
 
            raise ValueError("no rt-id links found")
 
        elif isinstance(rt_id, Sentinel):
 
            raise ValueError("multiple rt-id links found")
 
        parsed = rtutil.RT.parse(rt_id)
 
        if parsed is None:
 
            raise ValueError("rt-id is not a valid RT reference")
 
        else:
 
            return parsed
 

	
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        posts = posts.since_last_nonzero()
 
        try:
 
            ticket_id, _ = self._primary_rt_id(posts)
 
            ticket = self.rt_client.get_ticket(ticket_id)
 
            # Note we only use this when ticket is None.
 
            errmsg = f"ticket {ticket_id} not found"
 
        except (ValueError, rt.RtError) as error:
 
            ticket = None
 
            errmsg = error.args[0]
 
        if ticket is None:
 
            self.logger.error(
 
                "can't generate outgoings report for %s because no RT ticket available: %s",
 
                posts.invoice, errmsg,
 
            )
 
            return
 

	
 
        try:
 
            rt_requestor = self.rt_client.get_user(ticket['Requestors'][0])
 
        except (IndexError, rt.RtError):
 
            rt_requestor = None
 
        if rt_requestor is None:
 
            requestor = ''
 
            requestor_name = ''
 
        else:
 
            requestor_name = (
 
                rt_requestor.get('RealName')
 
                or ticket.get('CF.{payment-to}')
 
                or ''
 
            )
 
            requestor = f'{requestor_name} <{rt_requestor["EmailAddress"]}>'.strip()
 

	
 
        balance_s = posts.end_balance.format(None)
 
        raw_balance = -posts.balance()
 
        if raw_balance != posts.end_balance:
 
            balance_s = f'{raw_balance} ({balance_s})'
 

	
 
        contract_links = list(posts.all_meta_links('contract'))
 
        if contract_links:
 
            contract_s = ' , '.join(self.rt_wrapper.iter_urls(
 
                contract_links, missing_fmt='<BROKEN RT LINK: {}>',
 
            ))
 
        else:
 
            contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
 
        projects = [v for v in posts.meta_values('project')
 
                    if isinstance(v, str)]
 

	
 
        yield "PAYMENT FOR APPROVAL:"
 
        yield f"REQUESTOR: {requestor}"
 
        yield f"TOTAL TO PAY: {balance_s}"
 
        yield f"AGREEMENT: {contract_s}"
 
        yield f"PAYMENT TO: {ticket.get('CF.{payment-to}') or requestor_name}"
 
        yield f"PAYMENT METHOD: {ticket.get('CF.{payment-method}', '')}"
 
        yield f"PROJECT: {', '.join(projects)}"
 
        yield "\nBEANCOUNT ENTRIES:\n"
 

	
 
        last_txn: Optional[Transaction] = None
 
        for post in posts:
 
            txn = post.meta.txn
 
            if txn is not last_txn:
 
                last_txn = txn
 
                txn = self.rt_wrapper.txn_with_urls(txn, '{}')
 
                yield bc_printer.format_entry(txn)
 

	
 

	
 
class ReportType(enum.Enum):
 
    AGING = AgingReport
 
    BALANCE = BalanceReport
 
    OUTGOING = OutgoingReport
 
    AGE = AGING
 
    BAL = BALANCE
 
    OUT = OUTGOING
 
    OUTGOINGS = OUTGOING
 

	
 
    @classmethod
 
    def by_name(cls, name: str) -> 'ReportType':
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -109,214 +109,221 @@ class Balance(Mapping[str, data.Amount]):
 
        currency_map[code] = data.Amount(current_number + amount.number, code)
 

	
 
    def _add_other(self,
 
                   currency_map: MutableMapping[str, data.Amount],
 
                   other: Union[data.Amount, 'Balance'],
 
    ) -> None:
 
        if isinstance(other, Balance):
 
            for amount in other.values():
 
                self._add_amount(currency_map, amount)
 
        else:
 
            self._add_amount(currency_map, other)
 

	
 
    def __repr__(self) -> str:
 
        values = [repr(amt) for amt in self.values()]
 
        return f"{type(self).__name__}({values!r})"
 

	
 
    def __str__(self) -> str:
 
        return self.format()
 

	
 
    def __abs__(self: BalanceType) -> BalanceType:
 
        return type(self)(bc_amount.abs(amt) for amt in self.values())
 

	
 
    def __add__(self: BalanceType, other: Union[data.Amount, 'Balance']) -> BalanceType:
 
        retval_map = self._currency_map.copy()
 
        self._add_other(retval_map, other)
 
        return type(self)(retval_map.values())
 

	
 
    def __sub__(self: BalanceType, other: Union[data.Amount, 'Balance']) -> BalanceType:
 
        return self.__add__(-other)
 

	
 
    def __eq__(self, other: Any) -> bool:
 
        if isinstance(other, Balance):
 
            clean_self = self.clean_copy()
 
            clean_other = other.clean_copy()
 
            return len(clean_self) == len(clean_other) and all(
 
                clean_self[key] == clean_other.get(key) for key in clean_self
 
            )
 
        else:
 
            return super().__eq__(other)
 

	
 
    def __neg__(self: BalanceType) -> BalanceType:
 
        return type(self)(-amt for amt in self.values())
 

	
 
    def __pos__(self: BalanceType) -> BalanceType:
 
        return self
 

	
 
    def __getitem__(self, key: str) -> data.Amount:
 
        return self._currency_map[key]
 

	
 
    def __iter__(self) -> Iterator[str]:
 
        return iter(self._currency_map)
 

	
 
    def __len__(self) -> int:
 
        return len(self._currency_map)
 

	
 
    def _all_amounts(self,
 
                     op_func: Callable[[DecimalCompat, DecimalCompat], bool],
 
                     operand: DecimalCompat,
 
    ) -> bool:
 
        return all(op_func(amt.number, operand) for amt in self.values())
 

	
 
    def copy(self: BalanceType, tolerance: Optional[Decimal]=None) -> BalanceType:
 
        if tolerance is None:
 
            tolerance = self.tolerance
 
        return type(self)(self.values(), tolerance)
 

	
 
    def clean_copy(self: BalanceType, tolerance: Optional[Decimal]=None) -> BalanceType:
 
        if tolerance is None:
 
            tolerance = self.tolerance
 
        return type(self)(
 
            (amount for amount in self.values() if abs(amount.number) >= tolerance),
 
            tolerance,
 
        )
 

	
 
    @staticmethod
 
    def within_tolerance(dec: DecimalCompat, tolerance: DecimalCompat) -> bool:
 
        dec = cast(Decimal, dec)
 
        return abs(dec) < tolerance
 

	
 
    def eq_zero(self) -> bool:
 
        """Returns true if all amounts in the balance == 0, within tolerance."""
 
        return self._all_amounts(self.within_tolerance, self.tolerance)
 

	
 
    is_zero = eq_zero
 

	
 
    def ge_zero(self) -> bool:
 
        """Returns true if all amounts in the balance >= 0, within tolerance."""
 
        op_func = operator.gt if self.tolerance else operator.ge
 
        return self._all_amounts(op_func, -self.tolerance)
 

	
 
    def le_zero(self) -> bool:
 
        """Returns true if all amounts in the balance <= 0, within tolerance."""
 
        op_func = operator.lt if self.tolerance else operator.le
 
        return self._all_amounts(op_func, self.tolerance)
 

	
 
    def format(self,
 
               fmt: Optional[str]='#,#00.00 ¤¤',
 
               fmt: Optional[str]='#,##0.00 ¤¤',
 
               sep: str=', ',
 
               empty: str="Zero balance",
 
               zero: Optional[str]=None,
 
               tolerance: Optional[Decimal]=None,
 
    ) -> str:
 
        """Formats the balance as a string with the given parameters
 

	
 
        If the balance is zero (within tolerance), returns ``empty``.
 
        Otherwise, returns a string with each amount in the balance formatted
 
        If the balance is completely empty, return ``empty``.
 
        If the balance is zero (within tolerance) and ``zero`` is specified,
 
        return ``zero``.
 
        Otherwise, return a string with each amount in the balance formatted
 
        as ``fmt``, separated by ``sep``.
 

	
 
        If you set ``fmt`` to None, amounts will be formatted according to the
 
        user's locale. The default format is Beancount's input format.
 
        """
 
        amounts = list(self.clean_copy(tolerance).values())
 
        if not amounts:
 
        balance = self.clean_copy(tolerance) or self.copy(tolerance)
 
        if not balance:
 
            return empty
 
        amounts.sort(key=lambda amt: abs(amt.number), reverse=True)
 
        return sep.join(
 
            babel.numbers.format_currency(amt.number, amt.currency, fmt)
 
            for amt in amounts
 
        )
 
        elif zero is not None and balance.is_zero():
 
            return zero
 
        else:
 
            amounts = list(balance.values())
 
            amounts.sort(key=lambda amt: (-abs(amt.number), amt.currency))
 
            return sep.join(
 
                babel.numbers.format_currency(amt.number, amt.currency, fmt)
 
                for amt in amounts
 
            )
 

	
 

	
 
class MutableBalance(Balance):
 
    __slots__ = ()
 

	
 
    def __iadd__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
 
        self._add_other(self._currency_map, other)
 
        return self
 

	
 
    def __isub__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
 
        self._add_other(self._currency_map, -other)
 
        return self
 

	
 

	
 
class RelatedPostings(Sequence[data.Posting]):
 
    """Collect and query related postings
 

	
 
    This class provides common functionality for collecting related postings
 
    and running queries on them: iterating over them, tallying their balance,
 
    etc.
 

	
 
    This class doesn't know anything about how the postings are related. That's
 
    entirely up to the caller.
 

	
 
    A common pattern is to use this class with collections.defaultdict
 
    to organize postings based on some key. See the group_by_meta classmethod
 
    for an example.
 
    """
 
    __slots__ = ('_postings',)
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Posting]=(),
 
                 *,
 
                 _can_own: bool=False,
 
    ) -> None:
 
        self._postings: List[data.Posting]
 
        if _can_own and isinstance(source, list):
 
            self._postings = source
 
        else:
 
            self._postings = list(source)
 

	
 
    @classmethod
 
    def _group_by(cls: Type[RelatedType],
 
                  postings: Iterable[data.Posting],
 
                  key: Callable[[data.Posting], T],
 
    ) -> Iterator[Tuple[T, RelatedType]]:
 
        mapping: Dict[T, List[data.Posting]] = collections.defaultdict(list)
 
        for post in postings:
 
            mapping[key(post)].append(post)
 
        for value, posts in mapping.items():
 
            yield value, cls(posts, _can_own=True)
 

	
 
    @classmethod
 
    def group_by_account(cls: Type[RelatedType],
 
                         postings: Iterable[data.Posting],
 
    ) -> Iterator[Tuple[data.Account, RelatedType]]:
 
        return cls._group_by(postings, operator.attrgetter('account'))
 

	
 
    @classmethod
 
    def group_by_meta(cls: Type[RelatedType],
 
                      postings: Iterable[data.Posting],
 
                      key: MetaKey,
 
                      default: Optional[MetaValue]=None,
 
    ) -> Iterator[Tuple[Optional[MetaValue], RelatedType]]:
 
        """Relate postings by metadata value
 

	
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of post.meta.get(key, default).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same metadata value.
 
        """
 
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 
            return post.meta.get(key, default)
 
        return cls._group_by(postings, key_func)
 

	
 
    @classmethod
 
    def group_by_first_meta_link(
 
            cls: Type[RelatedType],
 
            postings: Iterable[data.Posting],
 
            key: MetaKey,
 
    ) -> Iterator[Tuple[Optional[str], RelatedType]]:
 
        """Relate postings by the first link in metadata
 

	
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of
 
        post.meta.first_link(key, None).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same first metadata link.
 
        """
 
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 
            return post.meta.first_link(key, None)
 
        return cls._group_by(postings, key_func)
 

	
 
    def __repr__(self) -> str:
 
        return f'<{type(self).__name__} {self._postings!r}>'
 

	
tests/test_reports_balance.py
Show inline comments
 
"""test_reports_balance - Unit tests for reports.core.Balance"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import itertools
 

	
 
from decimal import Decimal
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
import babel.numbers
 

	
 
from conservancy_beancount.reports import core
 

	
 
DEFAULT_STRINGS = [
 
    ({}, "Zero balance"),
 
    ({'JPY': 0, 'BRL': 0}, "Zero balance"),
 
    ({'JPY': 0, 'BRL': 0}, "0.00 BRL, 0 JPY"),
 
    ({'USD': '20.00'}, "20.00 USD"),
 
    ({'EUR': '50.00', 'GBP': '80.00'}, "80.00 GBP, 50.00 EUR"),
 
    ({'JPY': '-5500.00', 'BRL': '-8500.00'}, "-8,500.00 BRL, -5,500 JPY"),
 
    ({'USD': 10, 'EUR': '.00015'}, "10.00 USD"),
 
    ({'JPY': '-.00015'}, "Zero balance"),
 
    ({'JPY': '-.00015'}, "-0 JPY"),
 
]
 

	
 
TOLERANCES = [Decimal(n) for n in ['.1', '.01', '.001', 0]]
 

	
 
def amounts_from_map(currency_map):
 
    for code, number in currency_map.items():
 
        yield testutil.Amount(number, code)
 

	
 
def test_empty_balance():
 
    balance = core.Balance()
 
    assert not balance
 
    assert len(balance) == 0
 
    assert balance.is_zero()
 
    with pytest.raises(KeyError):
 
        balance['USD']
 

	
 
@pytest.mark.parametrize('currencies', [
 
    'USD',
 
    'EUR GBP',
 
    'JPY INR BRL',
 
])
 
def test_zero_balance(currencies):
 
    keys = currencies.split()
 
    balance = core.Balance(testutil.Amount(0, key) for key in keys)
 
    assert balance
 
    assert len(balance) == len(keys)
 
    assert balance.is_zero()
 
    assert all(balance[key].number == 0 for key in keys)
 
    assert all(balance[key].currency == key for key in keys)
 

	
 
@pytest.mark.parametrize('currencies', [
 
    'USD',
 
    'EUR GBP',
 
    'JPY INR BRL',
 
])
 
def test_nonzero_balance(currencies):
 
    amounts = dict(zip(currencies.split(), itertools.count(110, 100)))
 
    balance = core.Balance(amounts_from_map(amounts))
 
    assert balance
 
    assert len(balance) == len(amounts)
 
    assert not balance.is_zero()
 
    assert all(balance[key] == testutil.Amount(amt, key) for key, amt in amounts.items())
 

	
 
def test_mixed_balance():
 
    amounts = {'USD': 0, 'EUR': 120}
 
    balance = core.Balance(amounts_from_map(amounts))
 
    assert balance
 
    assert len(balance) == 2
 
    assert not balance.is_zero()
 
    assert all(balance[key] == testutil.Amount(amt, key) for key, amt in amounts.items())
 

	
 
def test_init_recurring_currency():
 
    balance = core.Balance([
 
        testutil.Amount(20),
 
        testutil.Amount(40),
 
        testutil.Amount(60, 'EUR'),
 
        testutil.Amount(-80),
 
    ])
 
    assert balance
 
    assert balance['EUR'] == testutil.Amount(60, 'EUR')
 
    assert balance['USD'] == testutil.Amount(-20)
 

	
 
def test_init_zeroed_out():
 
    balance = core.Balance([
 
        testutil.Amount(25),
 
        testutil.Amount(40, 'EUR'),
 
        testutil.Amount(-25),
 
        testutil.Amount(-40, 'EUR'),
 
    ])
 
    assert balance.is_zero()
 

	
 
@pytest.mark.parametrize('mapping,expected', [
 
    ({}, True),
 
    ({'USD': 0}, True),
 
    ({'USD': 0, 'EUR': 0}, True),
 
    ({'USD': -10, 'EUR': 0}, False),
 
    ({'EUR': -10}, False),
 
    ({'USD': -10, 'EUR': -20}, False),
 
    ({'USD': 10, 'EUR': -20}, False),
 
    ({'JPY': 10}, False),
 
    ({'JPY': 10, 'BRL': 0}, False),
 
    ({'JPY': 10, 'BRL': 20}, False),
 
    ({'USD': '0.00015'}, True),
 
    ({'EUR': '-0.00052'}, True),
 
])
 
def test_eq_zero(mapping, expected):
 
    balance = core.Balance(amounts_from_map(mapping))
 
    assert balance.eq_zero() == expected
 
    assert balance.is_zero() == expected
 

	
 
@pytest.mark.parametrize('mapping,expected', [
 
    ({}, True),
 
    ({'USD': 0}, True),
 
    ({'USD': 0, 'EUR': 0}, True),
 
    ({'EUR': -10}, False),
 
    ({'USD': 10, 'EUR': -20}, False),
...
 
@@ -292,148 +292,160 @@ def test_sub_amount(number, currency):
 

	
 
@pytest.mark.parametrize('number,currency', {
 
    (50, 'USD'),
 
    (-50, 'USD'),
 
    (50000, 'BRL'),
 
    (-4000, 'BRL'),
 
})
 
def test_isub_amount(number, currency):
 
    balance = core.MutableBalance([testutil.Amount(500, 'USD')])
 
    sub_amount = testutil.Amount(number, currency)
 
    balance -= sub_amount
 
    if currency == 'USD':
 
        assert len(balance) == 1
 
        assert balance['USD'] == testutil.Amount(500 - number)
 
    else:
 
        assert len(balance) == 2
 
        assert balance['USD'] == testutil.Amount(500)
 
        assert balance[currency] == -sub_amount
 

	
 
@pytest.mark.parametrize('mapping', [
 
    {},
 
    {'USD': 0},
 
    {'EUR': 10},
 
    {'JPY': 20, 'BRL': 30},
 
    {'EUR': -15},
 
    {'JPY': -25, 'BRL': -35},
 
    {'JPY': 40, 'USD': 0, 'EUR': -50},
 
])
 
def test_add_balance(mapping):
 
    expect_numbers = {'USD': 500, 'BRL': 40000}
 
    start_bal = core.Balance(amounts_from_map(expect_numbers))
 
    for code, number in mapping.items():
 
        expect_numbers[code] = expect_numbers.get(code, 0) + number
 
    add_bal = core.Balance(amounts_from_map(mapping))
 
    actual = start_bal + add_bal
 
    expected = core.Balance(amounts_from_map(expect_numbers))
 
    assert actual == expected
 

	
 
@pytest.mark.parametrize('mapping', [
 
    {},
 
    {'USD': 0},
 
    {'EUR': 10},
 
    {'JPY': 20, 'BRL': 30},
 
    {'EUR': -15},
 
    {'JPY': -25, 'BRL': -35},
 
    {'JPY': 40, 'USD': 0, 'EUR': -50},
 
])
 
def test_iadd_balance(mapping):
 
    expect_numbers = {'USD': 500, 'BRL': 40000}
 
    balance = core.MutableBalance(amounts_from_map(expect_numbers))
 
    for code, number in mapping.items():
 
        expect_numbers[code] = expect_numbers.get(code, 0) + number
 
    balance += core.Balance(amounts_from_map(mapping))
 
    expected = core.Balance(amounts_from_map(expect_numbers))
 
    assert balance == expected
 

	
 
def test_copy():
 
    amounts = frozenset(amounts_from_map({'USD': 10, 'EUR': '.001'}))
 
    # Use a ridiculous tolerance to test it doesn't matter.
 
    actual = core.Balance(amounts, 100).copy()
 
    assert frozenset(actual.values()) == amounts
 

	
 
@pytest.mark.parametrize('tolerance', TOLERANCES)
 
def test_clean_copy(tolerance):
 
    usd = testutil.Amount(10)
 
    eur = testutil.Amount('.002', 'EUR')
 
    actual = core.Balance([usd, eur], tolerance).clean_copy()
 
    if tolerance < eur.number:
 
        expected = {usd, eur}
 
    else:
 
        expected = {usd}
 
    assert frozenset(actual.values()) == expected
 

	
 
@pytest.mark.parametrize('tolerance', TOLERANCES)
 
def test_clean_copy_arg(tolerance):
 
    usd = testutil.Amount(10)
 
    eur = testutil.Amount('.002', 'EUR')
 
    actual = core.Balance([usd, eur], 0).clean_copy(tolerance)
 
    if tolerance < eur.number:
 
        expected = {usd, eur}
 
    else:
 
        expected = {usd}
 
    assert frozenset(actual.values()) == expected
 

	
 
@pytest.mark.parametrize('mapping,expected', DEFAULT_STRINGS)
 
def test_str(mapping, expected):
 
    balance = core.Balance(amounts_from_map(mapping))
 
    assert str(balance) == expected
 

	
 
@pytest.mark.parametrize('mapping,expected', DEFAULT_STRINGS)
 
def test_format_defaults(mapping, expected):
 
    balance = core.Balance(amounts_from_map(mapping))
 
    assert balance.format() == expected
 

	
 
@pytest.mark.parametrize('fmt,expected', [
 
    ('¤##0.0', '¥5000, -€1500.00'),
 
    ('#,#00.0¤¤', '5,000JPY, -1,500.00EUR'),
 
    ('#,##0.0¤¤', '5,000JPY, -1,500.00EUR'),
 
    ('¤+##0.0;¤-##0.0', '¥+5000, €-1500.00'),
 
    ('#,#00.0 ¤¤;(#,#00.0 ¤¤)', '5,000 JPY, (1,500.00 EUR)'),
 
    ('#,##0.0 ¤¤;(#,##0.0 ¤¤)', '5,000 JPY, (1,500.00 EUR)'),
 
])
 
def test_format_fmt(fmt, expected):
 
    amounts = [testutil.Amount(5000, 'JPY'), testutil.Amount(-1500, 'EUR')]
 
    balance = core.Balance(amounts)
 
    assert balance.format(fmt) == expected
 

	
 
@pytest.mark.parametrize('sep', [
 
    '; ',
 
    '—',
 
    '\0',
 
])
 
def test_format_sep(sep):
 
    mapping, expected = DEFAULT_STRINGS[-1]
 
    expected = expected.replace(', ', sep)
 
    balance = core.Balance(amounts_from_map(mapping))
 
    assert balance.format(sep=sep) == expected
 

	
 
def test_format_none():
 
    args = (65000, 'BRL')
 
    balance = core.Balance([testutil.Amount(*args)])
 
    expected = babel.numbers.format_currency(*args)
 
    assert balance.format(None) == expected
 

	
 
@pytest.mark.parametrize('empty', [
 
    "N/A",
 
    "Zero",
 
    "ø",
 
])
 
def test_format_empty(empty):
 
    balance = core.Balance()
 
    assert balance.format(empty=empty) == empty
 

	
 
@pytest.mark.parametrize('tolerance', TOLERANCES)
 
def test_str_tolerance(tolerance):
 
    chf = testutil.Amount('.005', 'CHF')
 
    actual = str(core.Balance([chf], tolerance))
 
    if tolerance > chf.number:
 
        assert actual == "Zero balance"
 
    else:
 
        assert actual == "00.00 CHF"
 
@pytest.mark.parametrize('currency,fmt', itertools.product(
 
    ['USD', 'JPY', 'BRL'],
 
    [None, '¤#,##0.00', '###0.00 ¤¤'],
 
))
 
def test_format_zero_balance_fmt(currency, fmt):
 
    zero_amt = testutil.Amount(0, currency)
 
    nonzero_amt = testutil.Amount(9, currency)
 
    zero_bal = core.Balance([zero_amt])
 
    nonzero_bal = core.Balance([nonzero_amt])
 
    expected = nonzero_bal.format(fmt).replace('9', '0')
 
    assert zero_bal.format(fmt) == expected
 

	
 
@pytest.mark.parametrize('currency,fmt', testutil.combine_values(
 
    ['USD', 'JPY', 'BRL'],
 
    ["N/A", "Zero", "ø"],
 
))
 
def test_format_zero_balance_zero_str(currency, fmt):
 
    zero_amt = testutil.Amount(0, currency)
 
    zero_bal = core.Balance([zero_amt])
 
    assert zero_bal.format(zero=fmt) == fmt
 

	
 
@pytest.mark.parametrize('tolerance', TOLERANCES)
 
def test_format_tolerance(tolerance):
 
def test_format_zero_balance_with_tolerance(tolerance):
 
    chf = testutil.Amount('.005', 'CHF')
 
    actual = core.Balance([chf]).format(tolerance=tolerance)
 
    actual = core.Balance([chf]).format(zero="ø", tolerance=tolerance)
 
    if tolerance > chf.number:
 
        assert actual == "Zero balance"
 
        assert actual == "ø"
 
    else:
 
        assert actual == "00.00 CHF"
 
        assert actual == "0.00 CHF"
0 comments (0 inline, 0 general)