Brett Smith - 4 years ago 2020-06-15 14:16:34
data: Add Posting.at_cost() method.
@@ -155,315 +155,321 @@ class Account(str):

    def slice_parts(self,
                    start: Optional[Union[int, slice]]=None,
                    stop: Optional[int]=None,
    ) -> Sequence[str]:
        """Slice the account parts like they were a list

        Given a single index, return that part of the account name as a string.
        Otherwise, return a list of part names sliced according to the arguments.
        if start is None:
            part_slice = slice(None)
        elif isinstance(start, slice):
            part_slice = start
        elif stop is None:
            return self[self._find_part_slice(start)]
            part_slice = slice(start, stop)
        return self.split(self.SEP)[part_slice]

    def root_part(self, count: int=1) -> str:
        """Return the first part(s) of the account name as a string"""
            stop = self._find_part_slice(count - 1).stop
        except IndexError:
            return self
            return self[:stop]


class Amount(bc_amount.Amount):
    """Beancount amount after processing

    Beancount's native Amount class declares number to be Optional[Decimal],
    because the number is None when Beancount first parses a posting that does
    not have an amount, because the user wants it to be automatically balanced.

    As part of the loading process, Beancount replaces those None numbers
    with the calculated amount, so it will always be a Decimal. This class
    overrides the type declaration accordingly, so the type checker knows
    that our code doesn't have to consider the possibility that number is
    number: decimal.Decimal

    # beancount.core._Amount is the plain namedtuple.
    # beancore.core.Amount adds instance methods to it.
    # b.c.Amount.__New__ calls `b.c._Amount.__new__`, which confuses type
    # checking. See <>.
    # It works fine if you use super(), which is better practice anyway.
    # So we override __new__ just to call _Amount.__new__ this way.
    def __new__(cls, number: decimal.Decimal, currency: str) -> 'Amount':
        return super(bc_amount.Amount, Amount).__new__(cls, number, currency)


class Metadata(MutableMapping[MetaKey, MetaValue]):
    """Transaction or posting metadata

    This class wraps a Beancount metadata dictionary with additional methods
    for common parsing and query tasks.
    __slots__ = ('meta',)
    _HUMAN_NAMES: MutableMapping[MetaKey, str] = {
        # Initialize this dict with special cases.
        # We use it as a cache for other metadata names as they're queried.
        'check-id': 'Check Number',
        'paypal-id': 'PayPal ID',
        'rt-id': 'Ticket',

    def __init__(self, source: MutableMapping[MetaKey, MetaValue]) -> None:
        self.meta = source

    def __iter__(self) -> Iterator[MetaKey]:
        return iter(self.meta)

    def __len__(self) -> int:
        return len(self.meta)

    def __getitem__(self, key: MetaKey) -> MetaValue:
        return self.meta[key]

    def __setitem__(self, key: MetaKey, value: MetaValue) -> None:
        self.meta[key] = value

    def __delitem__(self, key: MetaKey) -> None:
        del self.meta[key]

    def get_links(self, key: MetaKey) -> Sequence[str]:
            value = self.meta[key]
        except KeyError:
            return ()
        if isinstance(value, str):
            return value.split()
            raise TypeError("{} metadata is a {}, not str".format(
                key, type(value).__name__,

    def first_link(self, key: MetaKey, default: None=None) -> Optional[str]: ...

    def first_link(self, key: MetaKey, default: str) -> str: ...

    def first_link(self, key: MetaKey, default: Optional[str]=None) -> Optional[str]:
            return self.get_links(key)[0]
        except (IndexError, TypeError):
            return default

    def human_name(cls, key: MetaKey) -> str:
        """Return the "human" version of a metadata name

        This is usually the metadata key with punctuation replaced with spaces,
        and then titlecased, with a few special cases. The return value is
        suitable for using in reports.
            retval = cls._HUMAN_NAMES[key]
        except KeyError:
            retval = key.replace('-', ' ').title()
            retval = re.sub(r'\bId$', 'ID', retval)
            cls._HUMAN_NAMES[key] = retval
        return retval


class PostingMeta(Metadata):
    """Combined access to posting metadata with its parent transaction metadata

    This lets you access posting metadata through a single dict-like object.
    If you try to look up metadata that doesn't exist on the posting, it will
    look for the value in the parent transaction metadata instead.

    You can set and delete metadata as well. Changes only affect the metadata
    of the posting, never the transaction. Changes are propagated to the
    underlying Beancount data structures.

    Functionally, you can think of this as identical to:

      collections.ChainMap(post.meta, txn.meta)

    Under the hood, this class does a little extra work to avoid creating
    posting metadata if it doesn't have to.
    __slots__ = ('txn', 'index', 'post')

    def __init__(self,
                 txn: Transaction,
                 index: int,
                 post: Optional[BasePosting]=None,
    ) -> None:
        if post is None:
            post = txn.postings[index]
        self.txn = txn
        self.index = index
 = post
        if post.meta is None:
            self.meta = self.txn.meta
            self.meta = collections.ChainMap(post.meta, txn.meta)

    def __getitem__(self, key: MetaKey) -> MetaValue:
            return super().__getitem__(key)
        except KeyError:
            if key == 'entity' and self.txn.payee is not None:
                return self.txn.payee

    def __setitem__(self, key: MetaKey, value: MetaValue) -> None:
        if is None:
   ={key: value})
            self.txn.postings[self.index] =
            # mypy complains that could be None, but we know
            # from two lines up that it's not.
            self.meta = collections.ChainMap(, self.txn.meta)  # type:ignore[arg-type]
            super().__setitem__(key, value)

    def __delitem__(self, key: MetaKey) -> None:
        if is None:
            raise KeyError(key)

    # This is arguably cheating a litttle bit, but I'd argue the date of
    # the parent transaction still qualifies as posting metadata, and
    # it's something we want to access so often it's good to have it
    # within easy reach.
    def date(self) ->


class Posting(BasePosting):
    """Enhanced Posting objects

    This class is a subclass of Beancount's native Posting class where
    specific fields are replaced with enhanced versions:

    * The `account` field is an Account object
    * The `units` field is our Amount object (which simply declares that the
      number is always a Decimal—see that docstring for details)
    * The `meta` field is a PostingMeta object
    __slots__ = ()

    account: Account
    units: Amount
    cost: Optional[bc_position.Cost]
    # mypy correctly complains that our MutableMapping is not compatible
    # with Beancount's meta type declaration of Optional[Dict]. IMO
    # Beancount's type declaration is a smidge too specific: I think its type
    # declaration should also use MutableMapping, because it would be very
    # unusual for code to specifically require a Dict over that.
    # If it did, this declaration would pass without issue.
    meta: PostingMeta  # type:ignore[assignment]

    def from_beancount(cls,
                       txn: Transaction,
                       index: int,
                       post: Optional[BasePosting]=None,
    ) -> 'Posting':
        if post is None:
            post = txn.postings[index]
        return cls(
            # see rationale above about Posting.meta
            PostingMeta(txn, index, post), # type:ignore[arg-type]

    def from_txn(cls, txn: Transaction) -> Iterator['Posting']:
        """Yield an enhanced Posting object for every posting in the transaction"""
        for index, post in enumerate(txn.postings):
            yield cls.from_beancount(txn, index, post)

    def from_entries(cls, entries: Iterable[Directive]) -> Iterator['Posting']:
        """Yield an enhanced Posting object for every posting in these entries"""
        for entry in entries:
            # Because Beancount's own Transaction class isn't type-checkable,
            # we can't statically check this. Might as well rely on duck
            # typing while we're at it: just try to yield postings from
            # everything, and ignore entries that lack a postings attribute.
                yield from cls.from_txn(entry)  # type:ignore[arg-type]
            except AttributeError:

    def at_cost(self) -> Amount:
        if self.cost is None:
            return self.units
            return Amount(self.units.number * self.cost.number, self.cost.currency)


_KT = TypeVar('_KT', bound=Hashable)
_VT = TypeVar('_VT')
class _SizedDict(collections.OrderedDict, MutableMapping[_KT, _VT]):
    def __init__(self, maxsize: int=128) -> None:
        self.maxsize = maxsize

    def __setitem__(self, key: _KT, value: _VT) -> None:
        super().__setitem__(key, value)
        for _ in range(self.maxsize, len(self)):


def balance_of(txn: Transaction,
               *preds: Callable[[Account], Optional[bool]],
) -> Amount:
    """Return the balance of specified postings in a transaction.

    Given a transaction and a series of account predicates, balance_of
    returns the balance of the amounts of all postings with accounts that
    match any of the predicates.

    balance_of uses the "weight" of each posting, so the return value will
    use the currency of the postings' cost when available.
    match_posts = [post for post in Posting.from_txn(txn)
                   if any(pred(post.account) for pred in preds)]
    number = decimal.Decimal(0)
    if not match_posts:
        currency = ''
        weights: Sequence[Amount] = [
            bc_convert.get_weight(post) for post in match_posts
        number = sum((wt.number for wt in weights), number)
        currency = weights[0].currency
    return Amount(number, currency)

_opening_balance_cache: MutableMapping[str, bool] = _SizedDict()
def is_opening_balance_txn(txn: Transaction) -> bool:
    key = '\0'.join(
        f'{post.account}={post.units}' for post in txn.postings
        return _opening_balance_cache[key]
    except KeyError:
    opening_equity = balance_of(txn, Account.is_opening_equity)
    if not opening_equity.currency:
        retval = False
        rest = balance_of(txn, lambda acct: not acct.is_opening_equity())
        if not rest.currency:
            retval = False
            retval = abs(opening_equity.number + rest.number) < decimal.Decimal('.01')
    _opening_balance_cache[key] = retval
    return retval
@@ -103,528 +103,516 @@ class Balance(Mapping[str, data.Amount]):
    ) -> None:
        code = amount.currency
            current_number = currency_map[code].number
        except KeyError:
            current_number = Decimal(0)
        currency_map[code] = data.Amount(current_number + amount.number, code)

    def _add_other(self,
                   currency_map: MutableMapping[str, data.Amount],
                   other: Union[data.Amount, 'Balance'],
    ) -> None:
        if isinstance(other, Balance):
            for amount in other.values():
                self._add_amount(currency_map, amount)
            self._add_amount(currency_map, other)

    def __repr__(self) -> str:
        values = [repr(amt) for amt in self.values()]
        return f"{type(self).__name__}({values!r})"

    def __str__(self) -> str:
        return self.format()

    def __abs__(self: BalanceType) -> BalanceType:
        return type(self)(bc_amount.abs(amt) for amt in self.values())

    def __add__(self: BalanceType, other: Union[data.Amount, 'Balance']) -> BalanceType:
        retval_map = self._currency_map.copy()
        self._add_other(retval_map, other)
        return type(self)(retval_map.values())

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, Balance):
            clean_self = self.clean_copy()
            clean_other = other.clean_copy()
            return len(clean_self) == len(clean_other) and all(
                clean_self[key] == clean_other.get(key) for key in clean_self
            return super().__eq__(other)

    def __neg__(self: BalanceType) -> BalanceType:
        return type(self)(-amt for amt in self.values())

    def __pos__(self: BalanceType) -> BalanceType:
        return self

    def __getitem__(self, key: str) -> data.Amount:
        return self._currency_map[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self._currency_map)

    def __len__(self) -> int:
        return len(self._currency_map)

    def _all_amounts(self,
                     op_func: Callable[[DecimalCompat, DecimalCompat], bool],
                     operand: DecimalCompat,
    ) -> bool:
        return all(op_func(amt.number, operand) for amt in self.values())

    def copy(self: BalanceType) -> BalanceType:
        return type(self)(self.values())

    def clean_copy(self: BalanceType, tolerance: Optional[Decimal]=None) -> BalanceType:
        if tolerance is None:
            tolerance = self.tolerance
        return type(self)(
            amount for amount in self.values()
            if abs(amount.number) >= tolerance

    def within_tolerance(dec: DecimalCompat, tolerance: DecimalCompat) -> bool:
        dec = cast(Decimal, dec)
        return abs(dec) < tolerance

    def eq_zero(self) -> bool:
        """Returns true if all amounts in the balance == 0, within tolerance."""
        return self._all_amounts(self.within_tolerance, self.tolerance)

    is_zero = eq_zero

    def ge_zero(self) -> bool:
        """Returns true if all amounts in the balance >= 0, within tolerance."""
        op_func = if self.tolerance else
        return self._all_amounts(op_func, -self.tolerance)

    def le_zero(self) -> bool:
        """Returns true if all amounts in the balance <= 0, within tolerance."""
        op_func = if self.tolerance else operator.le
        return self._all_amounts(op_func, self.tolerance)

    def format(self,
               fmt: Optional[str]='#,#00.00 ¤¤',
               sep: str=', ',
               empty: str="Zero balance",
               tolerance: Optional[Decimal]=None,
    ) -> str:
        """Formats the balance as a string with the given parameters

        If the balance is zero (within tolerance), returns ``empty``.
        Otherwise, returns a string with each amount in the balance formatted
        as ``fmt``, separated by ``sep``.

        If you set ``fmt`` to None, amounts will be formatted according to the
        user's locale. The default format is Beancount's input format.
        amounts = list(self.clean_copy(tolerance).values())
        if not amounts:
            return empty
        amounts.sort(key=lambda amt: abs(amt.number), reverse=True)
        return sep.join(
            babel.numbers.format_currency(amt.number, amt.currency, fmt)
            for amt in amounts


class MutableBalance(Balance):
    __slots__ = ()

    def __iadd__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
        self._add_other(self._currency_map, other)
        return self


class RelatedPostings(Sequence[data.Posting]):
    """Collect and query related postings

    This class provides common functionality for collecting related postings
    and running queries on them: iterating over them, tallying their balance,

    This class doesn't know anything about how the postings are related. That's
    entirely up to the caller.

    A common pattern is to use this class with collections.defaultdict
    to organize postings based on some key. See the group_by_meta classmethod
    for an example.
    __slots__ = ('_postings',)

    def __init__(self,
                 source: Iterable[data.Posting]=(),
                 _can_own: bool=False,
    ) -> None:
        self._postings: List[data.Posting]
        if _can_own and isinstance(source, list):
            self._postings = source
            self._postings = list(source)

    def _group_by(cls: Type[RelatedType],
                  postings: Iterable[data.Posting],
                  key: Callable[[data.Posting], T],
    ) -> Iterator[Tuple[T, RelatedType]]:
        mapping: Dict[T, List[data.Posting]] = collections.defaultdict(list)
        for post in postings:
        for value, posts in mapping.items():
            yield value, cls(posts, _can_own=True)

    def group_by_meta(cls: Type[RelatedType],
                      postings: Iterable[data.Posting],
                      key: MetaKey,
                      default: Optional[MetaValue]=None,
    ) -> Iterator[Tuple[Optional[MetaValue], RelatedType]]:
        """Relate postings by metadata value

        This method takes an iterable of postings and returns a mapping.
        The keys of the mapping are the values of post.meta.get(key, default).
        The values are RelatedPostings instances that contain all the postings
        that had that same metadata value.
        def key_func(post: data.Posting) -> Optional[MetaValue]:
            return post.meta.get(key, default)
        return cls._group_by(postings, key_func)

    def group_by_first_meta_link(
            cls: Type[RelatedType],
            postings: Iterable[data.Posting],
            key: MetaKey,
    ) -> Iterator[Tuple[Optional[str], RelatedType]]:
        """Relate postings by the first link in metadata

        This method takes an iterable of postings and returns a mapping.
        The keys of the mapping are the values of
        post.meta.first_link(key, None).
        The values are RelatedPostings instances that contain all the postings
        that had that same first metadata link.
        def key_func(post: data.Posting) -> Optional[MetaValue]:
            return post.meta.first_link(key, None)
        return cls._group_by(postings, key_func)

    def __repr__(self) -> str:
        return f'<{type(self).__name__} {self._postings!r}>'

    def __getitem__(self: RelatedType, index: int) -> data.Posting: ...

    def __getitem__(self: RelatedType, s: slice) -> RelatedType: ...

    def __getitem__(self: RelatedType,
                    index: Union[int, slice],
    ) -> Union[data.Posting, RelatedType]:
        if isinstance(index, slice):
            return type(self)(self._postings[index], _can_own=True)
            return self._postings[index]

    def __len__(self) -> int:
        return len(self._postings)

    def _all_meta_links(self, key: MetaKey) -> Iterator[str]:
        for post in self:
                yield from post.meta.get_links(key)
            except TypeError:

    def all_meta_links(self, key: MetaKey) -> Iterator[str]:
        return filters.iter_unique(self._all_meta_links(key))

    def first_meta_links(self, key: MetaKey, default: str='') -> Iterator[str]: ...

    def first_meta_links(self, key: MetaKey, default: None) -> Iterator[Optional[str]]: ...

    def first_meta_links(self,
                         key: MetaKey,
                         default: Optional[str]='',
    ) -> Iterator[Optional[str]]:
        retval = filters.iter_unique(
            post.meta.first_link(key, default) for post in self
        if default == '':
            retval = (s for s in retval if s)
        return retval

    def iter_with_balance(self) -> Iterator[Tuple[data.Posting, Balance]]:
        balance = MutableBalance()
        for post in self:
            balance += post.units
            yield post, balance

    def balance(self) -> Balance:
        for _, balance in self.iter_with_balance():
            return balance
        except NameError:
            return Balance()
        return Balance(post.units for post in self)

    def balance_at_cost(self) -> Balance:
        balance = MutableBalance()
        for post in self:
            if post.cost is None:
                balance += post.units
                number = post.units.number * post.cost.number
                balance += data.Amount(number, post.cost.currency)
        return balance
        return Balance(post.at_cost() for post in self)

    def meta_values(self,
                    key: MetaKey,
                    default: Optional[MetaValue]=None,
    ) -> Set[Optional[MetaValue]]:
        return {post.meta.get(key, default) for post in self}


class BaseSpreadsheet(Generic[RT, ST], metaclass=abc.ABCMeta):
    """Abstract base class to help write spreadsheets

    This class provides the very core logic to write an arbitrary set of data
    rows to arbitrary output. It calls hooks when it starts writing the
    spreadsheet, starts a new "section" of rows, ends a section, and ends the

    RT is the type of the input data rows. ST is the type of the section
    identifier that you create from each row. If you don't want to use the
    section logic at all, set ST to None and define section_key to return None.

    def section_key(self, row: RT) -> ST:
        """Return the section a row belongs to

        Given a data row, this method should return some identifier for the
        "section" the row belongs to. The write method uses this to
        determine when to call start_section and end_section.

        If your spreadsheet doesn't need sections, define this to return None.

    def write_row(self, row: RT) -> None:
        """Write a data row to the output spreadsheet

        This method is called once for each data row in the input.

    # The next four methods are all called by the write method when the name
    # says. You may override them to output headers or sums, record
    # state, etc. The default implementations are all noops.

    def start_spreadsheet(self) -> None:

    def start_section(self, key: ST) -> None:

    def end_section(self, key: ST) -> None:

    def end_spreadsheet(self) -> None:

    def write(self, rows: Iterable[RT]) -> None:
        prev_section: Optional[ST] = None
        for row in rows:
            section = self.section_key(row)
            if section != prev_section:
                if prev_section is not None:
                prev_section = section
            should_end = section is not None
        except NameError:
            should_end = False
        if should_end:


class BaseODS(BaseSpreadsheet[RT, ST], metaclass=abc.ABCMeta):
    """Abstract base class to help write OpenDocument spreadsheets

    This class provides the very core logic to write an arbitrary set of data
    rows to an OpenDocument spreadsheet. It provides helper methods for
    building sheets, rows, and cells.

    See also the BaseSpreadsheet base class for additional documentation about
    methods you must and can define, the definition of RT and ST, etc.
    def __init__(self, rt_wrapper: Optional[rtutil.RT]=None) -> None:
        self.rt_wrapper = rt_wrapper
        self.locale = babel.core.Locale.default('LC_MONETARY')
        self.currency_fmt_key = 'accounting'
        self._name_counter = itertools.count(1)
        self._currency_style_cache: MutableMapping[str,] = {}
        self.document = odf.opendocument.OpenDocumentSpreadsheet()
        self.sheet = self.use_sheet("Report")

    ### Low-level document tree manipulation
    # The *intent* is that you only need to use these if you're adding new
    # methods to manipulate document settings or styles.

    def copy_element(self, elem: odf.element.Element) -> odf.element.Element:
        qattrs = dict(self.iter_qattributes(elem))
        retval = odf.element.Element(qname=elem.qname, qattributes=qattrs)
            orig_name = retval.getAttribute('name')
        except ValueError:
            orig_name = None
        if orig_name is not None:
            retval.setAttribute('name', f'{orig_name}{next(self._name_counter)}')
        return retval

    def ensure_child(self,
                     parent: odf.element.Element,
                     child_type: ElementType,
                     **kwargs: Any,
    ) -> odf.element.Element:
        new_child = child_type(**kwargs)
        found_child = self.find_child(parent, new_child)
        if found_child is None:
            return parent.lastChild
            return found_child

    def ensure_config_map_entry(self,
                                root: odf.element.Element,
                                map_name: str,
                                entry_name: str,
    ) -> odf.element.Element:
        """Return a ``ConfigItemMapEntry`` under ``root``

        This method ensures there's a ``ConfigItemMapNamed`` named ``map_name``
        under ``root``, and a ``ConfigItemMapEntry`` named ``entry_name`` under
        that. Return the ``ConfigItemMapEntry`` element.
        config_map = self.ensure_child(root, odf.config.ConfigItemMapNamed, name=map_name)
        return self.ensure_child(config_map, odf.config.ConfigItemMapEntry, name=entry_name)

    def find_child(self,
                   parent: odf.element.Element,
                   child: odf.element.Element,
    ) -> Optional[odf.element.Element]:
        attrs = {k: v for k, v in self.iter_attributes(child)}
        if not attrs:
            return None
        for elem in parent.childNodes:
            if (elem.qname == child.qname
                and all(elem.getAttribute(k) == v for k, v in attrs.items())):
                return elem
        return None

    def iter_attributes(self, elem: odf.element.Element) -> Iterator[Tuple[str, str]]:
        for (_, key), value in self.iter_qattributes(elem):
            yield key.lower().replace('-', ''), value

    def iter_qattributes(self, elem: odf.element.Element) -> Iterator[Tuple[Tuple[str, str], str]]:
        if elem.attributes:
            yield from elem.attributes.items()

    def replace_child(self,
                     parent: odf.element.Element,
                     child_type: ElementType,
                     **kwargs: Any,
    ) -> odf.element.Element:
        new_child = child_type(**kwargs)
        found_child = self.find_child(parent, new_child)
        parent.insertBefore(new_child, found_child)
        if found_child is not None:
        return new_child

    def set_config(self,
                   root: odf.element.Element,
                   name: str,
                   value: Union[bool, int, str],
                   config_type: Optional[str]=None,
    ) -> None:
        """Ensure ``root`` has a ``ConfigItem`` with the given name, type, and value"""
        value_s = str(value)
        if isinstance(value, bool):
            value_s = str(value).lower()
            default_type = 'boolean'
        elif isinstance(value, str):
            default_type = 'string'
        if config_type is None:
                config_type = default_type
            except NameError:
                raise ValueError(
                    f"need config_type for {type(value).__name__} value",
                ) from None
        item = self.replace_child(
            root, odf.config.ConfigItem, name=name, type=config_type,

    ### Styles

    def _build_currency_style(
            root: odf.element.Element,
            locale: babel.core.Locale,
            code: str,
            fmt_index: int,
            properties: Optional[]=None,
            fmt_key: Optional[str]=None,
            volatile: bool=False,
            minintegerdigits: int=1,
    ) -> odf.element.Element:
        if fmt_key is None:
            fmt_key = self.currency_fmt_key
        pattern = locale.currency_formats[fmt_key]
        fmts = pattern.pattern.split(';')
            fmt = fmts[fmt_index]
        except IndexError:
            fmt = fmts[0]
            grouping = pattern.grouping[0]
            grouping = pattern.grouping[fmt_index]
        zero_s = babel.numbers.format_currency(0, code, '##0.0', locale)
            decimal_index = zero_s.rindex('.') + 1
        except ValueError:
            decimalplaces = 0
            decimalplaces = len(zero_s) - decimal_index
        style = self.replace_child(
        style.setAttribute('volatile', 'true' if volatile else 'false')
        if properties is not None:
        for part in re.split(r"(¤+|[#0,.]+|'[^']+')", fmt):
            if not part:
            elif not part.strip('#0,.'):
                    grouping='true' if grouping else 'false',
            elif part == '¤':
                    text=babel.numbers.get_currency_symbol(code, locale),
            elif part == '¤¤':
"""Test Posting methods"""
# Copyright © 2020  Brett Smith
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <>.

import pytest

from . import testutil

from beancount.core import data as bc_data

from conservancy_beancount import data

def simple_txn(index=None, key=None):
    return testutil.Transaction(note='txn note', postings=[
        ('Assets:Cash', 5),
        ('Income:Donations', -5, {'note': 'donation love', 'extra': 'Extra'}),

def test_from_beancount():
    txn = testutil.Transaction(payee='Smith-Dakota', postings=[
        ('Income:Donations', -50),
        ('Assets:Cash', 50, {'receipt': 'cash-donation.pdf'}),
    post = data.Posting.from_beancount(txn, 1)
    # We don't just want to assert isinstance(post.attr, data.SomeClass);
    # we also want to double-check that attributes were instantiated correctly.
    assert post.account.is_under('Assets:Cash')
    assert post.meta['receipt'] == 'cash-donation.pdf'
    assert post.meta['entity'] == 'Smith-Dakota'
    assert == testutil.FY_MID_DATE

def test_setting_metadata_propagates_to_source(simple_txn):
    src_post = simple_txn.postings[1]
    post = data.Posting.from_beancount(simple_txn, 1)
    post.meta['edited'] = 'yes'
    assert src_post.meta['edited'] == 'yes'
    assert not isinstance(src_post.meta, data.PostingMeta)

def test_deleting_metadata_propagates_to_source(simple_txn):
    post = data.Posting.from_beancount(simple_txn, 1)
    del post.meta['extra']
    assert 'extra' not in simple_txn.postings[1].meta

def test_from_txn(simple_txn):
    for source, post in zip(simple_txn.postings, data.Posting.from_txn(simple_txn)):
        assert all(source[x] == post[x] for x in range(len(source) - 1))
        assert isinstance(post.account, data.Account)
        assert post.meta['note']  # Only works with PostingMeta

def test_from_entries_two_txns(simple_txn):
    entries = [simple_txn, simple_txn]
    sources = [post for txn in entries for post in txn.postings]
    for source, post in zip(sources, data.Posting.from_entries(entries)):
        assert all(source[x] == post[x] for x in range(len(source) - 1))
        assert isinstance(post.account, data.Account)
        assert post.meta['note']  # Only works with PostingMeta

def test_from_entries_mix_txns_and_other_directives(simple_txn):
    meta = {
        'filename': __file__,
        'lineno': 75,
    entries = [
        bc_data.Commodity(meta, testutil.FY_START_DATE, 'EUR'),
        bc_data.Commodity(meta, testutil.FY_START_DATE, 'USD'),
    for source, post in zip(simple_txn.postings, data.Posting.from_entries(entries)):
        assert all(source[x] == post[x] for x in range(len(source) - 1))
        assert isinstance(post.account, data.Account)
        assert post.meta['note']  # Only works with PostingMeta

@pytest.mark.parametrize('cost_num', [105, 110, 115])
def test_at_cost(cost_num):
    post = data.Posting(
        testutil.Amount(25, 'EUR'),
        testutil.Cost(cost_num, 'JPY'),
    assert post.at_cost() == testutil.Amount(25 * cost_num, 'JPY')

def test_at_cost_no_cost():
    amount = testutil.Amount(25, 'EUR')
    post = data.Posting(
    assert post.at_cost() == amount
