Changeset - bd0d607032eb
[Not reviewed]
0 4 0
Brett Smith - 4 years ago 2020-04-28 20:35:15
brettcsmith@brettcsmith.org
typing: Annotate Iterators more specifically.
4 files changed with 6 insertions and 6 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/data.py
Show inline comments
...
 
@@ -168,205 +168,205 @@ class Metadata(MutableMapping[MetaKey, MetaValue]):
 

	
 
    def __setitem__(self, key: MetaKey, value: MetaValue) -> None:
        """Store `value` under `key` in the wrapped `self.meta` mapping."""
        self.meta[key] = value
 

	
 
    def __delitem__(self, key: MetaKey) -> None:
        """Remove `key` from the wrapped `self.meta` mapping."""
        del self.meta[key]
 

	
 
    def get_links(self, key: MetaKey) -> Sequence[str]:
        """Return the links stored under `key`, split on whitespace.

        Returns an empty tuple when `key` is not set at all, and the list of
        whitespace-separated tokens when the stored value is a string.

        Raises TypeError when the stored value is anything other than a
        string.
        """
        try:
            raw = self.meta[key]
        except KeyError:
            return ()
        if not isinstance(raw, str):
            raise TypeError("{} metadata is a {}, not str".format(
                key, type(raw).__name__,
            ))
        return raw.split()
 

	
 

	
 
class PostingMeta(Metadata):
    """Dict-like view of a posting's metadata, backed by its transaction's

    Lookups consult the posting's own metadata first and then fall back to
    the metadata of the parent transaction, so reading works like:

      collections.ChainMap(post.meta, txn.meta)

    As a last resort, looking up 'entity' returns the transaction's payee
    when it has one.

    Setting and deleting only ever affect the posting's metadata, never the
    transaction's. Changes are propagated to the underlying Beancount data
    structures. A metadata dict is only created for the posting the first
    time something is written to it.
    """
    __slots__ = ('txn', 'index', 'post')

    def __init__(self,
                 txn: Transaction,
                 index: int,
                 post: Optional[BasePosting]=None,
    ) -> None:
        # When no posting is passed in, use the one at `index` in `txn`.
        target = txn.postings[index] if post is None else post
        self.txn = txn
        self.index = index
        self.post = target
        if target.meta is None:
            # Nothing on the posting yet: read straight from the transaction.
            self.meta = self.txn.meta
        else:
            self.meta = collections.ChainMap(target.meta, txn.meta)

    def __getitem__(self, key: MetaKey) -> MetaValue:
        try:
            return super().__getitem__(key)
        except KeyError:
            # Only 'entity' has a fallback, and only when a payee is set.
            if key != 'entity' or self.txn.payee is None:
                raise
            return self.txn.payee

    def __setitem__(self, key: MetaKey, value: MetaValue) -> None:
        if self.post.meta is not None:
            super().__setitem__(key, value)
            return
        # The posting has no metadata dict yet. Postings are immutable, so
        # build a replacement that has one and swap it into the transaction.
        self.post = self.post._replace(meta={key: value})
        self.txn.postings[self.index] = self.post
        # mypy complains that self.post.meta could be None, but we know
        # from two lines up that it's not.
        self.meta = collections.ChainMap(self.post.meta, self.txn.meta)  # type:ignore[arg-type]

    def __delitem__(self, key: MetaKey) -> None:
        # Without posting metadata there is nothing that may be deleted;
        # transaction metadata is never touched.
        if self.post.meta is None:
            raise KeyError(key)
        super().__delitem__(key)

    # This is arguably cheating a little bit, but I'd argue the date of
    # the parent transaction still qualifies as posting metadata, and
    # it's something we want to access so often it's good to have it
    # within easy reach.
    @property
    def date(self) -> datetime.date:
        return self.txn.date
 

	
 

	
 
class Posting(BasePosting):
    """Enhanced Posting objects

    This class is a subclass of Beancount's native Posting class where
    specific fields are replaced with enhanced versions:

    * The `account` field is an Account object
    * The `units` field is our Amount object (which simply declares that the
      number is always a Decimal—see that docstring for details)
    * The `meta` field is a PostingMeta object
    """
    __slots__ = ()

    account: Account
    units: Amount
    # mypy correctly complains that our MutableMapping is not compatible
    # with Beancount's meta type declaration of Optional[Dict]. IMO
    # Beancount's type declaration is a smidge too specific: I think its type
    # declaration should also use MutableMapping, because it would be very
    # unusual for code to specifically require a Dict over that.
    # If it did, this declaration would pass without issue.
    meta: PostingMeta  # type:ignore[assignment]

    @classmethod
    def from_beancount(cls,
                       txn: Transaction,
                       index: int,
                       post: Optional[BasePosting]=None,
    ) -> 'Posting':
        """Build an enhanced Posting from a Beancount posting

        `index` is the position of the posting within `txn.postings`. If
        `post` is not given, it is looked up at that position.
        """
        if post is None:
            post = txn.postings[index]
        return cls(
            # Field 0 is replaced with an Account, fields 1 through 4 are
            # copied through unchanged, and the last field becomes a
            # PostingMeta.
            Account(post.account),
            *post[1:5],
            # see rationale above about Posting.meta
            PostingMeta(txn, index, post), # type:ignore[arg-type]
        )

    @classmethod
    def from_txn(cls, txn: Transaction) -> Iterator['Posting']:
        """Yield an enhanced Posting object for every posting in the transaction"""
        for index, post in enumerate(txn.postings):
            yield cls.from_beancount(txn, index, post)

    @classmethod
    def from_entries(cls, entries: Iterable[Directive]) -> Iterator['Posting']:
        """Yield an enhanced Posting object for every posting in these entries"""
        for entry in entries:
            # Because Beancount's own Transaction class isn't type-checkable,
            # we can't statically check this. Might as well rely on duck
            # typing while we're at it: just try to yield postings from
            # everything, and ignore entries that lack a postings attribute.
            try:
                yield from cls.from_txn(entry)  # type:ignore[arg-type]
            except AttributeError:
                pass
 

	
 

	
 
_KT = TypeVar('_KT', bound=Hashable)
_VT = TypeVar('_VT')
class _SizedDict(collections.OrderedDict, MutableMapping[_KT, _VT]):
    """Ordered dict that never holds more than `maxsize` items

    Whenever setting an item leaves the dict with more than `maxsize`
    entries, entries are dropped from the front of the ordering until it
    fits again.
    """
    def __init__(self, maxsize: int=128) -> None:
        self.maxsize = maxsize
        super().__init__()

    def __setitem__(self, key: _KT, value: _VT) -> None:
        super().__setitem__(key, value)
        # Count the surplus once, then evict exactly that many entries.
        surplus = len(self) - self.maxsize
        while surplus > 0:
            self.popitem(last=False)
            surplus -= 1
 

	
 

	
 
def balance_of(txn: Transaction,
               *preds: Callable[[Account], Optional[bool]],
) -> Amount:
    """Return the balance of specified postings in a transaction.

    Each predicate is called with a posting's account. A posting counts
    toward the balance when at least one predicate returns a true value
    for its account.

    The total is built from the "weight" of each matching posting, so the
    result uses the currency of the postings' cost when available. The
    currency reported is that of the first matching posting's weight. When
    nothing matches, the result is zero with an empty currency string.
    """
    selected = [
        post for post in Posting.from_txn(txn)
        if any(pred(post.account) for pred in preds)
    ]
    zero = decimal.Decimal(0)
    if not selected:
        return Amount(zero, '')
    weights: Sequence[Amount] = [bc_convert.get_weight(post) for post in selected]
    total = sum((weight.number for weight in weights), zero)
    return Amount(total, weights[0].currency)
 

	
 
_opening_balance_cache: MutableMapping[str, bool] = _SizedDict()
def is_opening_balance_txn(txn: Transaction) -> bool:
    """Return true if `txn` looks like an opening balance transaction

    That is the case when the transaction has postings to opening equity
    accounts as well as postings to other accounts, and the balances of the
    two groups cancel each other out to within 0.01.

    Results are cached, keyed on the account and units of every posting.
    """
    cache_key = '\0'.join(
        f'{post.account}={post.units}' for post in txn.postings
    )
    try:
        return _opening_balance_cache[cache_key]
    except KeyError:
        pass
    # balance_of reports an empty currency when no posting matched, so an
    # empty currency on either side means that side has no postings.
    retval = False
    opening_equity = balance_of(txn, Account.is_opening_equity)
    if opening_equity.currency:
        rest = balance_of(txn, lambda acct: not acct.is_opening_equity())
        if rest.currency:
            retval = abs(opening_equity.number + rest.number) < decimal.Decimal('.01')
    _opening_balance_cache[cache_key] = retval
    return retval
conservancy_beancount/plugin/__init__.py
Show inline comments
 
"""Beancount plugin entry point for Conservancy"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import importlib
 

	
 
import beancount.core.data as bc_data
 

	
 
from typing import (
 
    AbstractSet,
 
    Any,
 
    Dict,
 
    Iterable,
 
    Iterator,
 
    List,
 
    Optional,
 
    Set,
 
    Tuple,
 
    Type,
 
)
 
from ..beancount_types import (
 
    ALL_DIRECTIVES,
 
    Directive,
 
    Entries,
 
    Errors,
 
    OptionsMap,
 
)
 
from .. import config as configmod
 
from .core import (
 
    Hook,
 
    HookName,
 
)
 
from ..errors import (
 
    Error,
 
)
 

	
 
__plugins__ = ['run']
 

	
 
class HookRegistry:
    """Collect hook classes and look them up by group name

    Every registered hook is filed under 'all', under the name of the
    directive class it handles, and under each name in its HOOK_GROUPS.
    """
    # Maps each bundled hook module (relative to this package) to the hook
    # class names to load from it. None means the class name is derived from
    # the module name; see import_hooks.
    INCLUDED_HOOKS: Dict[str, Optional[List[str]]] = {
        '.meta_approval': None,
        '.meta_entity': None,
        '.meta_expense_allocation': None,
        '.meta_income_type': None,
        '.meta_invoice': None,
        '.meta_payable_documentation': None,
        '.meta_paypal_id': ['MetaPayPalID'],
        '.meta_project': None,
        '.meta_receipt': None,
        '.meta_receivable_documentation': None,
        '.meta_repo_links': None,
        '.meta_rt_links': ['MetaRTLinks'],
        '.meta_tax_implication': None,
    }

    def __init__(self) -> None:
        # Start with one empty group per directive type, plus 'all'.
        self.group_name_map: Dict[HookName, Set[Type[Hook]]] = {
            t.__name__: set() for t in ALL_DIRECTIVES
        }
        self.group_name_map['all'] = set()

    def add_hook(self, hook_cls: Type[Hook]) -> Type[Hook]:
        """Register `hook_cls` under every group it belongs to

        Returns the class unchanged.
        """
        self.group_name_map['all'].add(hook_cls)
        self.group_name_map[hook_cls.DIRECTIVE.__name__].add(hook_cls)
        for key in hook_cls.HOOK_GROUPS:
            self.group_name_map.setdefault(key, set()).add(hook_cls)
        return hook_cls  # to allow use as a decorator

    def import_hooks(self,
                     mod_name: str,
                     *hook_names: str,
                     package: Optional[str]=None,
    ) -> None:
        """Import a module and register hook classes defined in it

        `mod_name` and `package` are passed to importlib.import_module.
        `hook_names` are the names of the classes to register. When none are
        given, a single name is derived from the last component of
        `mod_name` by title-casing it and dropping underscores, so
        '.meta_approval' yields 'MetaApproval'.
        """
        if not hook_names:
            _, _, hook_name = mod_name.rpartition('.')
            hook_names = (hook_name.title().replace('_', ''),)
        module = importlib.import_module(mod_name, package)
        for hook_name in hook_names:
            self.add_hook(getattr(module, hook_name))

    def load_included_hooks(self) -> None:
        """Import and register every hook listed in INCLUDED_HOOKS"""
        for mod_name, hook_names in self.INCLUDED_HOOKS.items():
            self.import_hooks(mod_name, *(hook_names or []), package=self.__module__)

    def group_by_directive(self, config_str: str='') -> Iterator[Tuple[HookName, Type[Hook]]]:
        """Yield (directive name, hook class) pairs selected by `config_str`

        `config_str` is a whitespace-separated list of group names, processed
        left to right. A plain name adds that group's hooks to the selection.
        A name prefixed with '-' removes that group's hooks from it.
        An empty string selects 'all'. A string that starts with an exclusion
        is treated as if it began with 'all'.

        Raises ValueError if a token names a group that does not exist.
        """
        config_str = config_str.strip()
        if not config_str:
            config_str = 'all'
        elif config_str.startswith('-'):
            config_str = 'all ' + config_str
        available_hooks: Set[Type[Hook]] = set()
        for token in config_str.split():
            if token.startswith('-'):
                update_available = available_hooks.difference_update
                key = token[1:]
            else:
                update_available = available_hooks.update
                key = token
            try:
                update_set = self.group_name_map[key]
            except KeyError:
                raise ValueError("configuration refers to unknown hooks {!r}".format(key)) from None
            else:
                update_available(update_set)
        # Report the selection grouped by the directive each hook handles.
        for directive in ALL_DIRECTIVES:
            key = directive.__name__
            for hook in self.group_name_map[key] & available_hooks:
                yield key, hook
 

	
 

	
 
def run(
        entries: Entries,
        options_map: OptionsMap,
        config: str='',
        hook_registry: Optional[HookRegistry]=None,
) -> Tuple[Entries, Errors]:
    """Run the selected hooks over `entries` and collect their errors

    `config` selects which hooks run; it is passed to
    HookRegistry.group_by_directive. When no `hook_registry` is given, a new
    one is created and loaded with the included hooks. `options_map` is
    accepted but not used here.

    Returns `entries` as passed in, along with the list of errors. That list
    holds errors raised while instantiating hooks as well as everything the
    hooks reported.
    """
    if hook_registry is None:
        hook_registry = HookRegistry()
        hook_registry.load_included_hooks()
    errors: Errors = []
    hooks: Dict[HookName, List[Hook]] = {
        # mypy thinks NamedTuples don't have __name__ but they do at runtime.
        t.__name__: [] for t in bc_data.ALL_DIRECTIVES  # type:ignore[attr-defined]
    }
    user_config = configmod.Config()
    for key, hook_type in hook_registry.group_by_directive(config):
        try:
            hook = hook_type(user_config)
        except Error as error:
            # A hook that cannot be set up is reported rather than fatal.
            errors.append(error)
            continue
        hooks[key].append(hook)
    for entry in entries:
        for hook in hooks[type(entry).__name__]:
            errors.extend(hook.run(entry))
    return entries, errors
 

	
conservancy_beancount/plugin/core.py
Show inline comments
...
 
@@ -134,142 +134,142 @@ class MetadataEnum:
 
          that each key will evaluate to. The code asserts that all values are
 
          in standard_values.
 
        """
 
        self.key = key
 
        self._stdvalues = frozenset(standard_values)
 
        self._aliases: Dict[MetaValueEnum, MetaValueEnum] = dict(aliases_map or ())
 
        assert self._stdvalues.issuperset(self._aliases.values())
 
        self._aliases.update((v, v) for v in standard_values)
 

	
 
    def __repr__(self) -> str:
        """Return a label of the form `ClassName<key>`."""
        return f"{type(self).__name__}<{self.key}>"
 

	
 
    def __contains__(self, key: MetaValueEnum) -> bool:
        """Returns true if `key` is a standard value or alias."""
        # _aliases holds every alias and also maps each standard value to
        # itself, so a single membership test covers both cases.
        return key in self._aliases
 

	
 
    def __getitem__(self, key: MetaValueEnum) -> MetaValueEnum:
        """Return the standard value for `key`.

        A standard value maps to itself; an alias maps to the standard value
        it stands for.

        Raises KeyError if `key` is not a known value or alias.
        """
        return self._aliases[key]
 

	
 
    def __iter__(self) -> Iterator[MetaValueEnum]:
        """Iterate over standard values.

        Aliases are not included. The values are kept in a frozenset, so the
        iteration order is not meaningful.
        """
        return iter(self._stdvalues)
 

	
 
    def get(self,
            key: MetaValueEnum,
            default_key: Optional[MetaValueEnum]=None,
    ) -> Optional[MetaValueEnum]:
        """Return self[key], falling back to self[default_key].

        Note that default_key is a second key to look up, *not* a value to
        return as-is. That way the result is always a standard value.
        When `key` is unknown and no default_key is given, returns None.
        An unknown default_key raises KeyError.
        """
        try:
            return self[key]
        except KeyError:
            return None if default_key is None else self[default_key]
 

	
 

	
 
### HOOK SUBCLASSES
 

	
 
class _PostingHook(TransactionHook, metaclass=abc.ABCMeta):
    """Base class for transaction hooks that check one posting at a time

    Subclasses implement post_run. They can narrow which transactions and
    postings are checked by overriding _run_on_txn and _run_on_post.
    Every subclass is added to the 'posting' hook group.
    """
    TXN_DATE_RANGE: _GenericRange = _GenericRange(DEFAULT_START_DATE, DEFAULT_STOP_DATE)

    def __init_subclass__(cls) -> None:
        cls.HOOK_GROUPS = cls.HOOK_GROUPS.union({'posting'})

    def _run_on_txn(self, txn: Transaction) -> bool:
        # Skip transactions dated outside TXN_DATE_RANGE, and skip opening
        # balance transactions.
        if txn.date not in self.TXN_DATE_RANGE:
            return False
        return not data.is_opening_balance_txn(txn)

    def _run_on_post(self, txn: Transaction, post: data.Posting) -> bool:
        # By default every posting of an accepted transaction is checked.
        return True

    def run(self, txn: Transaction) -> errormod.Iter:
        if not self._run_on_txn(txn):
            return
        for post in data.Posting.from_txn(txn):
            if self._run_on_post(txn, post):
                yield from self.post_run(txn, post)

    @abc.abstractmethod
    def post_run(self, txn: Transaction, post: data.Posting) -> errormod.Iter: ...
 

	
 

	
 
class _NormalizePostingMetadataHook(_PostingHook):
    """Base class to normalize posting metadata from an enum."""
    # This class provides basic functionality to filter postings, normalize
    # metadata values, and set default values. Subclasses define VALUES_ENUM;
    # METADATA_KEY is then taken from its key.
    METADATA_KEY: MetaKey
    VALUES_ENUM: MetadataEnum

    def __init_subclass__(cls) -> None:
        super().__init_subclass__()
        cls.METADATA_KEY = cls.VALUES_ENUM.key
        cls.HOOK_GROUPS = cls.HOOK_GROUPS.union(['metadata', cls.METADATA_KEY])

    # Called when the posting has no value for METADATA_KEY. Implementations
    # should either return a value string from VALUES_ENUM, or else raise
    # InvalidMetadataError. This base implementation does the latter.
    def _default_value(self, txn: Transaction, post: data.Posting) -> MetaValueEnum:
        raise errormod.InvalidMetadataError(txn, self.METADATA_KEY, None, post)

    def post_run(self, txn: Transaction, post: data.Posting) -> errormod.Iter:
        source_value = post.meta.get(self.METADATA_KEY)
        problem: Optional[errormod.Error] = None
        if source_value is None:
            # Nothing set on the posting: ask the subclass for a default.
            try:
                set_value = self._default_value(txn, post)
            except errormod.Error as exc:
                problem = exc
        else:
            # Map whatever was written to its standard value.
            try:
                set_value = self.VALUES_ENUM[source_value]
            except KeyError:
                problem = errormod.InvalidMetadataError(
                    txn, self.METADATA_KEY, source_value, post,
                )
        if problem is not None:
            yield problem
            return
        post.meta[self.METADATA_KEY] = set_value
 

	
 

	
 
class _RequireLinksPostingMetadataHook(_PostingHook):
    """Base class to require that posting metadata include links"""
    # This base class confirms that a posting's metadata has one or more links
    # under one of the metadata keys listed in CHECKED_METADATA.
    # Most subclasses only need to define CHECKED_METADATA and _run_on_post.
    CHECKED_METADATA: Sequence[MetaKey]

    def __init_subclass__(cls) -> None:
        super().__init_subclass__()
        # union() iterates over each argument, so the group name has to be
        # wrapped in a list. Passing the bare string 'metadata' would add its
        # individual letters as hook groups instead of the name itself.
        cls.HOOK_GROUPS = cls.HOOK_GROUPS.union(cls.CHECKED_METADATA, ['metadata'])

    def _check_metadata(self,
                        txn: Transaction,
                        post: data.Posting,
                        keys: Sequence[MetaKey],
    ) -> Iterator[errormod.InvalidMetadataError]:
        """Yield an error for each problem with the links under `keys`

        An error is yielded for every key whose value is not a string. One
        more error, naming all of `keys` joined with '/', is yielded when
        none of the keys provides at least one link.
        """
        have_docs = False
        for key in keys:
            try:
                links = post.meta.get_links(key)
            except TypeError:
                # get_links raises TypeError for a value that is not a string.
                yield errormod.InvalidMetadataError(txn, key, post.meta[key], post)
            else:
                have_docs = have_docs or any(links)
        if not have_docs:
            yield errormod.InvalidMetadataError(txn, '/'.join(keys), None, post)

    def post_run(self, txn: Transaction, post: data.Posting) -> errormod.Iter:
        return self._check_metadata(txn, post, self.CHECKED_METADATA)
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -18,147 +18,147 @@ import collections
 

	
 
from decimal import Decimal
 

	
 
from .. import data
 

	
 
from typing import (
 
    overload,
 
    DefaultDict,
 
    Dict,
 
    Iterable,
 
    Iterator,
 
    List,
 
    Mapping,
 
    Optional,
 
    Sequence,
 
    Set,
 
    Tuple,
 
    Union,
 
)
 
from ..beancount_types import (
 
    MetaKey,
 
    MetaValue,
 
)
 

	
 
class Balance(Mapping[str, data.Amount]):
    """Read-only mapping from currency to the amount held in that currency

    Keys are Beancount currency strings. Internally only the number is kept
    for each currency; the Amount is rebuilt on every lookup.
    """
    __slots__ = ('_currency_map',)

    def __init__(self,
                 source: Union[Iterable[Tuple[str, data.Amount]],
                               Mapping[str, data.Amount]]=(),
    ) -> None:
        # Accept either a mapping or an iterable of (currency, amount) pairs.
        pairs = source.items() if isinstance(source, Mapping) else source
        numbers: Dict[str, Decimal] = {}
        for currency, amount in pairs:
            numbers[currency] = amount.number
        self._currency_map = numbers

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self._currency_map!r})"

    def __getitem__(self, key: str) -> data.Amount:
        number = self._currency_map[key]
        return data.Amount(number, key)

    def __iter__(self) -> Iterator[str]:
        return iter(self._currency_map)

    def __len__(self) -> int:
        return len(self._currency_map)

    def is_zero(self) -> bool:
        """Return true if every currency's number is zero

        An empty balance counts as zero.
        """
        for number in self._currency_map.values():
            if not (number == 0):
                return False
        return True
 

	
 

	
 
class MutableBalance(Balance):
    """A Balance that can accumulate amounts in place"""
    __slots__ = ()

    def add_amount(self, amount: data.Amount) -> None:
        """Add `amount` to the running total for its currency"""
        totals = self._currency_map
        currency = amount.currency
        if currency in totals:
            totals[currency] += amount.number
        else:
            # First amount seen in this currency.
            totals[currency] = amount.number
 

	
 

	
 
class RelatedPostings(Sequence[data.Posting]):
    """Collect and query related postings

    This class provides common functionality for collecting related postings
    and running queries on them: iterating over them, tallying their balance,
    etc.

    This class doesn't know anything about how the postings are related. That's
    entirely up to the caller.

    A common pattern is to use this class with collections.defaultdict
    to organize postings based on some key. See the group_by_meta classmethod
    for an example.
    """

    def __init__(self, source: Iterable[data.Posting]=()) -> None:
        self._postings: List[data.Posting] = list(source)

    @classmethod
    def group_by_meta(cls,
                      postings: Iterable[data.Posting],
                      key: MetaKey,
                      default: Optional[MetaValue]=None,
    ) -> Mapping[Optional[MetaValue], 'RelatedPostings']:
        """Relate postings by metadata value

        This method takes an iterable of postings and returns a mapping.
        The keys of the mapping are the values of post.meta.get(key, default).
        The values are RelatedPostings instances that contain all the postings
        that had that same metadata value.
        """
        retval: DefaultDict[Optional[MetaValue], 'RelatedPostings'] = collections.defaultdict(cls)
        for post in postings:
            retval[post.meta.get(key, default)].add(post)
        # Grouping is done: from here on, looking up a missing key should
        # raise KeyError instead of creating a new empty group.
        retval.default_factory = None
        return retval

    @overload
    def __getitem__(self, index: int) -> data.Posting: ...

    @overload
    def __getitem__(self, s: slice) -> Sequence[data.Posting]: ...

    def __getitem__(self,
                    index: Union[int, slice],
    ) -> Union[data.Posting, Sequence[data.Posting]]:
        """Return the posting at `index`

        Slicing is not supported and raises NotImplementedError.
        """
        if isinstance(index, slice):
            raise NotImplementedError("RelatedPostings[slice]")
        else:
            return self._postings[index]

    def __len__(self) -> int:
        return len(self._postings)

    def add(self, post: data.Posting) -> None:
        """Append `post` to the collection"""
        self._postings.append(post)

    def clear(self) -> None:
        """Remove every posting from the collection"""
        self._postings.clear()

    def iter_with_balance(self) -> Iterator[Tuple[data.Posting, Balance]]:
        """Yield each posting along with the running balance of units so far

        The same balance object is yielded every time and is updated in
        place as iteration proceeds. Copy it if you need to keep the balance
        as it stood at a particular posting.
        """
        balance = MutableBalance()
        for post in self:
            balance.add_amount(post.units)
            yield post, balance

    def balance(self) -> Balance:
        """Return the balance of the units of all postings

        Returns an empty Balance when there are no postings.
        """
        # Start from an empty Balance so the loop variable is always bound,
        # then let the running balance from the last iteration replace it.
        result: Balance = Balance()
        for _, result in self.iter_with_balance():
            pass
        return result

    def meta_values(self,
                    key: MetaKey,
                    default: Optional[MetaValue]=None,
    ) -> Set[Optional[MetaValue]]:
        """Return the set of post.meta.get(key, default) over all postings"""
        return {post.meta.get(key, default) for post in self}
0 comments (0 inline, 0 general)