Changeset - ccc3a829da9e
[Not reviewed]
0 2 0
Brett Smith - 4 years ago 2020-06-15 14:38:26
brettcsmith@brettcsmith.org
reports: Add RelatedPostings.balance_at_cost_by_date() method.
2 files changed with 40 insertions and 0 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -243,256 +243,264 @@ class RelatedPostings(Sequence[data.Posting]):
 
    and running queries on them: iterating over them, tallying their balance,
 
    etc.
 

	
 
    This class doesn't know anything about how the postings are related. That's
 
    entirely up to the caller.
 

	
 
    A common pattern is to use this class with collections.defaultdict
 
    to organize postings based on some key. See the group_by_meta classmethod
 
    for an example.
 
    """
 
    __slots__ = ('_postings',)
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Posting]=(),
 
                 *,
 
                 _can_own: bool=False,
 
    ) -> None:
 
        self._postings: List[data.Posting]
 
        if _can_own and isinstance(source, list):
 
            self._postings = source
 
        else:
 
            self._postings = list(source)
 

	
 
    @classmethod
 
    def _group_by(cls: Type[RelatedType],
 
                  postings: Iterable[data.Posting],
 
                  key: Callable[[data.Posting], T],
 
    ) -> Iterator[Tuple[T, RelatedType]]:
 
        mapping: Dict[T, List[data.Posting]] = collections.defaultdict(list)
 
        for post in postings:
 
            mapping[key(post)].append(post)
 
        for value, posts in mapping.items():
 
            yield value, cls(posts, _can_own=True)
 

	
 
    @classmethod
 
    def group_by_meta(cls: Type[RelatedType],
 
                      postings: Iterable[data.Posting],
 
                      key: MetaKey,
 
                      default: Optional[MetaValue]=None,
 
    ) -> Iterator[Tuple[Optional[MetaValue], RelatedType]]:
 
        """Relate postings by metadata value
 

	
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of post.meta.get(key, default).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same metadata value.
 
        """
 
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 
            return post.meta.get(key, default)
 
        return cls._group_by(postings, key_func)
 

	
 
    @classmethod
 
    def group_by_first_meta_link(
 
            cls: Type[RelatedType],
 
            postings: Iterable[data.Posting],
 
            key: MetaKey,
 
    ) -> Iterator[Tuple[Optional[str], RelatedType]]:
 
        """Relate postings by the first link in metadata
 

	
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of
 
        post.meta.first_link(key, None).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same first metadata link.
 
        """
 
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 
            return post.meta.first_link(key, None)
 
        return cls._group_by(postings, key_func)
 

	
 
    def __repr__(self) -> str:
 
        return f'<{type(self).__name__} {self._postings!r}>'
 

	
 
    @overload
 
    def __getitem__(self: RelatedType, index: int) -> data.Posting: ...
 

	
 
    @overload
 
    def __getitem__(self: RelatedType, s: slice) -> RelatedType: ...
 

	
 
    def __getitem__(self: RelatedType,
 
                    index: Union[int, slice],
 
    ) -> Union[data.Posting, RelatedType]:
 
        if isinstance(index, slice):
 
            return type(self)(self._postings[index], _can_own=True)
 
        else:
 
            return self._postings[index]
 

	
 
    def __len__(self) -> int:
 
        return len(self._postings)
 

	
 
    def _all_meta_links(self, key: MetaKey) -> Iterator[str]:
 
        for post in self:
 
            try:
 
                yield from post.meta.get_links(key)
 
            except TypeError:
 
                pass
 

	
 
    def all_meta_links(self, key: MetaKey) -> Iterator[str]:
 
        return filters.iter_unique(self._all_meta_links(key))
 

	
 
    @overload
 
    def first_meta_links(self, key: MetaKey, default: str='') -> Iterator[str]: ...
 

	
 
    @overload
 
    def first_meta_links(self, key: MetaKey, default: None) -> Iterator[Optional[str]]: ...
 

	
 
    def first_meta_links(self,
 
                         key: MetaKey,
 
                         default: Optional[str]='',
 
    ) -> Iterator[Optional[str]]:
 
        retval = filters.iter_unique(
 
            post.meta.first_link(key, default) for post in self
 
        )
 
        if default == '':
 
            retval = (s for s in retval if s)
 
        return retval
 

	
 
    def iter_with_balance(self) -> Iterator[Tuple[data.Posting, Balance]]:
 
        balance = MutableBalance()
 
        for post in self:
 
            balance += post.units
 
            yield post, balance
 

	
 
    def balance(self) -> Balance:
 
        return Balance(post.units for post in self)
 

	
 
    def balance_at_cost(self) -> Balance:
 
        return Balance(post.at_cost() for post in self)
 

	
 
    def balance_at_cost_by_date(self, date: datetime.date) -> Balance:
 
        for index, post in enumerate(self):
 
            if post.meta.date >= date:
 
                break
 
        else:
 
            index += 1
 
        return Balance(post.at_cost() for post in self._postings[:index])
 

	
 
    def meta_values(self,
 
                    key: MetaKey,
 
                    default: Optional[MetaValue]=None,
 
    ) -> Set[Optional[MetaValue]]:
 
        return {post.meta.get(key, default) for post in self}
 

	
 

	
 
class BaseSpreadsheet(Generic[RT, ST], metaclass=abc.ABCMeta):
 
    """Abstract base class to help write spreadsheets
 

	
 
    This class provides the very core logic to write an arbitrary set of data
 
    rows to arbitrary output. It calls hooks when it starts writing the
 
    spreadsheet, starts a new "section" of rows, ends a section, and ends the
 
    spreadsheet.
 

	
 
    RT is the type of the input data rows. ST is the type of the section
 
    identifier that you create from each row. If you don't want to use the
 
    section logic at all, set ST to None and define section_key to return None.
 
    """
 

	
 
    @abc.abstractmethod
 
    def section_key(self, row: RT) -> ST:
 
        """Return the section a row belongs to
 

	
 
        Given a data row, this method should return some identifier for the
 
        "section" the row belongs to. The write method uses this to
 
        determine when to call start_section and end_section.
 

	
 
        If your spreadsheet doesn't need sections, define this to return None.
 
        """
 
        ...
 

	
 
    @abc.abstractmethod
 
    def write_row(self, row: RT) -> None:
 
        """Write a data row to the output spreadsheet
 

	
 
        This method is called once for each data row in the input.
 
        """
 
        ...
 

	
 
    # The next four methods are all called by the write method when the name
 
    # says. You may override them to output headers or sums, record
 
    # state, etc. The default implementations are all noops.
 

	
 
    def start_spreadsheet(self) -> None:
 
        pass
 

	
 
    def start_section(self, key: ST) -> None:
 
        pass
 

	
 
    def end_section(self, key: ST) -> None:
 
        pass
 

	
 
    def end_spreadsheet(self) -> None:
 
        pass
 

	
 
    def write(self, rows: Iterable[RT]) -> None:
 
        prev_section: Optional[ST] = None
 
        self.start_spreadsheet()
 
        for row in rows:
 
            section = self.section_key(row)
 
            if section != prev_section:
 
                if prev_section is not None:
 
                    self.end_section(prev_section)
 
                self.start_section(section)
 
                prev_section = section
 
            self.write_row(row)
 
        try:
 
            should_end = section is not None
 
        except NameError:
 
            should_end = False
 
        if should_end:
 
            self.end_section(section)
 
        self.end_spreadsheet()
 

	
 

	
 
class BaseODS(BaseSpreadsheet[RT, ST], metaclass=abc.ABCMeta):
 
    """Abstract base class to help write OpenDocument spreadsheets
 

	
 
    This class provides the very core logic to write an arbitrary set of data
 
    rows to an OpenDocument spreadsheet. It provides helper methods for
 
    building sheets, rows, and cells.
 

	
 
    See also the BaseSpreadsheet base class for additional documentation about
 
    methods you must and can define, the definition of RT and ST, etc.
 
    """
 
    def __init__(self, rt_wrapper: Optional[rtutil.RT]=None) -> None:
 
        self.rt_wrapper = rt_wrapper
 
        self.locale = babel.core.Locale.default('LC_MONETARY')
 
        self.currency_fmt_key = 'accounting'
 
        self._name_counter = itertools.count(1)
 
        self._currency_style_cache: MutableMapping[str, odf.style.Style] = {}
 
        self.document = odf.opendocument.OpenDocumentSpreadsheet()
 
        self.init_settings()
 
        self.init_styles()
 
        self.sheet = self.use_sheet("Report")
 

	
 
    ### Low-level document tree manipulation
 
    # The *intent* is that you only need to use these if you're adding new
 
    # methods to manipulate document settings or styles.
 

	
 
    def copy_element(self, elem: odf.element.Element) -> odf.element.Element:
 
        qattrs = dict(self.iter_qattributes(elem))
 
        retval = odf.element.Element(qname=elem.qname, qattributes=qattrs)
 
        try:
 
            orig_name = retval.getAttribute('name')
 
        except ValueError:
 
            orig_name = None
 
        if orig_name is not None:
 
            retval.setAttribute('name', f'{orig_name}{next(self._name_counter)}')
 
        return retval
 

	
 
    def ensure_child(self,
 
                     parent: odf.element.Element,
 
                     child_type: ElementType,
 
                     **kwargs: Any,
 
    ) -> odf.element.Element:
 
        new_child = child_type(**kwargs)
 
        found_child = self.find_child(parent, new_child)
 
        if found_child is None:
 
            parent.addElement(new_child)
 
            return parent.lastChild
 
        else:
 
            return found_child
 

	
 
    def ensure_config_map_entry(self,
 
                                root: odf.element.Element,
 
                                map_name: str,
tests/test_reports_related_postings.py
Show inline comments
...
 
@@ -72,256 +72,288 @@ def link_swap_posts():
 
        '_meta_type': data.Metadata,
 
    }
 
    for n in range(1, 3):
 
        n = Decimal(n)
 
        retval.append(testutil.Posting(
 
            'Assets:Receivable:Accounts', n * 10, metanum=n, **meta,
 
        ))
 
    meta['rt-id'] = 'rt:16 rt:12'
 
    for n in range(1, 3):
 
        n = Decimal(n)
 
        retval.append(testutil.Posting(
 
            'Liabilities:Payable:Accounts', n * -10, metanum=n, **meta,
 
        ))
 
    return retval
 

	
 
def test_initialize_with_list(credit_card_cycle):
 
    related = core.RelatedPostings(credit_card_cycle[0].postings)
 
    assert len(related) == 2
 

	
 
def test_initialize_with_iterable(two_accruals_three_payments):
 
    related = core.RelatedPostings(
 
        post for txn in two_accruals_three_payments
 
        for post in txn.postings
 
        if post.account == 'Assets:Receivable:Accounts'
 
    )
 
    assert len(related) == 5
 

	
 
def test_balance_empty():
 
    balance = core.RelatedPostings().balance()
 
    assert not balance
 
    assert balance.is_zero()
 

	
 
@pytest.mark.parametrize('index,expected', enumerate([
 
    -110,
 
    0,
 
    -120,
 
    0,
 
]))
 
def test_balance_credit_card(credit_card_cycle, index, expected):
 
    related = core.RelatedPostings(
 
        txn.postings[0] for txn in credit_card_cycle[:index + 1]
 
    )
 
    assert related.balance() == {'USD': testutil.Amount(expected, 'USD')}
 

	
 
def check_iter_with_balance(entries):
 
    expect_posts = [txn.postings[0] for txn in entries]
 
    expect_balances = []
 
    balance_tally = collections.defaultdict(Decimal)
 
    for post in expect_posts:
 
        number, currency = post.units
 
        balance_tally[currency] += number
 
        expect_balances.append({code: testutil.Amount(number, code)
 
                                for code, number in balance_tally.items()})
 
    related = core.RelatedPostings(expect_posts)
 
    for (post, balance), exp_post, exp_balance in zip(
 
            related.iter_with_balance(),
 
            expect_posts,
 
            expect_balances,
 
    ):
 
        assert post is exp_post
 
        assert balance == exp_balance
 
    assert post is expect_posts[-1]
 
    assert related.balance() == expect_balances[-1]
 

	
 
def test_iter_with_balance_empty():
 
    assert not list(core.RelatedPostings().iter_with_balance())
 

	
 
def test_iter_with_balance_credit_card(credit_card_cycle):
 
    check_iter_with_balance(credit_card_cycle)
 

	
 
def test_iter_with_balance_two_acccruals(two_accruals_three_payments):
 
    check_iter_with_balance(two_accruals_three_payments)
 

	
 
def test_balance_at_cost_mixed():
 
    txn = testutil.Transaction(postings=[
 
        ('Expenses:Other', '22'),
 
        ('Expenses:Other', '30', 'EUR', ('1.1',)),
 
        ('Expenses:Other', '40', 'EUR'),
 
        ('Expenses:Other', '50', 'USD', ('1.1', 'EUR')),
 
    ])
 
    related = core.RelatedPostings(data.Posting.from_txn(txn))
 
    balance = related.balance_at_cost()
 
    amounts = set(balance.values())
 
    assert amounts == {testutil.Amount(55, 'USD'), testutil.Amount(95, 'EUR')}
 

	
 
def test_balance_at_single_currency_cost():
 
    txn = testutil.Transaction(postings=[
 
        ('Expenses:Other', '22'),
 
        ('Expenses:Other', '30', 'EUR', ('1.1',)),
 
        ('Expenses:Other', '40', 'GBP', ('1.1',)),
 
    ])
 
    related = core.RelatedPostings(data.Posting.from_txn(txn))
 
    balance = related.balance_at_cost()
 
    amounts = set(balance.values())
 
    assert amounts == {testutil.Amount(99)}
 

	
 
def test_balance_at_cost_zeroed_out():
 
    txn = testutil.Transaction(postings=[
 
        ('Income:Other', '-22'),
 
        ('Assets:Receivable:Accounts', '20', 'EUR', ('1.1',)),
 
    ])
 
    related = core.RelatedPostings(data.Posting.from_txn(txn))
 
    balance = related.balance_at_cost()
 
    assert balance.is_zero()
 

	
 
def test_balance_at_cost_singleton():
 
    txn = testutil.Transaction(postings=[
 
        ('Assets:Receivable:Accounts', '20', 'EUR', ('1.1',)),
 
    ])
 
    related = core.RelatedPostings(data.Posting.from_txn(txn))
 
    balance = related.balance_at_cost()
 
    amounts = set(balance.values())
 
    assert amounts == {testutil.Amount(22)}
 

	
 
def test_balance_at_cost_singleton_without_cost():
 
    txn = testutil.Transaction(postings=[
 
        ('Assets:Receivable:Accounts', '20'),
 
    ])
 
    related = core.RelatedPostings(data.Posting.from_txn(txn))
 
    balance = related.balance_at_cost()
 
    amounts = set(balance.values())
 
    assert amounts == {testutil.Amount(20)}
 

	
 
def test_balance_at_cost_empty():
 
    related = core.RelatedPostings()
 
    balance = related.balance_at_cost()
 
    assert balance.is_zero()
 

	
 
@pytest.mark.parametrize('date,expected', [
 
    (testutil.FY_MID_DATE - datetime.timedelta(days=1), 0),
 
    (testutil.FY_MID_DATE, 0),
 
    (testutil.FY_MID_DATE + datetime.timedelta(days=1), 25),
 
    (testutil.FY_MID_DATE + datetime.timedelta(days=2), 70),
 
    (testutil.FY_MID_DATE + datetime.timedelta(days=3), 135),
 
    (testutil.FY_MID_DATE + datetime.timedelta(days=4), 135),
 
])
 
def test_balance_at_cost_by_date(date, expected):
 
    dates = testutil.date_seq()
 
    jpy_cost = ('0.01', 'USD')
 
    entries = [
 
        testutil.Transaction(date=next(dates), postings=[
 
            ('Assets:Cash', 1000, 'JPY', jpy_cost),
 
            ('Assets:Cash', 15),
 
        ]),
 
        testutil.Transaction(date=next(dates), postings=[
 
            ('Assets:Cash', 2000, 'JPY', jpy_cost),
 
            ('Assets:Cash', 25),
 
        ]),
 
        testutil.Transaction(date=next(dates), postings=[
 
            ('Assets:Cash', 3000, 'JPY', jpy_cost),
 
            ('Assets:Cash', 35),
 
        ]),
 
    ]
 
    related = core.RelatedPostings(data.Posting.from_entries(entries))
 
    actual = related.balance_at_cost_by_date(date)
 
    if not expected:
 
        assert actual.is_zero()
 
    else:
 
        assert actual == {'USD': testutil.Amount(expected)}
 

	
 
def test_meta_values_empty():
 
    related = core.RelatedPostings()
 
    assert related.meta_values('key') == set()
 

	
 
def test_meta_values_no_match():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, metakey='metavalue'),
 
    ])
 
    assert related.meta_values('key') == {None}
 

	
 
def test_meta_values_no_match_default_given():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, metakey='metavalue'),
 
    ])
 
    assert related.meta_values('key', '') == {''}
 

	
 
def test_meta_values_one_match():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, key='metavalue'),
 
    ])
 
    assert related.meta_values('key') == {'metavalue'}
 

	
 
def test_meta_values_some_match():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, key='1'),
 
        testutil.Posting('Income:Donations', -2, metakey='2'),
 
    ])
 
    assert related.meta_values('key') == {'1', None}
 

	
 
def test_meta_values_some_match_default_given():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, key='1'),
 
        testutil.Posting('Income:Donations', -2, metakey='2'),
 
    ])
 
    assert related.meta_values('key', '') == {'1', ''}
 

	
 
def test_meta_values_all_match():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, key='1'),
 
        testutil.Posting('Income:Donations', -2, key='2'),
 
    ])
 
    assert related.meta_values('key') == {'1', '2'}
 

	
 
def test_meta_values_all_match_one_value():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, key='1'),
 
        testutil.Posting('Income:Donations', -2, key='1'),
 
    ])
 
    assert related.meta_values('key') == {'1'}
 

	
 
def test_meta_values_all_match_default_given():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, key='1'),
 
        testutil.Posting('Income:Donations', -2, key='2'),
 
    ])
 
    assert related.meta_values('key', '') == {'1', '2'}
 

	
 
def test_meta_values_many_types():
 
    expected = {
 
        datetime.date(2020, 4, 1),
 
        Decimal(42),
 
        testutil.Amount(5),
 
        'rt:42',
 
    }
 
    related = core.RelatedPostings(
 
        testutil.Posting('Income:Donations', -index, key=value)
 
        for index, value in enumerate(expected)
 
    )
 
    assert related.meta_values('key') == expected
 

	
 
@pytest.mark.parametrize('count', range(3))
 
def test_all_meta_links_zero(count):
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -n, testkey=str(n), _meta_type=data.Metadata,
 
    ) for n in range(count))
 
    assert next(related.all_meta_links('approval'), None) is None
 

	
 
def test_all_meta_links_singletons():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, statement=value, _meta_type=data.Metadata,
 
    ) for value in itertools.chain(
 
        testutil.NON_LINK_METADATA_STRINGS,
 
        testutil.LINK_METADATA_STRINGS,
 
        testutil.NON_STRING_METADATA_VALUES,
 
    ))
 
    assert set(related.all_meta_links('statement')) == testutil.LINK_METADATA_STRINGS
 

	
 
def test_all_meta_links_multiples():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, approval=' '.join(value), _meta_type=data.Metadata,
 
    ) for value in itertools.permutations(testutil.LINK_METADATA_STRINGS, 2))
 
    assert set(related.all_meta_links('approval')) == testutil.LINK_METADATA_STRINGS
 

	
 
def test_all_meta_links_preserves_order():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, approval=c, _meta_type=data.Metadata,
 
    ) for c in '121323')
 
    assert list(related.all_meta_links('approval')) == list('123')
 

	
 
def test_first_meta_links():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Assets:Cash', 10, contract=value, _meta_type=data.Metadata,
 
    ) for value in ['1 2', '', '1 3', testutil.PAST_DATE, '2 3', None])
 
    del related[-1].meta['contract']
 
    assert list(related.first_meta_links('contract')) == list('12')
 

	
 
def test_first_meta_links_fallback():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Assets:Cash', 10, contract=value, _meta_type=data.Metadata,
 
    ) for value in ['1 2', testutil.PAST_DATE, '1 3', None, '2 3'])
 
    del related[-2].meta['contract']
 
    assert list(related.first_meta_links('contract', None)) == ['1', None, '2']
 

	
 
def test_group_by_meta_zero():
 
    assert not list(core.RelatedPostings.group_by_meta([], 'metacurrency'))
 

	
 
def test_group_by_meta_one(credit_card_cycle):
 
    posting = next(post for post in data.Posting.from_entries(credit_card_cycle)
 
                   if post.account.is_credit_card())
 
    actual = core.RelatedPostings.group_by_meta([posting], 'metacurrency')
 
    assert set(key for key, _ in actual) == {'USD'}
 

	
 
def test_group_by_meta_many(two_accruals_three_payments):
 
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
 
                if post.account == 'Assets:Receivable:Accounts']
 
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metacurrency'))
 
    assert set(actual) == {'USD', 'EUR'}
 
    for key, group in actual.items():
0 comments (0 inline, 0 general)