Changeset - f0a5116429a4
[Not reviewed]
0 1 0
Brett Smith - 3 years ago 2021-03-12 19:58:14
query: Add ContextMeta function.

Deduplicate metadata lookup code.
1 file changed with 11 insertions and 10 deletions:
0 comments (0 inline, 0 general)
Show inline comments
@@ -87,94 +87,101 @@ from decimal import Decimal
from pathlib import Path
from beancount.core.amount import _Amount as BeancountAmount
from beancount.core.inventory import Inventory
from beancount.core.position import _Position as Position

import beancount.query.numberify as bc_query_numberify
import beancount.query.query_compile as bc_query_compile
import beancount.query.query_env as bc_query_env
import beancount.query.query_execute as bc_query_execute
import beancount.query.query_parser as bc_query_parser
import beancount.query.query_render as bc_query_render
import as bc_query_shell
import odf.table  # type:ignore[import]
import rt

from . import core
from . import rewrite
from .. import books
from .. import cliutil
from .. import config as configmod
from .. import data
from .. import rtutil

PROGNAME = 'query-report'
SENTINEL = object()
logger = logging.getLogger('conservancy_beancount.reports.query')

CellFunc = Callable[[Any], odf.table.TableCell]
EnvironmentFunctions = Dict[
    # The real key type is something like:
    #   Union[str, Tuple[str, Type, ...]]
    # but two issues with that. One, you can't use Ellipses in a Tuple like
    # that, so there's no short way to declare this. Second, Beancount doesn't
    # declare it anyway, and mypy infers it as Sequence[object]. So just use
    # that.
RowTypes = Sequence[Tuple[str, Type]]
Rows = Sequence[NamedTuple]
RTResult = Optional[Mapping[Any, Any]]
Store = List[Any]
QueryExpression = Union[
QueryStatement = Union[

# This class annotates the types that Beancount's RowContexts have when they're
# passed to EvalFunction.__call__(). These types get set across
# create_row_context and execute_query.
class PostingContext:
    posting: Posting
    entry: Transaction
    balance: Inventory
    options_map: OptionsMap
    account_types: Mapping
    open_close_map: Mapping
    commodity_map: Mapping
    price_map: Mapping
    # Dynamically set by execute_query
    store: Store


def ContextMeta(context: PostingContext) -> data.PostingMeta:
    """Build a read-only PostingMeta object from the query context"""
    # We use sys.maxsize as the index because using a constant is fast, and
    # that helps keep the object read-only: if it ever tries to manipulate
    # the transaction, it'll get an IndexError.
    return data.PostingMeta(context.entry, sys.maxsize, context.posting).detached()


class MetaDocs(bc_query_env.AnyMeta):
    """Return a list of document links from metadata."""
    def __init__(self, operands: List[bc_query_compile.EvalNode]) -> None:
        super(bc_query_env.AnyMeta, self).__init__(operands, set)
        # The second argument is our return type.
        # It should match the annotated return type of __call__.

    def __call__(self, context: PostingContext) -> Set[str]:
        raw_value = super().__call__(context)
        seq = raw_value.split() if isinstance(raw_value, str) else ''
        return set(seq)


class RTField(NamedTuple):
    key: str
    parse: Optional[Callable[[str], object]]
    unset_value: Optional[str] = None

    def load(self, rt_ticket: RTResult) -> object:
        value = rt_ticket.get(self.key) if rt_ticket else None
        if not value or value == self.unset_value:
            return None
        elif self.parse is None:
            return value
@@ -245,58 +252,52 @@ class RTTicket(bc_query_compile.EvalFunction):
        if not rest:
        super().__init__(operands, set)

    def _rt_key(self, key: str) -> RTField:
            return self.FIELDS[key]
        except KeyError:
            raise ValueError(f"unknown RT ticket field {key!r}") from None

    def _meta_key(self, key: str) -> str:
        if key in data.LINK_METADATA:
            return key
            raise ValueError(f"metadata key {key!r} does not contain documentation links")

    def __call__(self, context: PostingContext) -> Set[object]:
        rt_key: str
        meta_key: str
        limit: int
        rt_key, meta_key, limit = self.eval_args(context)
        rt_field = self._rt_key(rt_key)
        meta_key = self._meta_key(meta_key)
        if context.posting.meta is None:
            meta_value: Any = SENTINEL
            meta_value = context.posting.meta.get(meta_key, SENTINEL)
        if meta_value is SENTINEL:
            meta_value = context.entry.meta.get(meta_key)
        if not isinstance(meta_value, str) or limit < 1:
            meta_value = ''
        if limit < 1:
            return set()
        ticket_ids: Set[str] = set()
        for link_s in meta_value.split():
        for link_s in ContextMeta(context).report_links(meta_key):
            rt_id = rtutil.RT.parse(link_s)
            if rt_id is not None:
                if len(ticket_ids) >= limit:
        retval: Set[object] = set()
        for ticket_id in ticket_ids:
                rt_ticket = self._rt_cache[ticket_id]
            except KeyError:
                rt_ticket = self.RT_CLIENT.get_ticket(ticket_id)
                self._rt_cache[ticket_id] = rt_ticket
            field_value = rt_field.load(rt_ticket)
            if field_value is None:
            elif isinstance(field_value, list):
        return retval


class StrMeta(bc_query_env.AnyMeta):
    """Looks up metadata like AnyMeta, then always returns a string."""
0 comments (0 inline, 0 general)