Changeset - 4de5df9035f6
[Not reviewed]
0 2 0
Brett Smith - 3 years ago 2021-02-19 23:54:36
brettcsmith@brettcsmith.org
typing: Upgrade more Posting Iterables to Iterators.
2 files changed with 5 insertions and 4 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/cliutil.py
Show inline comments
...
 
@@ -14,64 +14,65 @@ import argparse
 
import datetime
 
import enum
 
import io
 
import logging
 
import operator
 
import os
 
import pkg_resources
 
import re
 
import signal
 
import subprocess
 
import sys
 
import traceback
 
import types
 

	
 
from pathlib import Path
 

	
 
import rt.exceptions as rt_error
 

	
 
from . import data
 
from . import filters
 
from . import rtutil
 

	
 
from typing import (
 
    cast,
 
    Any,
 
    BinaryIO,
 
    Callable,
 
    Container,
 
    Generic,
 
    Hashable,
 
    IO,
 
    Iterable,
 
    Iterator,
 
    NamedTuple,
 
    NoReturn,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Type,
 
    TypeVar,
 
    Union,
 
)
 
from .beancount_types import (
 
    MetaKey,
 
)
 

	
 
ET = TypeVar('ET', bound=enum.Enum)
 
OutputFile = Union[int, IO]
 

	
 
CPU_COUNT = len(os.sched_getaffinity(0))
 
STDSTREAM_PATH = Path('-')
 
VERSION = pkg_resources.require(PKGNAME)[0].version
 

	
 
class EnumArgument(Generic[ET]):
 
    """Wrapper class to use an enum as argument values
 

	
 
    Use this class when the user can choose one of some arbitrary enum names
 
    as an argument. It will let user abbreviate and use any case, and will
 
    return the correct value as long as it's unambiguous. Typical usage
 
    looks like::
 

	
 
        enum_arg = EnumArgument(Enum)
 
        arg_parser.add_argument(
 
          '--choice',
 
          type=enum_arg.enum_type,  # or .value_type
...
 
@@ -254,65 +255,65 @@ class SearchTerm(NamedTuple):
 
        if ticket_default_key is None:
 
            ticket_default_key = default_key
 
        def parse_search_term(arg: str) -> 'SearchTerm':
 
            key: Optional[str] = None
 
            if re.match(r'^[a-z][-\w]*=', arg):
 
                key, _, raw_link = arg.partition('=')
 
            else:
 
                raw_link = arg
 
            rt_ids = rtutil.RT.parse(raw_link)
 
            if rt_ids is None:
 
                rt_ids = rtutil.RT.parse('rt:' + raw_link)
 
            if rt_ids is None:
 
                if key is None:
 
                    key = default_key
 
                pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link))
 
            else:
 
                ticket_id, attachment_id = rt_ids
 
                if key is None:
 
                    if attachment_id is None:
 
                        key = ticket_default_key
 
                    else:
 
                        key = default_key
 
                pattern = rtutil.RT.metadata_regexp(
 
                    ticket_id,
 
                    attachment_id,
 
                    first_link_only=key == 'rt-id' and attachment_id is None,
 
                )
 
            if key is None:
 
                raise ValueError(f"invalid search term {arg!r}: no metadata key")
 
            return cls(key, pattern)
 
        return parse_search_term
 

	
 
    def filter_postings(self, postings: Iterable[data.Posting]) -> Iterable[data.Posting]:
 
    def filter_postings(self, postings: Iterable[data.Posting]) -> Iterator[data.Posting]:
 
        return filters.filter_meta_match(
 
            postings, self.meta_key, re.compile(self.pattern),
 
        )
 

	
 
def add_jobs_argument(parser: argparse.ArgumentParser) -> argparse.Action:
 
    return parser.add_argument(
 
        '--jobs', '-j',
 
        metavar='NUM',
 
        type=jobs_arg,
 
        default=CPU_COUNT,
 
        help="""Maximum number of processes to run concurrently.
 
Can specify a positive integer or a percentage of CPU cores. Default all cores.
 
""")
 

	
 
def add_loglevel_argument(parser: argparse.ArgumentParser,
 
                          default: LogLevel=LogLevel.INFO) -> argparse.Action:
 
    arg_enum = EnumArgument(LogLevel)
 
    return parser.add_argument(
 
        '--loglevel',
 
        metavar='LEVEL',
 
        default=default.value,
 
        type=arg_enum.value_type,
 
        help="Show logs at this level and above."
 
        f" Specify one of {arg_enum.choices_str()}."
 
        f" Default {default.name.lower()!r}.",
 
    )
 

	
 
def add_rewrite_rules_argument(parser: argparse.ArgumentParser) -> argparse.Action:
 
    return parser.add_argument(
 
        '--rewrite-rules', '--rewrites', '-r',
 
        action='append',
 
        default=[],
conservancy_beancount/filters.py
Show inline comments
...
 
@@ -15,81 +15,81 @@ from . import rtutil
 

	
 
from typing import (
 
    cast,
 
    Hashable,
 
    Iterable,
 
    Iterator,
 
    Optional,
 
    Pattern,
 
    Set,
 
    TypeVar,
 
    Union,
 
)
 
from .beancount_types import (
 
    Directive,
 
    Entries,
 
    MetaKey,
 
    MetaValue,
 
    Transaction,
 
)
 

	
 
# Saying Optional works around <https://github.com/python/mypy/issues/8768>.
 
HashT = TypeVar('HashT', bound=Optional[Hashable])
 
Postings = Iterable[data.Posting]
 
Regexp = Union[str, Pattern]
 

	
 
def audit_date(entries: Entries) -> Optional[datetime.date]:
 
    for entry in entries:
 
        if (isinstance(entry, bc_data.Custom)
 
            and entry.type == 'conservancy_beancount_audit'):  # type:ignore[attr-defined]
 
            return entry.date
 
    return None
 

	
 
def filter_meta_equal(postings: Postings, key: MetaKey, value: MetaValue) -> Postings:
 
def filter_meta_equal(postings: Postings, key: MetaKey, value: MetaValue) -> Iterator[data.Posting]:
 
    for post in postings:
 
        try:
 
            if post.meta[key] == value:
 
                yield post
 
        except KeyError:
 
            pass
 

	
 
def filter_meta_match(postings: Postings, key: MetaKey, regexp: Regexp) -> Postings:
 
def filter_meta_match(postings: Postings, key: MetaKey, regexp: Regexp) -> Iterator[data.Posting]:
 
    for post in postings:
 
        try:
 
            if re.search(regexp, post.meta[key]):
 
                yield post
 
        except (KeyError, TypeError):
 
            pass
 

	
 
def filter_for_rt_id(postings: Postings, ticket_id: Union[int, str]) -> Postings:
 
def filter_for_rt_id(postings: Postings, ticket_id: Union[int, str]) -> Iterator[data.Posting]:
 
    """Filter postings with a primary RT ticket
 

	
 
    This functions yields postings where the *first* rt-id matches the given
 
    ticket number.
 
    """
 
    regexp = rtutil.RT.metadata_regexp(ticket_id, first_link_only=True)
 
    return filter_meta_match(postings, 'rt-id', regexp)
 

	
 
def iter_unique(seq: Iterable[HashT]) -> Iterator[HashT]:
 
    seen: Set[HashT] = set()
 
    for item in seq:
 
        if item not in seen:
 
            seen.add(item)
 
            yield item
 

	
 
def remove_opening_balance_txn(entries: Entries) -> Optional[Transaction]:
 
    """Remove an opening balance transaction from entries returned by Beancount
 

	
 
    Returns the removed transaction if found, or None if not.
 
    Note that it modifies the ``entries`` argument in-place.
 

	
 
    This function is useful for tools like accrual-report that are more
 
    focused on finding and reporting related transactions than providing
 
    total account balances, etc. Since the opening balance transaction does not
 
    provide the same metadata documentation as typical transactions, it's
 
    typically easiest to filter it out before cross-referencing transactions by
 
    metadata.
 

	
 
    Note that this function only removes a single transaction, because that's
 
    fastest for the common case.
 
    """
 
    for index, entry in enumerate(entries):
0 comments (0 inline, 0 general)