Changeset - 52fc0d1b5f93
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-06-11 18:01:19
brettcsmith@brettcsmith.org
reports: Add RelatedPostings.group_by_first_meta_link() method.
3 files changed with 86 insertions and 7 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -665,65 +665,65 @@ metadata to match. A single ticket number is a shortcut for
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.report_type is None and not args.search_terms:
 
        args.report_type = ReportType.AGING
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    if cliutil.is_main_script(PROGNAME):
 
        global logger
 
        logger = logging.getLogger(PROGNAME)
 
        sys.excepthook = cliutil.ExceptHook(logger)
 
    args = parse_arguments(arglist)
 
    cliutil.setup_logger(logger, args.loglevel, stderr)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
    elif args.report_type is ReportType.AGING:
 
        entries, load_errors, _ = books_loader.load_all()
 
    else:
 
        entries, load_errors, _ = books_loader.load_all(args.since)
 
    filters.remove_opening_balance_txn(entries)
 

	
 
    returncode = 0
 
    postings = filter_search(data.Posting.from_entries(entries), args.search_terms)
 
    groups: PostGroups = dict(AccrualPostings.group_by_meta(postings, 'invoice'))
 
    groups: PostGroups = dict(AccrualPostings.group_by_first_meta_link(postings, 'invoice'))
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
        returncode |= ReturnFlag.LOAD_ERRORS
 
    if not groups:
 
        logger.warning("no matching entries found to report")
 
        returncode |= ReturnFlag.NOTHING_TO_REPORT
 

	
 
    groups = {
 
        key: posts
 
        for source_posts in groups.values()
 
        for key, posts in source_posts.make_consistent()
 
    }
 
    if args.report_type is not ReportType.AGING:
 
        groups = {
 
            key: posts for key, posts in groups.items() if not posts.is_paid()
 
        } or groups
 

	
 
    if args.report_type is None:
 
        args.report_type = ReportType.default_for(groups)
 
    report: Optional[BaseReport] = None
 
    output_path: Optional[Path] = None
 
    if args.report_type is ReportType.AGING:
 
        rt_client = config.rt_client()
 
        if rt_client is None:
 
            logger.error("unable to generate aging report: RT client is required")
 
        else:
 
            now = datetime.datetime.now()
 
            if args.output_file is None:
 
                args.output_file = Path(now.strftime('AgingReport_%Y-%m-%d_%H:%M.ods'))
 
                logger.info("Writing report to %s", args.output_file)
 
            out_bin = cliutil.bytes_output(args.output_file, stdout)
 
            report = AgingReport(rt_client, out_bin)
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -17,65 +17,64 @@
 
import abc
 
import collections
 
import datetime
 
import itertools
 
import operator
 
import re
 

	
 
import babel.core  # type:ignore[import]
 
import babel.numbers  # type:ignore[import]
 

	
 
import odf.config  # type:ignore[import]
 
import odf.element  # type:ignore[import]
 
import odf.number  # type:ignore[import]
 
import odf.opendocument  # type:ignore[import]
 
import odf.style  # type:ignore[import]
 
import odf.table  # type:ignore[import]
 
import odf.text  # type:ignore[import]
 

	
 
from decimal import Decimal
 
from pathlib import Path
 

	
 
from beancount.core import amount as bc_amount
 

	
 
from .. import data
 
from .. import filters
 

	
 
from typing import (
 
    cast,
 
    overload,
 
    Any,
 
    BinaryIO,
 
    Callable,
 
    DefaultDict,
 
    Dict,
 
    Generic,
 
    Iterable,
 
    Iterator,
 
    List,
 
    Mapping,
 
    MutableMapping,
 
    Optional,
 
    Sequence,
 
    Set,
 
    Tuple,
 
    Type,
 
    TypeVar,
 
    Union,
 
)
 
from ..beancount_types import (
 
    MetaKey,
 
    MetaValue,
 
)
 

	
 
DecimalCompat = data.DecimalCompat
 
BalanceType = TypeVar('BalanceType', bound='Balance')
 
ElementType = Callable[..., odf.element.Element]
 
LinkType = Union[str, Tuple[str, Optional[str]]]
 
RelatedType = TypeVar('RelatedType', bound='RelatedPostings')
 
RT = TypeVar('RT', bound=Sequence)
 
ST = TypeVar('ST')
 
T = TypeVar('T')
 

	
 
class Balance(Mapping[str, data.Amount]):
 
    """A collection of amounts mapped by currency
 

	
...
 
@@ -226,82 +225,109 @@ class MutableBalance(Balance):
 
    def __iadd__(self: BalanceType, other: Union[data.Amount, Balance]) -> BalanceType:
 
        self._add_other(self._currency_map, other)
 
        return self
 

	
 

	
 
class RelatedPostings(Sequence[data.Posting]):
 
    """Collect and query related postings
 

	
 
    This class provides common functionality for collecting related postings
 
    and running queries on them: iterating over them, tallying their balance,
 
    etc.
 

	
 
    This class doesn't know anything about how the postings are related. That's
 
    entirely up to the caller.
 

	
 
    A common pattern is to use this class with collections.defaultdict
 
    to organize postings based on some key. See the group_by_meta classmethod
 
    for an example.
 
    """
 
    __slots__ = ('_postings',)
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Posting]=(),
 
                 *,
 
                 _can_own: bool=False,
 
    ) -> None:
 
        self._postings: List[data.Posting]
 
        if _can_own and isinstance(source, list):
 
            self._postings = source
 
        else:
 
            self._postings = list(source)
 

	
 
    @classmethod
 
    def _group_by(cls: Type[RelatedType],
 
                  postings: Iterable[data.Posting],
 
                  key: Callable[[data.Posting], T],
 
    ) -> Iterator[Tuple[T, RelatedType]]:
 
        mapping: Dict[T, List[data.Posting]] = collections.defaultdict(list)
 
        for post in postings:
 
            mapping[key(post)].append(post)
 
        for value, posts in mapping.items():
 
            yield value, cls(posts, _can_own=True)
 

	
 
    @classmethod
 
    def group_by_meta(cls: Type[RelatedType],
 
                      postings: Iterable[data.Posting],
 
                      key: MetaKey,
 
                      default: Optional[MetaValue]=None,
 
    ) -> Iterator[Tuple[Optional[MetaValue], RelatedType]]:
 
        """Relate postings by metadata value
 

	
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of post.meta.get(key, default).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same metadata value.
 
        """
 
        mapping: DefaultDict[Optional[MetaValue], List[data.Posting]] = collections.defaultdict(list)
 
        for post in postings:
 
            mapping[post.meta.get(key, default)].append(post)
 
        for value, posts in mapping.items():
 
            yield value, cls(posts, _can_own=True)
 
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 
            return post.meta.get(key, default)
 
        return cls._group_by(postings, key_func)
 

	
 
    @classmethod
 
    def group_by_first_meta_link(
 
            cls: Type[RelatedType],
 
            postings: Iterable[data.Posting],
 
            key: MetaKey,
 
    ) -> Iterator[Tuple[Optional[str], RelatedType]]:
 
        """Relate postings by the first link in metadata
 

	
 
        This method takes an iterable of postings and returns a mapping.
 
        The keys of the mapping are the values of
 
        post.meta.first_link(key, None).
 
        The values are RelatedPostings instances that contain all the postings
 
        that had that same first metadata link.
 
        """
 
        def key_func(post: data.Posting) -> Optional[MetaValue]:
 
            return post.meta.first_link(key, None)
 
        return cls._group_by(postings, key_func)
 

	
 
    def __repr__(self) -> str:
 
        return f'<{type(self).__name__} {self._postings!r}>'
 

	
 
    @overload
 
    def __getitem__(self: RelatedType, index: int) -> data.Posting: ...
 

	
 
    @overload
 
    def __getitem__(self: RelatedType, s: slice) -> RelatedType: ...
 

	
 
    def __getitem__(self: RelatedType,
 
                    index: Union[int, slice],
 
    ) -> Union[data.Posting, RelatedType]:
 
        if isinstance(index, slice):
 
            return type(self)(self._postings[index], _can_own=True)
 
        else:
 
            return self._postings[index]
 

	
 
    def __len__(self) -> int:
 
        return len(self._postings)
 

	
 
    def _all_meta_links(self, key: MetaKey) -> Iterator[str]:
 
        for post in self:
 
            try:
 
                yield from post.meta.get_links(key)
 
            except TypeError:
 
                pass
 

	
 
    def all_meta_links(self, key: MetaKey) -> Iterator[str]:
 
        return filters.iter_unique(self._all_meta_links(key))
 

	
 
    @overload
tests/test_reports_related_postings.py
Show inline comments
...
 
@@ -34,64 +34,85 @@ def accruals_and_payments(acct, src_acct, dst_acct, start_date, *amounts):
 
        yield testutil.Transaction(date=next(dates), postings=[
 
            (acct, amt, currency, post_meta),
 
            (dst_acct if amt < 0 else src_acct, -amt, currency, post_meta),
 
        ])
 

	
 
@pytest.fixture
 
def credit_card_cycle():
 
    return list(accruals_and_payments(
 
        'Liabilities:CreditCard',
 
        'Assets:Checking',
 
        'Expenses:Other',
 
        datetime.date(2020, 4, 1),
 
        (-110, 'USD'),
 
        (110, 'USD'),
 
        (-120, 'USD'),
 
        (120, 'USD'),
 
    ))
 

	
 
@pytest.fixture
 
def two_accruals_three_payments():
 
    return list(accruals_and_payments(
 
        'Assets:Receivable:Accounts',
 
        'Income:Donations',
 
        'Assets:Checking',
 
        datetime.date(2020, 4, 10),
 
        (440, 'USD'),
 
        (-230, 'USD'),
 
        (550, 'EUR'),
 
        (-210, 'USD'),
 
        (-550, 'EUR'),
 
    ))
 

	
 
@pytest.fixture
 
def link_swap_posts():
 
    retval = []
 
    meta = {
 
        'rt-id': 'rt:12 rt:16',
 
        '_post_type': data.Posting,
 
        '_meta_type': data.Metadata,
 
    }
 
    for n in range(1, 3):
 
        n = Decimal(n)
 
        retval.append(testutil.Posting(
 
            'Assets:Receivable:Accounts', n * 10, metanum=n, **meta,
 
        ))
 
    meta['rt-id'] = 'rt:16 rt:12'
 
    for n in range(1, 3):
 
        n = Decimal(n)
 
        retval.append(testutil.Posting(
 
            'Liabilities:Payable:Accounts', n * -10, metanum=n, **meta,
 
        ))
 
    return retval
 

	
 
def test_initialize_with_list(credit_card_cycle):
 
    related = core.RelatedPostings(credit_card_cycle[0].postings)
 
    assert len(related) == 2
 

	
 
def test_initialize_with_iterable(two_accruals_three_payments):
 
    related = core.RelatedPostings(
 
        post for txn in two_accruals_three_payments
 
        for post in txn.postings
 
        if post.account == 'Assets:Receivable:Accounts'
 
    )
 
    assert len(related) == 5
 

	
 
def test_balance_empty():
 
    balance = core.RelatedPostings().balance()
 
    assert not balance
 
    assert balance.is_zero()
 

	
 
@pytest.mark.parametrize('index,expected', enumerate([
 
    -110,
 
    0,
 
    -120,
 
    0,
 
]))
 
def test_balance_credit_card(credit_card_cycle, index, expected):
 
    related = core.RelatedPostings(
 
        txn.postings[0] for txn in credit_card_cycle[:index + 1]
 
    )
 
    assert related.balance() == {'USD': testutil.Amount(expected, 'USD')}
 

	
 
def check_iter_with_balance(entries):
 
    expect_posts = [txn.postings[0] for txn in entries]
 
    expect_balances = []
...
 
@@ -284,32 +305,64 @@ def test_first_meta_links():
 

	
 
def test_first_meta_links_fallback():
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Assets:Cash', 10, contract=value, _meta_type=data.Metadata,
 
    ) for value in ['1 2', testutil.PAST_DATE, '1 3', None, '2 3'])
 
    del related[-2].meta['contract']
 
    assert list(related.first_meta_links('contract', None)) == ['1', None, '2']
 

	
 
def test_group_by_meta_zero():
 
    assert not list(core.RelatedPostings.group_by_meta([], 'metacurrency'))
 

	
 
def test_group_by_meta_one(credit_card_cycle):
 
    posting = next(post for post in data.Posting.from_entries(credit_card_cycle)
 
                   if post.account.is_credit_card())
 
    actual = core.RelatedPostings.group_by_meta([posting], 'metacurrency')
 
    assert set(key for key, _ in actual) == {'USD'}
 

	
 
def test_group_by_meta_many(two_accruals_three_payments):
 
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
 
                if post.account == 'Assets:Receivable:Accounts']
 
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metacurrency'))
 
    assert set(actual) == {'USD', 'EUR'}
 
    for key, group in actual.items():
 
        assert 2 <= len(group) <= 3
 
        assert group.balance().is_zero()
 

	
 
def test_group_by_meta_many_single_posts(two_accruals_three_payments):
 
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
 
                if post.account == 'Assets:Receivable:Accounts']
 
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metanumber'))
 
    assert set(actual) == {post.units.number for post in postings}
 
    assert len(actual) == len(postings)
 

	
 
def test_group_by_first_meta_link_zero():
 
    assert not list(core.RelatedPostings.group_by_first_meta_link([], 'foo'))
 

	
 
def test_group_by_first_meta_link_no_key(link_swap_posts):
 
    actual = dict(core.RelatedPostings.group_by_first_meta_link(
 
        iter(link_swap_posts), 'Nonexistent',
 
    ))
 
    assert len(actual) == 1
 
    assert list(actual[None]) == link_swap_posts
 

	
 
def test_group_by_first_meta_link_bad_type(link_swap_posts):
 
    assert all(post.meta.get('metanum') for post in link_swap_posts), \
 
        "did not find metadata required by test"
 
    actual = dict(core.RelatedPostings.group_by_first_meta_link(
 
        iter(link_swap_posts), 'metanum',
 
    ))
 
    assert len(actual) == 1
 
    assert list(actual[None]) == link_swap_posts
 

	
 
def test_group_by_first_meta_link(link_swap_posts):
 
    actual_all = dict(core.RelatedPostings.group_by_first_meta_link(
 
        iter(link_swap_posts), 'rt-id',
 
    ))
 
    assert len(actual_all) == 2
 
    for key, expect_account in [
 
            ('rt:12', 'Assets:Receivable:Accounts'),
 
            ('rt:16', 'Liabilities:Payable:Accounts'),
 
    ]:
 
        actual = actual_all.get(key, '')
 
        assert len(actual) == 2
 
        assert all(post.account == expect_account for post in actual)
0 comments (0 inline, 0 general)