Changeset - 7f3a26b5557d
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-06-18 18:07:44
brettcsmith@brettcsmith.org
reports: Balance.format() accepts zero argument.

This change has the same motivation as the recent change to
BaseODS.balance_cell(): try to preserve currency information when it's
available.
3 files changed with 43 insertions and 23 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -418,100 +418,101 @@ class AgingReport(BaseReport):
 
                 out_file: BinaryIO,
 
                 date: Optional[datetime.date]=None,
 
    ) -> None:
 
        if date is None:
 
            date = datetime.date.today()
 
        self.out_bin = out_file
 
        self.logger = logger.getChild(type(self).__name__)
 
        self.ods = AgingODS(rt_wrapper, date, self.logger)
 

	
 
    def run(self, groups: PostGroups) -> None:
 
        rows: List[AccrualPostings] = []
 
        for group in groups.values():
 
            if group.is_zero():
 
                # Cheap optimization: don't slice and dice groups we're not
 
                # going to report anyway.
 
                continue
 
            elif group.accrual_type is None:
 
                group = group.since_last_nonzero()
 
            else:
 
                # Filter out new accruals after the report date.
 
                # e.g., cover the case that the same invoices has multiple
 
                # postings over time, and we don't want to report too-recent
 
                # ones.
 
                cutoff_date = self.ods.date - datetime.timedelta(
 
                    days=group.accrual_type.value.aging_thresholds[-1],
 
                )
 
                group = AccrualPostings(
 
                    post for post in group.since_last_nonzero()
 
                    if post.meta.date <= cutoff_date
 
                    or group.accrual_type.normalize_amount(post.units.number) < 0
 
                )
 
            if group and not group.is_zero():
 
                rows.append(group)
 
        rows.sort(key=lambda related: (
 
            related.account,
 
            related[0].meta.date,
 
            ('\0'.join(related.entities())
 
             if related.entity is related.INCONSISTENT
 
             else related.entity),
 
        ))
 
        self.ods.write(rows)
 
        self.ods.save_file(self.out_bin)
 

	
 

	
 
class BalanceReport(BaseReport):
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        posts = posts.since_last_nonzero()
 
        date_s = posts[0].meta.date.strftime('%Y-%m-%d')
 
        balance_s = posts.balance_at_cost().format(zero="Zero balance")
 
        if index:
 
            yield ""
 
        yield f"{posts.invoice}:"
 
        yield f"  {posts.balance_at_cost()} outstanding since {date_s}"
 
        yield f"  {balance_s} outstanding since {date_s}"
 

	
 

	
 
class OutgoingReport(BaseReport):
 
    def __init__(self, rt_wrapper: rtutil.RT, out_file: TextIO) -> None:
 
        super().__init__(out_file)
 
        self.rt_wrapper = rt_wrapper
 
        self.rt_client = rt_wrapper.rt
 

	
 
    def _primary_rt_id(self, posts: AccrualPostings) -> rtutil.TicketAttachmentIds:
 
        rt_id = posts.rt_id
 
        if rt_id is None:
 
            raise ValueError("no rt-id links found")
 
        elif isinstance(rt_id, Sentinel):
 
            raise ValueError("multiple rt-id links found")
 
        parsed = rtutil.RT.parse(rt_id)
 
        if parsed is None:
 
            raise ValueError("rt-id is not a valid RT reference")
 
        else:
 
            return parsed
 

	
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        posts = posts.since_last_nonzero()
 
        try:
 
            ticket_id, _ = self._primary_rt_id(posts)
 
            ticket = self.rt_client.get_ticket(ticket_id)
 
            # Note we only use this when ticket is None.
 
            errmsg = f"ticket {ticket_id} not found"
 
        except (ValueError, rt.RtError) as error:
 
            ticket = None
 
            errmsg = error.args[0]
 
        if ticket is None:
 
            self.logger.error(
 
                "can't generate outgoings report for %s because no RT ticket available: %s",
 
                posts.invoice, errmsg,
 
            )
 
            return
 

	
 
        try:
 
            rt_requestor = self.rt_client.get_user(ticket['Requestors'][0])
 
        except (IndexError, rt.RtError):
 
            rt_requestor = None
 
        if rt_requestor is None:
 
            requestor = ''
 
            requestor_name = ''
 
        else:
 
            requestor_name = (
 
                rt_requestor.get('RealName')
 
                or ticket.get('CF.{payment-to}')
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -157,114 +157,121 @@ class Balance(Mapping[str, data.Amount]):
 

	
 
    def __iter__(self) -> Iterator[str]:
 
        return iter(self._currency_map)
 

	
 
    def __len__(self) -> int:
 
        return len(self._currency_map)
 

	
 
    def _all_amounts(self,
 
                     op_func: Callable[[DecimalCompat, DecimalCompat], bool],
 
                     operand: DecimalCompat,
 
    ) -> bool:
 
        return all(op_func(amt.number, operand) for amt in self.values())
 

	
 
    def copy(self: BalanceType, tolerance: Optional[Decimal]=None) -> BalanceType:
 
        if tolerance is None:
 
            tolerance = self.tolerance
 
        return type(self)(self.values(), tolerance)
 

	
 
    def clean_copy(self: BalanceType, tolerance: Optional[Decimal]=None) -> BalanceType:
 
        if tolerance is None:
 
            tolerance = self.tolerance
 
        return type(self)(
 
            (amount for amount in self.values() if abs(amount.number) >= tolerance),
 
            tolerance,
 
        )
 

	
 
    @staticmethod
 
    def within_tolerance(dec: DecimalCompat, tolerance: DecimalCompat) -> bool:
 
        dec = cast(Decimal, dec)
 
        return abs(dec) < tolerance
 

	
 
    def eq_zero(self) -> bool:
 
        """Returns true if all amounts in the balance == 0, within tolerance."""
 
        return self._all_amounts(self.within_tolerance, self.tolerance)
 

	
 
    is_zero = eq_zero
 

	
 
    def ge_zero(self) -> bool:
 
        """Returns true if all amounts in the balance >= 0, within tolerance."""
 
        op_func = operator.gt if self.tolerance else operator.ge
 
        return self._all_amounts(op_func, -self.tolerance)
 

	
 
    def le_zero(self) -> bool:
 
        """Returns true if all amounts in the balance <= 0, within tolerance."""
 
        op_func = operator.lt if self.tolerance else operator.le
 
        return self._all_amounts(op_func, self.tolerance)
 

	
 
    def format(self,
 
               fmt: Optional[str]='#,#00.00 ¤¤',
 
               fmt: Optional[str]='#,##0.00 ¤¤',
 
               sep: str=', ',
 
               empty: str="Zero balance",
 
               zero: Optional[str]=None,
 
               tolerance: Optional[Decimal]=None,
 
    ) -> str:
 
        """Formats the balance as a string with the given parameters
 

	
 
        If the balance is zero (within tolerance), returns ``empty``.
 
        Otherwise, returns a string with each amount in the balance formatted
 
        If the balance is completely empty, return ``empty``.
 
        If the balance is zero (within tolerance) and ``zero`` is specified,
 
        return ``zero``.
 
        Otherwise, return a string with each amount in the balance formatted
 
        as ``fmt``, separated by ``sep``.
 

	
 
        If you set ``fmt`` to None, amounts will be formatted according to the
 
        user's locale. The default format is Beancount's input format.
 
        """
 
        amounts = list(self.clean_copy(tolerance).values())
 
        if not amounts:
 
        balance = self.clean_copy(tolerance) or self.copy(tolerance)
 
        if not balance:
 
            return empty
 
        amounts.sort(key=lambda amt: abs(amt.number), reverse=True)
 
        elif zero is not None and balance.is_zero():
 
            return zero
 
        else:
 
            amounts = list(balance.values())
 
            amounts.sort(key=lambda amt: (-abs(amt.number), amt.currency))
 
            return sep.join(
 
                babel.numbers.format_currency(amt.number, amt.currency, fmt)
 
                for amt in amounts
 
            )
 

	
 

	
 
class MutableBalance(Balance):
 
    __slots__ = ()
 

	
 
    def __iadd__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
 
        self._add_other(self._currency_map, other)
 
        return self
 

	
 
    def __isub__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
 
        self._add_other(self._currency_map, -other)
 
        return self
 

	
 

	
 
class RelatedPostings(Sequence[data.Posting]):
 
    """Collect and query related postings
 

	
 
    This class provides common functionality for collecting related postings
 
    and running queries on them: iterating over them, tallying their balance,
 
    etc.
 

	
 
    This class doesn't know anything about how the postings are related. That's
 
    entirely up to the caller.
 

	
 
    A common pattern is to use this class with collections.defaultdict
 
    to organize postings based on some key. See the group_by_meta classmethod
 
    for an example.
 
    """
 
    __slots__ = ('_postings',)
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Posting]=(),
 
                 *,
 
                 _can_own: bool=False,
 
    ) -> None:
 
        self._postings: List[data.Posting]
 
        if _can_own and isinstance(source, list):
 
            self._postings = source
 
        else:
 
            self._postings = list(source)
 

	
 
    @classmethod
 
    def _group_by(cls: Type[RelatedType],
 
                  postings: Iterable[data.Posting],
tests/test_reports_balance.py
Show inline comments
 
"""test_reports_balance - Unit tests for reports.core.Balance"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import itertools
 

	
 
from decimal import Decimal
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
import babel.numbers
 

	
 
from conservancy_beancount.reports import core
 

	
 
DEFAULT_STRINGS = [
 
    ({}, "Zero balance"),
 
    ({'JPY': 0, 'BRL': 0}, "Zero balance"),
 
    ({'JPY': 0, 'BRL': 0}, "0.00 BRL, 0 JPY"),
 
    ({'USD': '20.00'}, "20.00 USD"),
 
    ({'EUR': '50.00', 'GBP': '80.00'}, "80.00 GBP, 50.00 EUR"),
 
    ({'JPY': '-5500.00', 'BRL': '-8500.00'}, "-8,500.00 BRL, -5,500 JPY"),
 
    ({'USD': 10, 'EUR': '.00015'}, "10.00 USD"),
 
    ({'JPY': '-.00015'}, "Zero balance"),
 
    ({'JPY': '-.00015'}, "-0 JPY"),
 
]
 

	
 
TOLERANCES = [Decimal(n) for n in ['.1', '.01', '.001', 0]]
 

	
 
def amounts_from_map(currency_map):
 
    for code, number in currency_map.items():
 
        yield testutil.Amount(number, code)
 

	
 
def test_empty_balance():
 
    balance = core.Balance()
 
    assert not balance
 
    assert len(balance) == 0
 
    assert balance.is_zero()
 
    with pytest.raises(KeyError):
 
        balance['USD']
 

	
 
@pytest.mark.parametrize('currencies', [
 
    'USD',
 
    'EUR GBP',
 
    'JPY INR BRL',
 
])
 
def test_zero_balance(currencies):
 
    keys = currencies.split()
 
    balance = core.Balance(testutil.Amount(0, key) for key in keys)
 
    assert balance
 
    assert len(balance) == len(keys)
 
    assert balance.is_zero()
 
    assert all(balance[key].number == 0 for key in keys)
 
    assert all(balance[key].currency == key for key in keys)
 

	
 
@pytest.mark.parametrize('currencies', [
 
    'USD',
 
    'EUR GBP',
 
    'JPY INR BRL',
 
])
 
def test_nonzero_balance(currencies):
 
    amounts = dict(zip(currencies.split(), itertools.count(110, 100)))
 
    balance = core.Balance(amounts_from_map(amounts))
 
    assert balance
 
    assert len(balance) == len(amounts)
 
    assert not balance.is_zero()
 
    assert all(balance[key] == testutil.Amount(amt, key) for key, amt in amounts.items())
 

	
 
def test_mixed_balance():
 
    amounts = {'USD': 0, 'EUR': 120}
 
    balance = core.Balance(amounts_from_map(amounts))
 
    assert balance
 
    assert len(balance) == 2
...
 
@@ -340,100 +340,112 @@ def test_iadd_balance(mapping):
 
    expect_numbers = {'USD': 500, 'BRL': 40000}
 
    balance = core.MutableBalance(amounts_from_map(expect_numbers))
 
    for code, number in mapping.items():
 
        expect_numbers[code] = expect_numbers.get(code, 0) + number
 
    balance += core.Balance(amounts_from_map(mapping))
 
    expected = core.Balance(amounts_from_map(expect_numbers))
 
    assert balance == expected
 

	
 
def test_copy():
 
    amounts = frozenset(amounts_from_map({'USD': 10, 'EUR': '.001'}))
 
    # Use a ridiculous tolerance to test it doesn't matter.
 
    actual = core.Balance(amounts, 100).copy()
 
    assert frozenset(actual.values()) == amounts
 

	
 
@pytest.mark.parametrize('tolerance', TOLERANCES)
 
def test_clean_copy(tolerance):
 
    usd = testutil.Amount(10)
 
    eur = testutil.Amount('.002', 'EUR')
 
    actual = core.Balance([usd, eur], tolerance).clean_copy()
 
    if tolerance < eur.number:
 
        expected = {usd, eur}
 
    else:
 
        expected = {usd}
 
    assert frozenset(actual.values()) == expected
 

	
 
@pytest.mark.parametrize('tolerance', TOLERANCES)
 
def test_clean_copy_arg(tolerance):
 
    usd = testutil.Amount(10)
 
    eur = testutil.Amount('.002', 'EUR')
 
    actual = core.Balance([usd, eur], 0).clean_copy(tolerance)
 
    if tolerance < eur.number:
 
        expected = {usd, eur}
 
    else:
 
        expected = {usd}
 
    assert frozenset(actual.values()) == expected
 

	
 
@pytest.mark.parametrize('mapping,expected', DEFAULT_STRINGS)
 
def test_str(mapping, expected):
 
    balance = core.Balance(amounts_from_map(mapping))
 
    assert str(balance) == expected
 

	
 
@pytest.mark.parametrize('mapping,expected', DEFAULT_STRINGS)
 
def test_format_defaults(mapping, expected):
 
    balance = core.Balance(amounts_from_map(mapping))
 
    assert balance.format() == expected
 

	
 
@pytest.mark.parametrize('fmt,expected', [
 
    ('¤##0.0', '¥5000, -€1500.00'),
 
    ('#,#00.0¤¤', '5,000JPY, -1,500.00EUR'),
 
    ('#,##0.0¤¤', '5,000JPY, -1,500.00EUR'),
 
    ('¤+##0.0;¤-##0.0', '¥+5000, €-1500.00'),
 
    ('#,#00.0 ¤¤;(#,#00.0 ¤¤)', '5,000 JPY, (1,500.00 EUR)'),
 
    ('#,##0.0 ¤¤;(#,##0.0 ¤¤)', '5,000 JPY, (1,500.00 EUR)'),
 
])
 
def test_format_fmt(fmt, expected):
 
    amounts = [testutil.Amount(5000, 'JPY'), testutil.Amount(-1500, 'EUR')]
 
    balance = core.Balance(amounts)
 
    assert balance.format(fmt) == expected
 

	
 
@pytest.mark.parametrize('sep', [
 
    '; ',
 
    '—',
 
    '\0',
 
])
 
def test_format_sep(sep):
 
    mapping, expected = DEFAULT_STRINGS[-1]
 
    expected = expected.replace(', ', sep)
 
    balance = core.Balance(amounts_from_map(mapping))
 
    assert balance.format(sep=sep) == expected
 

	
 
def test_format_none():
 
    args = (65000, 'BRL')
 
    balance = core.Balance([testutil.Amount(*args)])
 
    expected = babel.numbers.format_currency(*args)
 
    assert balance.format(None) == expected
 

	
 
@pytest.mark.parametrize('empty', [
 
    "N/A",
 
    "Zero",
 
    "ø",
 
])
 
def test_format_empty(empty):
 
    balance = core.Balance()
 
    assert balance.format(empty=empty) == empty
 

	
 
@pytest.mark.parametrize('tolerance', TOLERANCES)
 
def test_str_tolerance(tolerance):
 
    chf = testutil.Amount('.005', 'CHF')
 
    actual = str(core.Balance([chf], tolerance))
 
    if tolerance > chf.number:
 
        assert actual == "Zero balance"
 
    else:
 
        assert actual == "00.00 CHF"
 
@pytest.mark.parametrize('currency,fmt', itertools.product(
 
    ['USD', 'JPY', 'BRL'],
 
    [None, '¤#,##0.00', '###0.00 ¤¤'],
 
))
 
def test_format_zero_balance_fmt(currency, fmt):
 
    zero_amt = testutil.Amount(0, currency)
 
    nonzero_amt = testutil.Amount(9, currency)
 
    zero_bal = core.Balance([zero_amt])
 
    nonzero_bal = core.Balance([nonzero_amt])
 
    expected = nonzero_bal.format(fmt).replace('9', '0')
 
    assert zero_bal.format(fmt) == expected
 

	
 
@pytest.mark.parametrize('currency,fmt', testutil.combine_values(
 
    ['USD', 'JPY', 'BRL'],
 
    ["N/A", "Zero", "ø"],
 
))
 
def test_format_zero_balance_zero_str(currency, fmt):
 
    zero_amt = testutil.Amount(0, currency)
 
    zero_bal = core.Balance([zero_amt])
 
    assert zero_bal.format(zero=fmt) == fmt
 

	
 
@pytest.mark.parametrize('tolerance', TOLERANCES)
 
def test_format_tolerance(tolerance):
 
def test_format_zero_balance_with_tolerance(tolerance):
 
    chf = testutil.Amount('.005', 'CHF')
 
    actual = core.Balance([chf]).format(tolerance=tolerance)
 
    actual = core.Balance([chf]).format(zero="ø", tolerance=tolerance)
 
    if tolerance > chf.number:
 
        assert actual == "Zero balance"
 
        assert actual == "ø"
 
    else:
 
        assert actual == "00.00 CHF"
 
        assert actual == "0.00 CHF"
0 comments (0 inline, 0 general)