Changeset - f0a5116429a4
[Not reviewed]
0 1 0
Brett Smith - 3 years ago 2021-03-12 19:58:14
brettcsmith@brettcsmith.org
query: Add ContextMeta function.

Deduplicate metadata lookup code.
1 file changed with 11 insertions and 10 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/query.py
Show inline comments
...
 
@@ -99,25 +99,24 @@ import beancount.query.shell as bc_query_shell
 
import odf.table  # type:ignore[import]
 
import rt
 

	
 
from . import core
 
from . import rewrite
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 
from .. import rtutil
 

	
 
PROGNAME = 'query-report'
 
SENTINEL = object()
 
logger = logging.getLogger('conservancy_beancount.reports.query')
 

	
 
CellFunc = Callable[[Any], odf.table.TableCell]
 
EnvironmentFunctions = Dict[
 
    # The real key type is something like:
 
    #   Union[str, Tuple[str, Type, ...]]
 
    # but two issues with that. One, you can't use Ellipses in a Tuple like
 
    # that, so there's no short way to declare this. Second, Beancount doesn't
 
    # declare it anyway, and mypy infers it as Sequence[object]. So just use
 
    # that.
 
    Sequence[object],
 
    Type[bc_query_compile.EvalFunction],
...
 
@@ -145,24 +144,32 @@ class PostingContext:
 
    posting: Posting
 
    entry: Transaction
 
    balance: Inventory
 
    options_map: OptionsMap
 
    account_types: Mapping
 
    open_close_map: Mapping
 
    commodity_map: Mapping
 
    price_map: Mapping
 
    # Dynamically set by execute_query
 
    store: Store
 

	
 

	
 
def ContextMeta(context: PostingContext) -> data.PostingMeta:
 
    """Build a read-only PostingMeta object from the query context"""
 
    # We use sys.maxsize as the index because using a constant is fast, and
 
    # that helps keep the object read-only: if it ever tries to manipulate
 
    # the transaction, it'll get an IndexError.
 
    return data.PostingMeta(context.entry, sys.maxsize, context.posting).detached()
 

	
 

	
 
class MetaDocs(bc_query_env.AnyMeta):
 
    """Return a list of document links from metadata."""
 
    def __init__(self, operands: List[bc_query_compile.EvalNode]) -> None:
 
        super(bc_query_env.AnyMeta, self).__init__(operands, set)
 
        # The second argument is our return type.
 
        # It should match the annotated return type of __call__.
 

	
 
    def __call__(self, context: PostingContext) -> Set[str]:
 
        raw_value = super().__call__(context)
 
        seq = raw_value.split() if isinstance(raw_value, str) else ''
 
        return set(seq)
 

	
...
 
@@ -257,34 +264,28 @@ class RTTicket(bc_query_compile.EvalFunction):
 
        if key in data.LINK_METADATA:
 
            return key
 
        else:
 
            raise ValueError(f"metadata key {key!r} does not contain documentation links")
 

	
 
    def __call__(self, context: PostingContext) -> Set[object]:
 
        rt_key: str
 
        meta_key: str
 
        limit: int
 
        rt_key, meta_key, limit = self.eval_args(context)
 
        rt_field = self._rt_key(rt_key)
 
        meta_key = self._meta_key(meta_key)
 
        if context.posting.meta is None:
 
            meta_value: Any = SENTINEL
 
        else:
 
            meta_value = context.posting.meta.get(meta_key, SENTINEL)
 
        if meta_value is SENTINEL:
 
            meta_value = context.entry.meta.get(meta_key)
 
        if not isinstance(meta_value, str) or limit < 1:
 
            meta_value = ''
 
        if limit < 1:
 
            return set()
 
        ticket_ids: Set[str] = set()
 
        for link_s in meta_value.split():
 
        for link_s in ContextMeta(context).report_links(meta_key):
 
            rt_id = rtutil.RT.parse(link_s)
 
            if rt_id is not None:
 
                ticket_ids.add(rt_id[0])
 
                if len(ticket_ids) >= limit:
 
                    break
 
        retval: Set[object] = set()
 
        for ticket_id in ticket_ids:
 
            try:
 
                rt_ticket = self._rt_cache[ticket_id]
 
            except KeyError:
 
                rt_ticket = self.RT_CLIENT.get_ticket(ticket_id)
 
                self._rt_cache[ticket_id] = rt_ticket
0 comments (0 inline, 0 general)