Changeset - f21ac740f24c
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-06-15 14:16:34
brettcsmith@brettcsmith.org
data: Add Posting.at_cost() method.
3 files changed with 32 insertions and 14 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/data.py
Show inline comments
...
 
@@ -283,187 +283,193 @@ class Metadata(MutableMapping[MetaKey, MetaValue]):
 

	
 
class PostingMeta(Metadata):
 
    """Combined access to posting metadata with its parent transaction metadata
 

	
 
    This lets you access posting metadata through a single dict-like object.
 
    If you try to look up metadata that doesn't exist on the posting, it will
 
    look for the value in the parent transaction metadata instead.
 

	
 
    You can set and delete metadata as well. Changes only affect the metadata
 
    of the posting, never the transaction. Changes are propagated to the
 
    underlying Beancount data structures.
 

	
 
    Functionally, you can think of this as identical to:
 

	
 
      collections.ChainMap(post.meta, txn.meta)
 

	
 
    Under the hood, this class does a little extra work to avoid creating
 
    posting metadata if it doesn't have to.
 
    """
 
    __slots__ = ('txn', 'index', 'post')
 

	
 
    def __init__(self,
 
                 txn: Transaction,
 
                 index: int,
 
                 post: Optional[BasePosting]=None,
 
    ) -> None:
 
        if post is None:
 
            post = txn.postings[index]
 
        self.txn = txn
 
        self.index = index
 
        self.post = post
 
        if post.meta is None:
 
            self.meta = self.txn.meta
 
        else:
 
            self.meta = collections.ChainMap(post.meta, txn.meta)
 

	
 
    def __getitem__(self, key: MetaKey) -> MetaValue:
 
        try:
 
            return super().__getitem__(key)
 
        except KeyError:
 
            if key == 'entity' and self.txn.payee is not None:
 
                return self.txn.payee
 
            else:
 
                raise
 

	
 
    def __setitem__(self, key: MetaKey, value: MetaValue) -> None:
 
        if self.post.meta is None:
 
            self.post = self.post._replace(meta={key: value})
 
            self.txn.postings[self.index] = self.post
 
            # mypy complains that self.post.meta could be None, but we know
 
            # from two lines up that it's not.
 
            self.meta = collections.ChainMap(self.post.meta, self.txn.meta)  # type:ignore[arg-type]
 
        else:
 
            super().__setitem__(key, value)
 

	
 
    def __delitem__(self, key: MetaKey) -> None:
 
        if self.post.meta is None:
 
            raise KeyError(key)
 
        else:
 
            super().__delitem__(key)
 

	
 
    # This is arguably cheating a litttle bit, but I'd argue the date of
 
    # the parent transaction still qualifies as posting metadata, and
 
    # it's something we want to access so often it's good to have it
 
    # within easy reach.
 
    @property
 
    def date(self) -> datetime.date:
 
        return self.txn.date
 

	
 

	
 
class Posting(BasePosting):
 
    """Enhanced Posting objects
 

	
 
    This class is a subclass of Beancount's native Posting class where
 
    specific fields are replaced with enhanced versions:
 

	
 
    * The `account` field is an Account object
 
    * The `units` field is our Amount object (which simply declares that the
 
      number is always a Decimal—see that docstring for details)
 
    * The `meta` field is a PostingMeta object
 
    """
 
    __slots__ = ()
 

	
 
    account: Account
 
    units: Amount
 
    cost: Optional[bc_position.Cost]
 
    # mypy correctly complains that our MutableMapping is not compatible
 
    # with Beancount's meta type declaration of Optional[Dict]. IMO
 
    # Beancount's type declaration is a smidge too specific: I think its type
 
    # declaration should also use MutableMapping, because it would be very
 
    # unusual for code to specifically require a Dict over that.
 
    # If it did, this declaration would pass without issue.
 
    meta: PostingMeta  # type:ignore[assignment]
 

	
 
    @classmethod
 
    def from_beancount(cls,
 
                       txn: Transaction,
 
                       index: int,
 
                       post: Optional[BasePosting]=None,
 
    ) -> 'Posting':
 
        if post is None:
 
            post = txn.postings[index]
 
        return cls(
 
            Account(post.account),
 
            *post[1:5],
 
            # see rationale above about Posting.meta
 
            PostingMeta(txn, index, post), # type:ignore[arg-type]
 
        )
 

	
 
    @classmethod
 
    def from_txn(cls, txn: Transaction) -> Iterator['Posting']:
 
        """Yield an enhanced Posting object for every posting in the transaction"""
 
        for index, post in enumerate(txn.postings):
 
            yield cls.from_beancount(txn, index, post)
 

	
 
    @classmethod
 
    def from_entries(cls, entries: Iterable[Directive]) -> Iterator['Posting']:
 
        """Yield an enhanced Posting object for every posting in these entries"""
 
        for entry in entries:
 
            # Because Beancount's own Transaction class isn't type-checkable,
 
            # we can't statically check this. Might as well rely on duck
 
            # typing while we're at it: just try to yield postings from
 
            # everything, and ignore entries that lack a postings attribute.
 
            try:
 
                yield from cls.from_txn(entry)  # type:ignore[arg-type]
 
            except AttributeError:
 
                pass
 

	
 
    def at_cost(self) -> Amount:
 
        if self.cost is None:
 
            return self.units
 
        else:
 
            return Amount(self.units.number * self.cost.number, self.cost.currency)
 

	
 

	
 
_KT = TypeVar('_KT', bound=Hashable)
 
_VT = TypeVar('_VT')
 
class _SizedDict(collections.OrderedDict, MutableMapping[_KT, _VT]):
 
    def __init__(self, maxsize: int=128) -> None:
 
        self.maxsize = maxsize
 
        super().__init__()
 

	
 
    def __setitem__(self, key: _KT, value: _VT) -> None:
 
        super().__setitem__(key, value)
 
        for _ in range(self.maxsize, len(self)):
 
            self.popitem(last=False)
 

	
 

	
 
def balance_of(txn: Transaction,
 
               *preds: Callable[[Account], Optional[bool]],
 
) -> Amount:
 
    """Return the balance of specified postings in a transaction.
 

	
 
    Given a transaction and a series of account predicates, balance_of
 
    returns the balance of the amounts of all postings with accounts that
 
    match any of the predicates.
 

	
 
    balance_of uses the "weight" of each posting, so the return value will
 
    use the currency of the postings' cost when available.
 
    """
 
    match_posts = [post for post in Posting.from_txn(txn)
 
                   if any(pred(post.account) for pred in preds)]
 
    number = decimal.Decimal(0)
 
    if not match_posts:
 
        currency = ''
 
    else:
 
        weights: Sequence[Amount] = [
 
            bc_convert.get_weight(post) for post in match_posts
 
        ]
 
        number = sum((wt.number for wt in weights), number)
 
        currency = weights[0].currency
 
    return Amount(number, currency)
 

	
 
_opening_balance_cache: MutableMapping[str, bool] = _SizedDict()
 
def is_opening_balance_txn(txn: Transaction) -> bool:
 
    key = '\0'.join(
 
        f'{post.account}={post.units}' for post in txn.postings
 
    )
 
    try:
 
        return _opening_balance_cache[key]
 
    except KeyError:
 
        pass
 
    opening_equity = balance_of(txn, Account.is_opening_equity)
 
    if not opening_equity.currency:
 
        retval = False
 
    else:
 
        rest = balance_of(txn, lambda acct: not acct.is_opening_equity())
 
        if not rest.currency:
 
            retval = False
 
        else:
 
            retval = abs(opening_equity.number + rest.number) < decimal.Decimal('.01')
 
    _opening_balance_cache[key] = retval
 
    return retval
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -231,272 +231,260 @@ class MutableBalance(Balance):
 

	
 
class RelatedPostings(Sequence[data.Posting]):
 
    """Collect and query related postings
 

	
 
    This class provides common functionality for collecting related postings
 
    and running queries on them: iterating over them, tallying their balance,
 
    etc.
 

	
 
    This class doesn't know anything about how the postings are related. That's
 
    entirely up to the caller.
 

	
 
    A common pattern is to use this class with collections.defaultdict
 
    to organize postings based on some key. See the group_by_meta classmethod
 
    for an example.
 
    """
 
    __slots__ = ('_postings',)
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Posting]=(),
 
                 *,
 
                 _can_own: bool=False,
 
    ) -> None:
 
        self._postings: List[data.Posting]
 
        if _can_own and isinstance(source, list):
 
            self._postings = source
 
        else:
 
            self._postings = list(source)
 

	
 
    @classmethod
 
    def _group_by(cls: Type[RelatedType],
 
                  postings: Iterable[data.Posting],
 
                  key: Callable[[data.Posting], T],
 
    ) -> Iterator[Tuple[T, RelatedType]]:
 
        mapping: Dict[T, List[data.Posting]] = collections.defaultdict(list)
 
        for post in postings:
 
            mapping[key(post)].append(post)
 
        for value, posts in mapping.items():
 
            yield value, cls(posts, _can_own=True)
 

	
 
    @classmethod
 
    def group_by_meta(cls: Type[RelatedType],
 
                      postings: Iterable[data.Posting],
 
                      key: MetaKey,
 
                      default: Optional[MetaValue]=None,
 
    ) -> Iterator[Tuple[Optional[MetaValue], RelatedType]]:
 
        """Relate postings by metadata value
 

	
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of post.meta.get(key, default).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same metadata value.
 
        """
 
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 
            return post.meta.get(key, default)
 
        return cls._group_by(postings, key_func)
 

	
 
    @classmethod
 
    def group_by_first_meta_link(
 
            cls: Type[RelatedType],
 
            postings: Iterable[data.Posting],
 
            key: MetaKey,
 
    ) -> Iterator[Tuple[Optional[str], RelatedType]]:
 
        """Relate postings by the first link in metadata
 

	
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of
 
        post.meta.first_link(key, None).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same first metadata link.
 
        """
 
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 
            return post.meta.first_link(key, None)
 
        return cls._group_by(postings, key_func)
 

	
 
    def __repr__(self) -> str:
 
        return f'<{type(self).__name__} {self._postings!r}>'
 

	
 
    @overload
 
    def __getitem__(self: RelatedType, index: int) -> data.Posting: ...
 

	
 
    @overload
 
    def __getitem__(self: RelatedType, s: slice) -> RelatedType: ...
 

	
 
    def __getitem__(self: RelatedType,
 
                    index: Union[int, slice],
 
    ) -> Union[data.Posting, RelatedType]:
 
        if isinstance(index, slice):
 
            return type(self)(self._postings[index], _can_own=True)
 
        else:
 
            return self._postings[index]
 

	
 
    def __len__(self) -> int:
 
        return len(self._postings)
 

	
 
    def _all_meta_links(self, key: MetaKey) -> Iterator[str]:
 
        for post in self:
 
            try:
 
                yield from post.meta.get_links(key)
 
            except TypeError:
 
                pass
 

	
 
    def all_meta_links(self, key: MetaKey) -> Iterator[str]:
 
        return filters.iter_unique(self._all_meta_links(key))
 

	
 
    @overload
 
    def first_meta_links(self, key: MetaKey, default: str='') -> Iterator[str]: ...
 

	
 
    @overload
 
    def first_meta_links(self, key: MetaKey, default: None) -> Iterator[Optional[str]]: ...
 

	
 
    def first_meta_links(self,
 
                         key: MetaKey,
 
                         default: Optional[str]='',
 
    ) -> Iterator[Optional[str]]:
 
        retval = filters.iter_unique(
 
            post.meta.first_link(key, default) for post in self
 
        )
 
        if default == '':
 
            retval = (s for s in retval if s)
 
        return retval
 

	
 
    def iter_with_balance(self) -> Iterator[Tuple[data.Posting, Balance]]:
 
        balance = MutableBalance()
 
        for post in self:
 
            balance += post.units
 
            yield post, balance
 

	
 
    def balance(self) -> Balance:
 
        for _, balance in self.iter_with_balance():
 
            pass
 
        try:
 
            return balance
 
        except NameError:
 
            return Balance()
 
        return Balance(post.units for post in self)
 

	
 
    def balance_at_cost(self) -> Balance:
 
        balance = MutableBalance()
 
        for post in self:
 
            if post.cost is None:
 
                balance += post.units
 
            else:
 
                number = post.units.number * post.cost.number
 
                balance += data.Amount(number, post.cost.currency)
 
        return balance
 
        return Balance(post.at_cost() for post in self)
 

	
 
    def meta_values(self,
 
                    key: MetaKey,
 
                    default: Optional[MetaValue]=None,
 
    ) -> Set[Optional[MetaValue]]:
 
        return {post.meta.get(key, default) for post in self}
 

	
 

	
 
class BaseSpreadsheet(Generic[RT, ST], metaclass=abc.ABCMeta):
 
    """Abstract base class to help write spreadsheets
 

	
 
    This class provides the very core logic to write an arbitrary set of data
 
    rows to arbitrary output. It calls hooks when it starts writing the
 
    spreadsheet, starts a new "section" of rows, ends a section, and ends the
 
    spreadsheet.
 

	
 
    RT is the type of the input data rows. ST is the type of the section
 
    identifier that you create from each row. If you don't want to use the
 
    section logic at all, set ST to None and define section_key to return None.
 
    """
 

	
 
    @abc.abstractmethod
 
    def section_key(self, row: RT) -> ST:
 
        """Return the section a row belongs to
 

	
 
        Given a data row, this method should return some identifier for the
 
        "section" the row belongs to. The write method uses this to
 
        determine when to call start_section and end_section.
 

	
 
        If your spreadsheet doesn't need sections, define this to return None.
 
        """
 
        ...
 

	
 
    @abc.abstractmethod
 
    def write_row(self, row: RT) -> None:
 
        """Write a data row to the output spreadsheet
 

	
 
        This method is called once for each data row in the input.
 
        """
 
        ...
 

	
 
    # The next four methods are all called by the write method when the name
 
    # says. You may override them to output headers or sums, record
 
    # state, etc. The default implementations are all noops.
 

	
 
    def start_spreadsheet(self) -> None:
 
        pass
 

	
 
    def start_section(self, key: ST) -> None:
 
        pass
 

	
 
    def end_section(self, key: ST) -> None:
 
        pass
 

	
 
    def end_spreadsheet(self) -> None:
 
        pass
 

	
 
    def write(self, rows: Iterable[RT]) -> None:
 
        prev_section: Optional[ST] = None
 
        self.start_spreadsheet()
 
        for row in rows:
 
            section = self.section_key(row)
 
            if section != prev_section:
 
                if prev_section is not None:
 
                    self.end_section(prev_section)
 
                self.start_section(section)
 
                prev_section = section
 
            self.write_row(row)
 
        try:
 
            should_end = section is not None
 
        except NameError:
 
            should_end = False
 
        if should_end:
 
            self.end_section(section)
 
        self.end_spreadsheet()
 

	
 

	
 
class BaseODS(BaseSpreadsheet[RT, ST], metaclass=abc.ABCMeta):
 
    """Abstract base class to help write OpenDocument spreadsheets
 

	
 
    This class provides the very core logic to write an arbitrary set of data
 
    rows to an OpenDocument spreadsheet. It provides helper methods for
 
    building sheets, rows, and cells.
 

	
 
    See also the BaseSpreadsheet base class for additional documentation about
 
    methods you must and can define, the definition of RT and ST, etc.
 
    """
 
    def __init__(self, rt_wrapper: Optional[rtutil.RT]=None) -> None:
 
        self.rt_wrapper = rt_wrapper
 
        self.locale = babel.core.Locale.default('LC_MONETARY')
 
        self.currency_fmt_key = 'accounting'
 
        self._name_counter = itertools.count(1)
 
        self._currency_style_cache: MutableMapping[str, odf.style.Style] = {}
 
        self.document = odf.opendocument.OpenDocumentSpreadsheet()
 
        self.init_settings()
 
        self.init_styles()
 
        self.sheet = self.use_sheet("Report")
 

	
 
    ### Low-level document tree manipulation
 
    # The *intent* is that you only need to use these if you're adding new
 
    # methods to manipulate document settings or styles.
 

	
 
    def copy_element(self, elem: odf.element.Element) -> odf.element.Element:
 
        qattrs = dict(self.iter_qattributes(elem))
 
        retval = odf.element.Element(qname=elem.qname, qattributes=qattrs)
 
        try:
 
            orig_name = retval.getAttribute('name')
 
        except ValueError:
 
            orig_name = None
 
        if orig_name is not None:
 
            retval.setAttribute('name', f'{orig_name}{next(self._name_counter)}')
 
        return retval
 

	
 
    def ensure_child(self,
 
                     parent: odf.element.Element,
 
                     child_type: ElementType,
 
                     **kwargs: Any,
 
    ) -> odf.element.Element:
 
        new_child = child_type(**kwargs)
 
        found_child = self.find_child(parent, new_child)
 
        if found_child is None:
 
            parent.addElement(new_child)
 
            return parent.lastChild
 
        else:
 
            return found_child
 

	
 
    def ensure_config_map_entry(self,
 
                                root: odf.element.Element,
tests/test_data_posting.py
Show inline comments
 
"""Test Posting methods"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from beancount.core import data as bc_data
 

	
 
from conservancy_beancount import data
 

	
 
@pytest.fixture
 
def simple_txn(index=None, key=None):
 
    return testutil.Transaction(note='txn note', postings=[
 
        ('Assets:Cash', 5),
 
        ('Income:Donations', -5, {'note': 'donation love', 'extra': 'Extra'}),
 
    ])
 

	
 
def test_from_beancount():
 
    txn = testutil.Transaction(payee='Smith-Dakota', postings=[
 
        ('Income:Donations', -50),
 
        ('Assets:Cash', 50, {'receipt': 'cash-donation.pdf'}),
 
    ])
 
    post = data.Posting.from_beancount(txn, 1)
 
    # We don't just want to assert isinstance(post.attr, data.SomeClass);
 
    # we also want to double-check that attributes were instantiated correctly.
 
    assert post.account.is_under('Assets:Cash')
 
    assert post.meta['receipt'] == 'cash-donation.pdf'
 
    assert post.meta['entity'] == 'Smith-Dakota'
 
    assert post.meta.date == testutil.FY_MID_DATE
 

	
 
def test_setting_metadata_propagates_to_source(simple_txn):
 
    src_post = simple_txn.postings[1]
 
    post = data.Posting.from_beancount(simple_txn, 1)
 
    post.meta['edited'] = 'yes'
 
    assert src_post.meta['edited'] == 'yes'
 
    assert not isinstance(src_post.meta, data.PostingMeta)
 

	
 
def test_deleting_metadata_propagates_to_source(simple_txn):
 
    post = data.Posting.from_beancount(simple_txn, 1)
 
    del post.meta['extra']
 
    assert 'extra' not in simple_txn.postings[1].meta
 

	
 
def test_from_txn(simple_txn):
 
    for source, post in zip(simple_txn.postings, data.Posting.from_txn(simple_txn)):
 
        assert all(source[x] == post[x] for x in range(len(source) - 1))
 
        assert isinstance(post.account, data.Account)
 
        assert post.meta['note']  # Only works with PostingMeta
 

	
 
def test_from_entries_two_txns(simple_txn):
 
    entries = [simple_txn, simple_txn]
 
    sources = [post for txn in entries for post in txn.postings]
 
    for source, post in zip(sources, data.Posting.from_entries(entries)):
 
        assert all(source[x] == post[x] for x in range(len(source) - 1))
 
        assert isinstance(post.account, data.Account)
 
        assert post.meta['note']  # Only works with PostingMeta
 

	
 
def test_from_entries_mix_txns_and_other_directives(simple_txn):
 
    meta = {
 
        'filename': __file__,
 
        'lineno': 75,
 
    }
 
    entries = [
 
        bc_data.Commodity(meta, testutil.FY_START_DATE, 'EUR'),
 
        bc_data.Commodity(meta, testutil.FY_START_DATE, 'USD'),
 
        simple_txn,
 
    ]
 
    for source, post in zip(simple_txn.postings, data.Posting.from_entries(entries)):
 
        assert all(source[x] == post[x] for x in range(len(source) - 1))
 
        assert isinstance(post.account, data.Account)
 
        assert post.meta['note']  # Only works with PostingMeta
 

	
 
@pytest.mark.parametrize('cost_num', [105, 110, 115])
 
def test_at_cost(cost_num):
 
    post = data.Posting(
 
        'Income:Donations',
 
        testutil.Amount(25, 'EUR'),
 
        testutil.Cost(cost_num, 'JPY'),
 
        None,
 
        '*',
 
        None,
 
    )
 
    assert post.at_cost() == testutil.Amount(25 * cost_num, 'JPY')
 

	
 
def test_at_cost_no_cost():
 
    amount = testutil.Amount(25, 'EUR')
 
    post = data.Posting(
 
        'Income:Donations',
 
        amount,
 
        None,
 
        None,
 
        '*',
 
        None,
 
    )
 
    assert post.at_cost() == amount
0 comments (0 inline, 0 general)