Changeset - 52fc0d1b5f93
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-06-11 18:01:19
reports: Add RelatedPostings.group_by_first_meta_link() method.
3 files changed with 86 insertions and 7 deletions:
0 comments (0 inline, 0 general)
Show inline comments
@@ -665,65 +665,65 @@ metadata to match. A single ticket number is a shortcut for
    args = parser.parse_args(arglist)
    if args.report_type is None and not args.search_terms:
        args.report_type = ReportType.AGING
    return args

def main(arglist: Optional[Sequence[str]]=None,
         stdout: TextIO=sys.stdout,
         stderr: TextIO=sys.stderr,
         config: Optional[configmod.Config]=None,
) -> int:
    if cliutil.is_main_script(PROGNAME):
        global logger
        logger = logging.getLogger(PROGNAME)
        sys.excepthook = cliutil.ExceptHook(logger)
    args = parse_arguments(arglist)
    cliutil.setup_logger(logger, args.loglevel, stderr)
    if config is None:
        config = configmod.Config()

    books_loader = config.books_loader()
    if books_loader is None:
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
    elif args.report_type is ReportType.AGING:
        entries, load_errors, _ = books_loader.load_all()
        entries, load_errors, _ = books_loader.load_all(args.since)

    returncode = 0
    postings = filter_search(data.Posting.from_entries(entries), args.search_terms)
    groups: PostGroups = dict(AccrualPostings.group_by_meta(postings, 'invoice'))
    groups: PostGroups = dict(AccrualPostings.group_by_first_meta_link(postings, 'invoice'))
    for error in load_errors:
        bc_printer.print_error(error, file=stderr)
        returncode |= ReturnFlag.LOAD_ERRORS
    if not groups:
        logger.warning("no matching entries found to report")
        returncode |= ReturnFlag.NOTHING_TO_REPORT

    groups = {
        key: posts
        for source_posts in groups.values()
        for key, posts in source_posts.make_consistent()
    if args.report_type is not ReportType.AGING:
        groups = {
            key: posts for key, posts in groups.items() if not posts.is_paid()
        } or groups

    if args.report_type is None:
        args.report_type = ReportType.default_for(groups)
    report: Optional[BaseReport] = None
    output_path: Optional[Path] = None
    if args.report_type is ReportType.AGING:
        rt_client = config.rt_client()
        if rt_client is None:
            logger.error("unable to generate aging report: RT client is required")
            now =
            if args.output_file is None:
                args.output_file = Path(now.strftime('AgingReport_%Y-%m-%d_%H:%M.ods'))
      "Writing report to %s", args.output_file)
            out_bin = cliutil.bytes_output(args.output_file, stdout)
            report = AgingReport(rt_client, out_bin)
Show inline comments
@@ -17,65 +17,64 @@
import abc
import collections
import datetime
import itertools
import operator
import re

import babel.core  # type:ignore[import]
import babel.numbers  # type:ignore[import]

import odf.config  # type:ignore[import]
import odf.element  # type:ignore[import]
import odf.number  # type:ignore[import]
import odf.opendocument  # type:ignore[import]
import  # type:ignore[import]
import odf.table  # type:ignore[import]
import odf.text  # type:ignore[import]

from decimal import Decimal
from pathlib import Path

from beancount.core import amount as bc_amount

from .. import data
from .. import filters

from typing import (
from ..beancount_types import (

DecimalCompat = data.DecimalCompat
BalanceType = TypeVar('BalanceType', bound='Balance')
ElementType = Callable[..., odf.element.Element]
LinkType = Union[str, Tuple[str, Optional[str]]]
RelatedType = TypeVar('RelatedType', bound='RelatedPostings')
RT = TypeVar('RT', bound=Sequence)
ST = TypeVar('ST')
T = TypeVar('T')

class Balance(Mapping[str, data.Amount]):
    """A collection of amounts mapped by currency

@@ -226,82 +225,109 @@ class MutableBalance(Balance):
    def __iadd__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
        self._add_other(self._currency_map, other)
        return self


class RelatedPostings(Sequence[data.Posting]):
    """Collect and query related postings

    This class provides common functionality for collecting related postings
    and running queries on them: iterating over them, tallying their balance,

    This class doesn't know anything about how the postings are related. That's
    entirely up to the caller.

    A common pattern is to use this class with collections.defaultdict
    to organize postings based on some key. See the group_by_meta classmethod
    for an example.
    __slots__ = ('_postings',)

    def __init__(self,
                 source: Iterable[data.Posting]=(),
                 _can_own: bool=False,
    ) -> None:
        self._postings: List[data.Posting]
        if _can_own and isinstance(source, list):
            self._postings = source
            self._postings = list(source)

    def _group_by(cls: Type[RelatedType],
                  postings: Iterable[data.Posting],
                  key: Callable[[data.Posting], T],
    ) -> Iterator[Tuple[T, RelatedType]]:
        mapping: Dict[T, List[data.Posting]] = collections.defaultdict(list)
        for post in postings:
        for value, posts in mapping.items():
            yield value, cls(posts, _can_own=True)

    def group_by_meta(cls: Type[RelatedType],
                      postings: Iterable[data.Posting],
                      key: MetaKey,
                      default: Optional[MetaValue]=None,
    ) -> Iterator[Tuple[Optional[MetaValue], RelatedType]]:
        """Relate postings by metadata value

        This method takes an iterable of postings and returns a mapping.
        The keys of the mapping are the values of post.meta.get(key, default).
        The values are RelatedPostings instances that contain all the postings
        that had that same metadata value.
        mapping: DefaultDict[Optional[MetaValue], List[data.Posting]] = collections.defaultdict(list)
        for post in postings:
            mapping[post.meta.get(key, default)].append(post)
        for value, posts in mapping.items():
            yield value, cls(posts, _can_own=True)
        def key_func(post: data.Posting) -> Optional[MetaValue]:
            return post.meta.get(key, default)
        return cls._group_by(postings, key_func)

    def group_by_first_meta_link(
            cls: Type[RelatedType],
            postings: Iterable[data.Posting],
            key: MetaKey,
    ) -> Iterator[Tuple[Optional[str], RelatedType]]:
        """Relate postings by the first link in metadata

        This method takes an iterable of postings and returns a mapping.
        The keys of the mapping are the values of
        post.meta.first_link(key, None).
        The values are RelatedPostings instances that contain all the postings
        that had that same first metadata link.
        def key_func(post: data.Posting) -> Optional[MetaValue]:
            return post.meta.first_link(key, None)
        return cls._group_by(postings, key_func)

    def __repr__(self) -> str:
        return f'<{type(self).__name__} {self._postings!r}>'

    def __getitem__(self: RelatedType, index: int) -> data.Posting: ...

    def __getitem__(self: RelatedType, s: slice) -> RelatedType: ...

    def __getitem__(self: RelatedType,
                    index: Union[int, slice],
    ) -> Union[data.Posting, RelatedType]:
        if isinstance(index, slice):
            return type(self)(self._postings[index], _can_own=True)
            return self._postings[index]

    def __len__(self) -> int:
        return len(self._postings)

    def _all_meta_links(self, key: MetaKey) -> Iterator[str]:
        for post in self:
                yield from post.meta.get_links(key)
            except TypeError:

    def all_meta_links(self, key: MetaKey) -> Iterator[str]:
        return filters.iter_unique(self._all_meta_links(key))

Show inline comments
@@ -34,64 +34,85 @@ def accruals_and_payments(acct, src_acct, dst_acct, start_date, *amounts):
        yield testutil.Transaction(date=next(dates), postings=[
            (acct, amt, currency, post_meta),
            (dst_acct if amt < 0 else src_acct, -amt, currency, post_meta),

def credit_card_cycle():
    return list(accruals_and_payments(
, 4, 1),
        (-110, 'USD'),
        (110, 'USD'),
        (-120, 'USD'),
        (120, 'USD'),

def two_accruals_three_payments():
    return list(accruals_and_payments(
, 4, 10),
        (440, 'USD'),
        (-230, 'USD'),
        (550, 'EUR'),
        (-210, 'USD'),
        (-550, 'EUR'),

def link_swap_posts():
    retval = []
    meta = {
        'rt-id': 'rt:12 rt:16',
        '_post_type': data.Posting,
        '_meta_type': data.Metadata,
    for n in range(1, 3):
        n = Decimal(n)
            'Assets:Receivable:Accounts', n * 10, metanum=n, **meta,
    meta['rt-id'] = 'rt:16 rt:12'
    for n in range(1, 3):
        n = Decimal(n)
            'Liabilities:Payable:Accounts', n * -10, metanum=n, **meta,
    return retval

def test_initialize_with_list(credit_card_cycle):
    related = core.RelatedPostings(credit_card_cycle[0].postings)
    assert len(related) == 2

def test_initialize_with_iterable(two_accruals_three_payments):
    related = core.RelatedPostings(
        post for txn in two_accruals_three_payments
        for post in txn.postings
        if post.account == 'Assets:Receivable:Accounts'
    assert len(related) == 5

def test_balance_empty():
    balance = core.RelatedPostings().balance()
    assert not balance
    assert balance.is_zero()

@pytest.mark.parametrize('index,expected', enumerate([
def test_balance_credit_card(credit_card_cycle, index, expected):
    related = core.RelatedPostings(
        txn.postings[0] for txn in credit_card_cycle[:index + 1]
    assert related.balance() == {'USD': testutil.Amount(expected, 'USD')}

def check_iter_with_balance(entries):
    expect_posts = [txn.postings[0] for txn in entries]
    expect_balances = []
@@ -284,32 +305,64 @@ def test_first_meta_links():

def test_first_meta_links_fallback():
    related = core.RelatedPostings(testutil.Posting(
        'Assets:Cash', 10, contract=value, _meta_type=data.Metadata,
    ) for value in ['1 2', testutil.PAST_DATE, '1 3', None, '2 3'])
    del related[-2].meta['contract']
    assert list(related.first_meta_links('contract', None)) == ['1', None, '2']

def test_group_by_meta_zero():
    assert not list(core.RelatedPostings.group_by_meta([], 'metacurrency'))

def test_group_by_meta_one(credit_card_cycle):
    posting = next(post for post in data.Posting.from_entries(credit_card_cycle)
                   if post.account.is_credit_card())
    actual = core.RelatedPostings.group_by_meta([posting], 'metacurrency')
    assert set(key for key, _ in actual) == {'USD'}

def test_group_by_meta_many(two_accruals_three_payments):
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
                if post.account == 'Assets:Receivable:Accounts']
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metacurrency'))
    assert set(actual) == {'USD', 'EUR'}
    for key, group in actual.items():
        assert 2 <= len(group) <= 3
        assert group.balance().is_zero()

def test_group_by_meta_many_single_posts(two_accruals_three_payments):
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
                if post.account == 'Assets:Receivable:Accounts']
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metanumber'))
    assert set(actual) == {post.units.number for post in postings}
    assert len(actual) == len(postings)

def test_group_by_first_meta_link_zero():
    assert not list(core.RelatedPostings.group_by_first_meta_link([], 'foo'))

def test_group_by_first_meta_link_no_key(link_swap_posts):
    actual = dict(core.RelatedPostings.group_by_first_meta_link(
        iter(link_swap_posts), 'Nonexistent',
    assert len(actual) == 1
    assert list(actual[None]) == link_swap_posts

def test_group_by_first_meta_link_bad_type(link_swap_posts):
    assert all(post.meta.get('metanum') for post in link_swap_posts), \
        "did not find metadata required by test"
    actual = dict(core.RelatedPostings.group_by_first_meta_link(
        iter(link_swap_posts), 'metanum',
    assert len(actual) == 1
    assert list(actual[None]) == link_swap_posts

def test_group_by_first_meta_link(link_swap_posts):
    actual_all = dict(core.RelatedPostings.group_by_first_meta_link(
        iter(link_swap_posts), 'rt-id',
    assert len(actual_all) == 2
    for key, expect_account in [
            ('rt:12', 'Assets:Receivable:Accounts'),
            ('rt:16', 'Liabilities:Payable:Accounts'),
        actual = actual_all.get(key, '')
        assert len(actual) == 2
        assert all(post.account == expect_account for post in actual)
0 comments (0 inline, 0 general)