Changeset - f21ac740f24c
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-06-15 14:16:34
brettcsmith@brettcsmith.org
data: Add Posting.at_cost() method.
3 files changed with 32 insertions and 14 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/data.py
Show inline comments
...
 
@@ -347,123 +347,129 @@ class PostingMeta(Metadata):
 
    # within easy reach.
 
    @property
 
    def date(self) -> datetime.date:
 
        return self.txn.date
 

	
 

	
 
class Posting(BasePosting):
 
    """Enhanced Posting objects
 

	
 
    This class is a subclass of Beancount's native Posting class where
 
    specific fields are replaced with enhanced versions:
 

	
 
    * The `account` field is an Account object
 
    * The `units` field is our Amount object (which simply declares that the
 
      number is always a Decimal—see that docstring for details)
 
    * The `meta` field is a PostingMeta object
 
    """
 
    __slots__ = ()
 

	
 
    account: Account
 
    units: Amount
 
    cost: Optional[bc_position.Cost]
 
    # mypy correctly complains that our MutableMapping is not compatible
 
    # with Beancount's meta type declaration of Optional[Dict]. IMO
 
    # Beancount's type declaration is a smidge too specific: I think its type
 
    # declaration should also use MutableMapping, because it would be very
 
    # unusual for code to specifically require a Dict over that.
 
    # If it did, this declaration would pass without issue.
 
    meta: PostingMeta  # type:ignore[assignment]
 

	
 
    @classmethod
 
    def from_beancount(cls,
 
                       txn: Transaction,
 
                       index: int,
 
                       post: Optional[BasePosting]=None,
 
    ) -> 'Posting':
 
        if post is None:
 
            post = txn.postings[index]
 
        return cls(
 
            Account(post.account),
 
            *post[1:5],
 
            # see rationale above about Posting.meta
 
            PostingMeta(txn, index, post), # type:ignore[arg-type]
 
        )
 

	
 
    @classmethod
 
    def from_txn(cls, txn: Transaction) -> Iterator['Posting']:
 
        """Yield an enhanced Posting object for every posting in the transaction"""
 
        for index, post in enumerate(txn.postings):
 
            yield cls.from_beancount(txn, index, post)
 

	
 
    @classmethod
 
    def from_entries(cls, entries: Iterable[Directive]) -> Iterator['Posting']:
 
        """Yield an enhanced Posting object for every posting in these entries"""
 
        for entry in entries:
 
            # Because Beancount's own Transaction class isn't type-checkable,
 
            # we can't statically check this. Might as well rely on duck
 
            # typing while we're at it: just try to yield postings from
 
            # everything, and ignore entries that lack a postings attribute.
 
            try:
 
                yield from cls.from_txn(entry)  # type:ignore[arg-type]
 
            except AttributeError:
 
                pass
 

	
 
    def at_cost(self) -> Amount:
 
        if self.cost is None:
 
            return self.units
 
        else:
 
            return Amount(self.units.number * self.cost.number, self.cost.currency)
 

	
 

	
 
_KT = TypeVar('_KT', bound=Hashable)
 
_VT = TypeVar('_VT')
 
class _SizedDict(collections.OrderedDict, MutableMapping[_KT, _VT]):
 
    def __init__(self, maxsize: int=128) -> None:
 
        self.maxsize = maxsize
 
        super().__init__()
 

	
 
    def __setitem__(self, key: _KT, value: _VT) -> None:
 
        super().__setitem__(key, value)
 
        for _ in range(self.maxsize, len(self)):
 
            self.popitem(last=False)
 

	
 

	
 
def balance_of(txn: Transaction,
 
               *preds: Callable[[Account], Optional[bool]],
 
) -> Amount:
 
    """Return the balance of specified postings in a transaction.
 

	
 
    Given a transaction and a series of account predicates, balance_of
 
    returns the balance of the amounts of all postings with accounts that
 
    match any of the predicates.
 

	
 
    balance_of uses the "weight" of each posting, so the return value will
 
    use the currency of the postings' cost when available.
 
    """
 
    match_posts = [post for post in Posting.from_txn(txn)
 
                   if any(pred(post.account) for pred in preds)]
 
    number = decimal.Decimal(0)
 
    if not match_posts:
 
        currency = ''
 
    else:
 
        weights: Sequence[Amount] = [
 
            bc_convert.get_weight(post) for post in match_posts
 
        ]
 
        number = sum((wt.number for wt in weights), number)
 
        currency = weights[0].currency
 
    return Amount(number, currency)
 

	
 
_opening_balance_cache: MutableMapping[str, bool] = _SizedDict()
 
def is_opening_balance_txn(txn: Transaction) -> bool:
 
    key = '\0'.join(
 
        f'{post.account}={post.units}' for post in txn.postings
 
    )
 
    try:
 
        return _opening_balance_cache[key]
 
    except KeyError:
 
        pass
 
    opening_equity = balance_of(txn, Account.is_opening_equity)
 
    if not opening_equity.currency:
 
        retval = False
 
    else:
 
        rest = balance_of(txn, lambda acct: not acct.is_opening_equity())
 
        if not rest.currency:
 
            retval = False
 
        else:
 
            retval = abs(opening_equity.number + rest.number) < decimal.Decimal('.01')
 
    _opening_balance_cache[key] = retval
 
    return retval
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -295,144 +295,132 @@ class RelatedPostings(Sequence[data.Posting]):
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of
 
        post.meta.first_link(key, None).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same first metadata link.
 
        """
 
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 
            return post.meta.first_link(key, None)
 
        return cls._group_by(postings, key_func)
 

	
 
    def __repr__(self) -> str:
 
        return f'<{type(self).__name__} {self._postings!r}>'
 

	
 
    @overload
 
    def __getitem__(self: RelatedType, index: int) -> data.Posting: ...
 

	
 
    @overload
 
    def __getitem__(self: RelatedType, s: slice) -> RelatedType: ...
 

	
 
    def __getitem__(self: RelatedType,
 
                    index: Union[int, slice],
 
    ) -> Union[data.Posting, RelatedType]:
 
        if isinstance(index, slice):
 
            return type(self)(self._postings[index], _can_own=True)
 
        else:
 
            return self._postings[index]
 

	
 
    def __len__(self) -> int:
 
        return len(self._postings)
 

	
 
    def _all_meta_links(self, key: MetaKey) -> Iterator[str]:
 
        for post in self:
 
            try:
 
                yield from post.meta.get_links(key)
 
            except TypeError:
 
                pass
 

	
 
    def all_meta_links(self, key: MetaKey) -> Iterator[str]:
 
        return filters.iter_unique(self._all_meta_links(key))
 

	
 
    @overload
 
    def first_meta_links(self, key: MetaKey, default: str='') -> Iterator[str]: ...
 

	
 
    @overload
 
    def first_meta_links(self, key: MetaKey, default: None) -> Iterator[Optional[str]]: ...
 

	
 
    def first_meta_links(self,
 
                         key: MetaKey,
 
                         default: Optional[str]='',
 
    ) -> Iterator[Optional[str]]:
 
        retval = filters.iter_unique(
 
            post.meta.first_link(key, default) for post in self
 
        )
 
        if default == '':
 
            retval = (s for s in retval if s)
 
        return retval
 

	
 
    def iter_with_balance(self) -> Iterator[Tuple[data.Posting, Balance]]:
 
        balance = MutableBalance()
 
        for post in self:
 
            balance += post.units
 
            yield post, balance
 

	
 
    def balance(self) -> Balance:
 
        for _, balance in self.iter_with_balance():
 
            pass
 
        try:
 
            return balance
 
        except NameError:
 
            return Balance()
 
        return Balance(post.units for post in self)
 

	
 
    def balance_at_cost(self) -> Balance:
 
        balance = MutableBalance()
 
        for post in self:
 
            if post.cost is None:
 
                balance += post.units
 
            else:
 
                number = post.units.number * post.cost.number
 
                balance += data.Amount(number, post.cost.currency)
 
        return balance
 
        return Balance(post.at_cost() for post in self)
 

	
 
    def meta_values(self,
 
                    key: MetaKey,
 
                    default: Optional[MetaValue]=None,
 
    ) -> Set[Optional[MetaValue]]:
 
        return {post.meta.get(key, default) for post in self}
 

	
 

	
 
class BaseSpreadsheet(Generic[RT, ST], metaclass=abc.ABCMeta):
 
    """Abstract base class to help write spreadsheets
 

	
 
    This class provides the very core logic to write an arbitrary set of data
 
    rows to arbitrary output. It calls hooks when it starts writing the
 
    spreadsheet, starts a new "section" of rows, ends a section, and ends the
 
    spreadsheet.
 

	
 
    RT is the type of the input data rows. ST is the type of the section
 
    identifier that you create from each row. If you don't want to use the
 
    section logic at all, set ST to None and define section_key to return None.
 
    """
 

	
 
    @abc.abstractmethod
 
    def section_key(self, row: RT) -> ST:
 
        """Return the section a row belongs to
 

	
 
        Given a data row, this method should return some identifier for the
 
        "section" the row belongs to. The write method uses this to
 
        determine when to call start_section and end_section.
 

	
 
        If your spreadsheet doesn't need sections, define this to return None.
 
        """
 
        ...
 

	
 
    @abc.abstractmethod
 
    def write_row(self, row: RT) -> None:
 
        """Write a data row to the output spreadsheet
 

	
 
        This method is called once for each data row in the input.
 
        """
 
        ...
 

	
 
    # The next four methods are all called by the write method when the name
 
    # says. You may override them to output headers or sums, record
 
    # state, etc. The default implementations are all noops.
 

	
 
    def start_spreadsheet(self) -> None:
 
        pass
 

	
 
    def start_section(self, key: ST) -> None:
 
        pass
 

	
 
    def end_section(self, key: ST) -> None:
 
        pass
 

	
 
    def end_spreadsheet(self) -> None:
 
        pass
 

	
 
    def write(self, rows: Iterable[RT]) -> None:
 
        prev_section: Optional[ST] = None
 
        self.start_spreadsheet()
 
        for row in rows:
 
            section = self.section_key(row)
 
            if section != prev_section:
 
                if prev_section is not None:
tests/test_data_posting.py
Show inline comments
...
 
@@ -21,64 +21,88 @@ from . import testutil
 
from beancount.core import data as bc_data
 

	
 
from conservancy_beancount import data
 

	
 
@pytest.fixture
 
def simple_txn(index=None, key=None):
 
    return testutil.Transaction(note='txn note', postings=[
 
        ('Assets:Cash', 5),
 
        ('Income:Donations', -5, {'note': 'donation love', 'extra': 'Extra'}),
 
    ])
 

	
 
def test_from_beancount():
 
    txn = testutil.Transaction(payee='Smith-Dakota', postings=[
 
        ('Income:Donations', -50),
 
        ('Assets:Cash', 50, {'receipt': 'cash-donation.pdf'}),
 
    ])
 
    post = data.Posting.from_beancount(txn, 1)
 
    # We don't just want to assert isinstance(post.attr, data.SomeClass);
 
    # we also want to double-check that attributes were instantiated correctly.
 
    assert post.account.is_under('Assets:Cash')
 
    assert post.meta['receipt'] == 'cash-donation.pdf'
 
    assert post.meta['entity'] == 'Smith-Dakota'
 
    assert post.meta.date == testutil.FY_MID_DATE
 

	
 
def test_setting_metadata_propagates_to_source(simple_txn):
 
    src_post = simple_txn.postings[1]
 
    post = data.Posting.from_beancount(simple_txn, 1)
 
    post.meta['edited'] = 'yes'
 
    assert src_post.meta['edited'] == 'yes'
 
    assert not isinstance(src_post.meta, data.PostingMeta)
 

	
 
def test_deleting_metadata_propagates_to_source(simple_txn):
 
    post = data.Posting.from_beancount(simple_txn, 1)
 
    del post.meta['extra']
 
    assert 'extra' not in simple_txn.postings[1].meta
 

	
 
def test_from_txn(simple_txn):
 
    for source, post in zip(simple_txn.postings, data.Posting.from_txn(simple_txn)):
 
        assert all(source[x] == post[x] for x in range(len(source) - 1))
 
        assert isinstance(post.account, data.Account)
 
        assert post.meta['note']  # Only works with PostingMeta
 

	
 
def test_from_entries_two_txns(simple_txn):
 
    entries = [simple_txn, simple_txn]
 
    sources = [post for txn in entries for post in txn.postings]
 
    for source, post in zip(sources, data.Posting.from_entries(entries)):
 
        assert all(source[x] == post[x] for x in range(len(source) - 1))
 
        assert isinstance(post.account, data.Account)
 
        assert post.meta['note']  # Only works with PostingMeta
 

	
 
def test_from_entries_mix_txns_and_other_directives(simple_txn):
 
    meta = {
 
        'filename': __file__,
 
        'lineno': 75,
 
    }
 
    entries = [
 
        bc_data.Commodity(meta, testutil.FY_START_DATE, 'EUR'),
 
        bc_data.Commodity(meta, testutil.FY_START_DATE, 'USD'),
 
        simple_txn,
 
    ]
 
    for source, post in zip(simple_txn.postings, data.Posting.from_entries(entries)):
 
        assert all(source[x] == post[x] for x in range(len(source) - 1))
 
        assert isinstance(post.account, data.Account)
 
        assert post.meta['note']  # Only works with PostingMeta
 

	
 
@pytest.mark.parametrize('cost_num', [105, 110, 115])
 
def test_at_cost(cost_num):
 
    post = data.Posting(
 
        'Income:Donations',
 
        testutil.Amount(25, 'EUR'),
 
        testutil.Cost(cost_num, 'JPY'),
 
        None,
 
        '*',
 
        None,
 
    )
 
    assert post.at_cost() == testutil.Amount(25 * cost_num, 'JPY')
 

	
 
def test_at_cost_no_cost():
 
    amount = testutil.Amount(25, 'EUR')
 
    post = data.Posting(
 
        'Income:Donations',
 
        amount,
 
        None,
 
        None,
 
        '*',
 
        None,
 
    )
 
    assert post.at_cost() == amount
0 comments (0 inline, 0 general)