Brett Smith - 4 years ago 2020-08-17 14:34:38
reports: Balance tolerance can be an int.
""" - Common data classes for reporting functionality"""
# Copyright © 2020  Brett Smith
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <>.

import abc
import collections
import copy
import datetime
import enum
import itertools
import operator
import re
import shlex
import sys
import urllib.parse as urlparse

import babel.core  # type:ignore[import]
import babel.numbers  # type:ignore[import]

import git  # type:ignore[import]

import odf.config  # type:ignore[import]
import odf.element  # type:ignore[import]
import odf.meta  # type:ignore[import]
import odf.number  # type:ignore[import]
import odf.opendocument  # type:ignore[import]
import  # type:ignore[import]
import odf.table  # type:ignore[import]
import odf.text  # type:ignore[import]

from decimal import Decimal
from pathlib import Path

from beancount.core import amount as bc_amount
from odf.namespaces import TOOLSVERSION  # type:ignore[import]

from ..cliutil import VERSION
from .. import data
from .. import filters
from .. import rtutil

from typing import (
from ..beancount_types import (


DecimalCompat = data.DecimalCompat
BalanceType = TypeVar('BalanceType', bound='Balance')
ElementType = Callable[..., odf.element.Element]
LinkType = Union[str, Tuple[str, Optional[str]]]
RelatedType = TypeVar('RelatedType', bound='RelatedPostings')
RT = TypeVar('RT', bound=Sequence)
ST = TypeVar('ST')
T = TypeVar('T')

class Balance(Mapping[str, data.Amount]):
    """A collection of amounts mapped by currency

    Each key is a Beancount currency string, and each value represents the
    balance in that currency.
    __slots__ = ('_currency_map', 'tolerance')
    TOLERANCE = Decimal('0.01')

    def __init__(self,
                 source: Iterable[data.Amount]=(),
                 tolerance: Optional[DecimalCompat]=None,
    ) -> None:
        if tolerance is None:
            tolerance = self.TOLERANCE
        self.tolerance = tolerance
        self._currency_map: Dict[str, data.Amount] = {}
        for amount in source:
            self._add_amount(self._currency_map, amount)

    def _add_amount(self,
                    currency_map: MutableMapping[str, data.Amount],
                    amount: data.Amount,
    ) -> None:
        code = amount.currency
            current_number = currency_map[code].number
        except KeyError:
            current_number = Decimal(0)
        currency_map[code] = data.Amount(current_number + amount.number, code)

    def _add_other(self,
                   currency_map: MutableMapping[str, data.Amount],
                   other: Union[data.Amount, 'Balance'],
    ) -> None:
        if isinstance(other, Balance):
            for amount in other.values():
                self._add_amount(currency_map, amount)
            self._add_amount(currency_map, other)

    def __repr__(self) -> str:
        values = [repr(amt) for amt in self.values()]
        return f"{type(self).__name__}({values!r})"

    def __str__(self) -> str:
        return self.format()

    def __abs__(self: BalanceType) -> BalanceType:
        return type(self)(bc_amount.abs(amt) for amt in self.values())

    def __add__(self: BalanceType, other: Union[data.Amount, 'Balance']) -> BalanceType:
        retval_map = self._currency_map.copy()
        self._add_other(retval_map, other)
        return type(self)(retval_map.values())

    def __sub__(self: BalanceType, other: Union[data.Amount, 'Balance']) -> BalanceType:
        return self.__add__(-other)

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, Balance):
            clean_self = self.clean_copy()
            clean_other = other.clean_copy()
            return len(clean_self) == len(clean_other) and all(
                clean_self[key] == clean_other.get(key) for key in clean_self
            return super().__eq__(other)

    def __neg__(self: BalanceType) -> BalanceType:
        return type(self)(-amt for amt in self.values())

    def __pos__(self: BalanceType) -> BalanceType:
        return self

    def __getitem__(self, key: str) -> data.Amount:
        return self._currency_map[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._currency_map)

    def __len__(self) -> int:
        return len(self._currency_map)

    def _all_amounts(self,
                     op_func: Callable[[DecimalCompat, DecimalCompat], bool],
                     operand: DecimalCompat,
    ) -> bool:
        return all(op_func(amt.number, operand) for amt in self.values())

    def copy(self: BalanceType, tolerance: Optional[DecimalCompat]=None) -> BalanceType:
        if tolerance is None:
            tolerance = self.tolerance
        return type(self)(self.values(), tolerance)

    def clean_copy(self: BalanceType, tolerance: Optional[DecimalCompat]=None) -> BalanceType:
        if tolerance is None:
            tolerance = self.tolerance
        return type(self)(
            (amount for amount in self.values() if abs(amount.number) >= tolerance),

    def within_tolerance(dec: DecimalCompat, tolerance: DecimalCompat) -> bool:
        dec = cast(Decimal, dec)
        return abs(dec) < tolerance

    def eq_zero(self) -> bool:
        """Returns true if all amounts in the balance == 0, within tolerance."""
        return self._all_amounts(self.within_tolerance, self.tolerance)

    is_zero = eq_zero

    def ge_zero(self) -> bool:
        """Returns true if all amounts in the balance >= 0, within tolerance."""
        op_func = if self.tolerance else
        return self._all_amounts(op_func, -self.tolerance)

    def le_zero(self) -> bool:
        """Returns true if all amounts in the balance <= 0, within tolerance."""
        op_func = if self.tolerance else operator.le
        return self._all_amounts(op_func, self.tolerance)

    def format(self,
               fmt: Optional[str]='#,##0.00 ¤¤',
               sep: str=', ',
               empty: str="Zero balance",
               zero: Optional[str]=None,
               tolerance: Optional[DecimalCompat]=None,
    ) -> str:
        """Formats the balance as a string with the given parameters

        If the balance is completely empty, return ``empty``.
        If the balance is zero (within tolerance) and ``zero`` is specified,
        return ``zero``.
        Otherwise, return a string with each amount in the balance formatted
        as ``fmt``, separated by ``sep``.

        If you set ``fmt`` to None, amounts will be formatted according to the
        user's locale. The default format is Beancount's input format.
        balance = self.clean_copy(tolerance) or self.copy(tolerance)
        if not balance:
            return empty
        elif zero is not None and balance.is_zero():
            return zero
            amounts = list(balance.values())
            amounts.sort(key=lambda amt: (-abs(amt.number), amt.currency))
            return sep.join(
                    amt.number, amt.currency, fmt, format_type='accounting',
                ) for amt in amounts


class MutableBalance(Balance):
    __slots__ = ()

    def __iadd__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
        self._add_other(self._currency_map, other)
        return self

    def __isub__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
        self._add_other(self._currency_map, -other)
        return self


class RelatedPostings(Sequence[data.Posting]):
    """Collect and query related postings

    This class provides common functionality for collecting related postings
    and running queries on them: iterating over them, tallying their balance,

    This class doesn't know anything about how the postings are related. That's
    entirely up to the caller.

    A common pattern is to use this class with collections.defaultdict
    to organize postings based on some key. See the group_by_meta classmethod
    for an example.
    __slots__ = ('_postings',)

    def __init__(self,
                 source: Iterable[data.Posting]=(),
                 _can_own: bool=False,
    ) -> None:
        self._postings: Sequence[data.Posting]
        if _can_own and isinstance(source, Sequence):
            self._postings = source
            self._postings = list(source)

    def _group_by(cls: Type[RelatedType],
                  postings: Iterable[data.Posting],
                  key: Callable[[data.Posting], T],
    ) -> Iterator[Tuple[T, RelatedType]]:
        mapping: Dict[T, List[data.Posting]] = collections.defaultdict(list)
        for post in postings:
        for value, posts in mapping.items():
            yield value, cls(posts, _can_own=True)

    def group_by_account(cls: Type[RelatedType],
                         postings: Iterable[data.Posting],
    ) -> Iterator[Tuple[data.Account, RelatedType]]:
        return cls._group_by(postings, operator.attrgetter('account'))

    def group_by_meta(cls: Type[RelatedType],
                      postings: Iterable[data.Posting],
                      key: MetaKey,
                      default: Optional[MetaValue]=None,
    ) -> Iterator[Tuple[Optional[MetaValue], RelatedType]]:
        """Relate postings by metadata value

        This method takes an iterable of postings and returns a mapping.
        The keys of the mapping are the values of post.meta.get(key, default).
        The values are RelatedPostings instances that contain all the postings
        that had that same metadata value.
        def key_func(post: data.Posting) -> Optional[MetaValue]:
            return post.meta.get(key, default)
        return cls._group_by(postings, key_func)

    def group_by_first_meta_link(
            cls: Type[RelatedType],
            postings: Iterable[data.Posting],
            key: MetaKey,
    ) -> Iterator[Tuple[Optional[str], RelatedType]]:
        """Relate postings by the first link in metadata

        This method takes an iterable of postings and returns a mapping.
        The keys of the mapping are the values of
        post.meta.first_link(key, None).
        The values are RelatedPostings instances that contain all the postings
        that had that same first metadata link.
        def key_func(post: data.Posting) -> Optional[MetaValue]:
            return post.meta.first_link(key, None)
        return cls._group_by(postings, key_func)

    def __repr__(self) -> str:
        return f'<{type(self).__name__} {self._postings!r}>'

    def __getitem__(self: RelatedType, index: int) -> data.Posting: ...

    def __getitem__(self: RelatedType, s: slice) -> RelatedType: ...

    def __getitem__(self: RelatedType,
                    index: Union[int, slice],
    ) -> Union[data.Posting, RelatedType]:
        if isinstance(index, slice):
            return type(self)(self._postings[index], _can_own=True)
            return self._postings[index]

    def __len__(self) -> int:
        return len(self._postings)

    def all_meta_links(self, key: MetaKey) -> Iterator[str]:
        return filters.iter_unique(
            link for post in self for link in post.meta.report_links(key)

    def first_meta_links(self, key: MetaKey, default: str='') -> Iterator[str]: ...

    def first_meta_links(self, key: MetaKey, default: None) -> Iterator[Optional[str]]: ...

    def first_meta_links(self,
                         key: MetaKey,
                         default: Optional[str]='',
    ) -> Iterator[Optional[str]]:
        retval = filters.iter_unique(
            post.meta.first_link(key, default) for post in self
        if default == '':
            retval = (s for s in retval if s)
        return retval

    def iter_with_balance(self) -> Iterator[Tuple[data.Posting, Balance]]:
        balance = MutableBalance()
        for post in self:
            balance += post.units
            yield post, balance

    def balance(self) -> Balance:
        return Balance(post.units for post in self)

    def balance_at_cost(self) -> Balance:
        return Balance(post.at_cost() for post in self)

    def balance_at_cost_by_date(self, date: -> Balance:
        for index, post in enumerate(self):
            if >= date:
            index += 1
        return Balance(post.at_cost() for post in self._postings[:index])

    def meta_values(self,
                    key: MetaKey,
                    default: Optional[MetaValue]=None,
    ) -> Set[Optional[MetaValue]]:
        return {post.meta.get(key, default) for post in self}


class PeriodPostings(RelatedPostings):
    """Postings filtered and balanced over a date range

    Create a subclass with ``PeriodPostings.with_start_date(date)``.
    Note that there is no explicit stop date. The expectation is that the
    caller has already filtered out posts past the stop date from the input.

    Instances of that subclass will have three Balance attributes:

    * ``start_bal`` is the balance at cost of postings to your start date
    * ``period_bal`` is the balance at cost of postings from your start date
    * ``stop_bal`` is the balance at cost of all postings

    Use this subclass when your report includes a lot of balances over time to
    help you get the math right.
    __slots__ = (
    START_DATE =, 1, 1)

    def __init__(self,
                 source: Iterable[data.Posting]=(),
                 _can_own: bool=False,
    ) -> None:
        start_posts: List[data.Posting] = []
        period_posts: List[data.Posting] = []
        for post in source:
            if < self.START_DATE:
        super().__init__(period_posts, _can_own=True)
        self.start_bal = RelatedPostings(start_posts, _can_own=True).balance_at_cost()
        self.period_bal = self.balance_at_cost()
        self.stop_bal = self.start_bal + self.period_bal
        # Convenience aliases
        self.begin_bal = self.start_bal
        self.end_bal = self.stop_bal

    def with_start_date(cls: Type[RelatedType], start_date: -> Type[RelatedType]:
        name = f'BalancePostings{start_date.strftime("%Y%m%d")}'
        return type(name, (cls,), {'START_DATE': start_date})


class BaseSpreadsheet(Generic[RT, ST], metaclass=abc.ABCMeta):
    """Abstract base class to help write spreadsheets

    This class provides the very core logic to write an arbitrary set of data
    rows to arbitrary output. It calls hooks when it starts writing the
    spreadsheet, starts a new "section" of rows, ends a section, and ends the

    RT is the type of the input data rows. ST is the type of the section
    identifier that you create from each row. If you don't want to use the
    section logic at all, set ST to None and define section_key to return None.

    def section_key(self, row: RT) -> ST:
        """Return the section a row belongs to

        Given a data row, this method should return some identifier for the
        "section" the row belongs to. The write method uses this to
        determine when to call start_section and end_section.

        If your spreadsheet doesn't need sections, define this to return None.

    def write_row(self, row: RT) -> None:
        """Write a data row to the output spreadsheet

        This method is called once for each data row in the input.

    # The next four methods are all called by the write method when the name
    # says. You may override them to output headers or sums, record
    # state, etc. The default implementations are all noops.

    def start_spreadsheet(self) -> None:

    def start_section(self, key: ST) -> None:

    def end_section(self, key: ST) -> None:

    def end_spreadsheet(self) -> None:

    def write(self, rows: Iterable[RT]) -> None:
        prev_section: Optional[ST] = None
        for row in rows:
            section = self.section_key(row)
            if section != prev_section:
                if prev_section is not None:
                prev_section = section
            should_end = section is not None
        except NameError:
            should_end = False
        if should_end:


class Border(enum.IntFlag):
    TOP = 1
    RIGHT = 2
    BOTTOM = 4
    LEFT = 8
    # in CSS order, clockwise from top


class BaseODS(BaseSpreadsheet[RT, ST], metaclass=abc.ABCMeta):
    """Abstract base class to help write OpenDocument spreadsheets

    This class provides the very core logic to write an arbitrary set of data
    rows to an OpenDocument spreadsheet. It provides helper methods for
    building sheets, rows, and cells.

    See also the BaseSpreadsheet base class for additional documentation about
    methods you must and can define, the definition of RT and ST, etc.
    # Defined in the XSL spec, "Definitions of Units of Measure"
    MEASUREMENT_UNITS = frozenset([
    MEASUREMENT_RE = re.compile(

    def __init__(self, rt_wrapper: Optional[rtutil.RT]=None) -> None:
        self.rt_wrapper = rt_wrapper
        self.locale = babel.core.Locale.default('LC_MONETARY')
        self.currency_fmt_key = 'accounting'
        self._name_counter = itertools.count(1)
        self._style_cache: MutableMapping[str,] = {}
        self.document = odf.opendocument.OpenDocumentSpreadsheet()
        self.sheet = self.use_sheet("Report")

    ### Low-level document tree manipulation
    # The *intent* is that you only need to use these if you're adding new
    # methods to manipulate document settings or styles.

    def copy_element(self, elem: odf.element.Element) -> odf.element.Element:
        retval = odf.element.Element(
            orig_name = retval.getAttribute('name')
        except ValueError:
            orig_name = None
        if orig_name is not None:
            retval.setAttribute('name', f'{orig_name}{next(self._name_counter)}')
        for child in elem.childNodes:
            # Order is important: need to check the deepest subclasses first.
            if isinstance(child, odf.element.CDATASection):
            elif isinstance(child, odf.element.Text):
        return retval

    def ensure_child(self,
                     parent: odf.element.Element,
                     child_type: ElementType,
                     **kwargs: Any,
    ) -> odf.element.Element:
        new_child = child_type(**kwargs)
        found_child = self.find_child(parent, new_child)
        if found_child is None:
            return parent.lastChild
            return found_child

    def ensure_config_map_entry(self,
                                root: odf.element.Element,
                                map_name: str,
                                entry_name: str,
    ) -> odf.element.Element:
        """Return a ``ConfigItemMapEntry`` under ``root``

        This method ensures there's a ``ConfigItemMapNamed`` named ``map_name``
        under ``root``, and a ``ConfigItemMapEntry`` named ``entry_name`` under
        that. Return the ``ConfigItemMapEntry`` element.
        config_map = self.ensure_child(root, odf.config.ConfigItemMapNamed, name=map_name)
        return self.ensure_child(config_map, odf.config.ConfigItemMapEntry, name=entry_name)

    def find_child(self,
                   parent: odf.element.Element,
                   child: odf.element.Element,
    ) -> Optional[odf.element.Element]:
        attrs = {k: v for k, v in self.iter_attributes(child)}
        if not attrs:
            return None
        for elem in parent.childNodes:
            if (elem.qname == child.qname
                and all(elem.getAttribute(k) == v for k, v in attrs.items())):
                return elem
        return None

    def iter_attributes(self, elem: odf.element.Element) -> Iterator[Tuple[str, str]]:
        for (_, key), value in self.iter_qattributes(elem):
            yield key.lower().replace('-', ''), value

    def iter_qattributes(self, elem: odf.element.Element) -> Iterator[Tuple[Tuple[str, str], str]]:
        if elem.attributes:
            yield from elem.attributes.items()

    def replace_child(self,
                     parent: odf.element.Element,
                     child_type: ElementType,
                     **kwargs: Any,
    ) -> odf.element.Element:
        new_child = child_type(**kwargs)
        found_child = self.find_child(parent, new_child)
        parent.insertBefore(new_child, found_child)
        if found_child is not None:
        return new_child

    def set_config(self,
                   root: odf.element.Element,
                   name: str,
                   value: Union[bool, int, str],
                   config_type: Optional[str]=None,
    ) -> None:
        """Ensure ``root`` has a ``ConfigItem`` with the given name, type, and value"""
        value_s = str(value)
        if isinstance(value, bool):
            value_s = str(value).lower()
            default_type = 'boolean'
        elif isinstance(value, str):
            default_type = 'string'
        if config_type is None:
                config_type = default_type
            except NameError:
                raise ValueError(
                    f"need config_type for {type(value).__name__} value",
                ) from None
        item = self.replace_child(
            root, odf.config.ConfigItem, name=name, type=config_type,

    ### Styles

    def border_style(self,
                     edges: int,
                     width: str='1px',
                     style: str='solid',
                     color: str='#000000',
    ) ->
        flags = [edge for edge in Border if edges & edge]
        if not flags:
            raise ValueError(f"no valid edges in {edges!r}")
        border_attr = f'{width} {style} {color}'
        key = f'{",".join( for f in flags)} {border_attr}'
            retval = self._style_cache[key]
        except KeyError:
            props =
            for flag in flags:
                props.setAttribute(f'border{}', border_attr)
            retval =
            self._style_cache[key] = retval
        return retval

    def column_style(self, width: Union[float, str], **attrs: Any) ->
        if not isinstance(width, str) or (width and not width[-1].isalpha()):
            width = f'{width}in'
        match = self.MEASUREMENT_RE.fullmatch(width)
        if match is None:
            raise ValueError(f"invalid width {width!r}")
        width_float = float(
        if width_float <= 0:
            # Per the OpenDocument spec, column-width is a positiveLength.
            raise ValueError(f"width {width!r} must be positive")
        width = '{:.3g}{}'.format(width_float,
        retval = self.ensure_child(
            name=f'col_{width.replace(".", "_")}'
        retval.setAttribute('family', 'table-column')
        if retval.firstChild is None:
                columnwidth=width, **attrs
        return retval
