Changeset - 0f58960b67f7
[Not reviewed]
0 2 0
Brett Smith - 3 years ago 2021-03-06 20:45:11
brettcsmith@brettcsmith.org
query: Add ODS output format.
2 files changed with 229 insertions and 8 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/query.py
Show inline comments
 
"""query.py - Report arbitrary queries with advanced loading and formatting"""
 
# Copyright © 2021  Brett Smith
 
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
 
#
 
# Full copyright and licensing details can be found at toplevel file
 
# LICENSE.txt in the repository.
 

	
 
import argparse
 
import contextlib
 
import datetime
 
import enum
 
import itertools
 
import logging
 
import re
 
import sys
 

	
 
from typing import (
 
    cast,
 
    AbstractSet,
 
    Any,
 
    Callable,
 
    Dict,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    NamedTuple,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Tuple,
 
    Type,
 
    Union,
 
)
 
from ..beancount_types import (
 
    MetaValue,
 
    Posting,
 
    Transaction,
 
)
 

	
 
from decimal import Decimal
 
from pathlib import Path
 
from beancount.core.amount import _Amount as BeancountAmount
 

	
 
import beancount.query.numberify as bc_query_numberify
 
import beancount.query.query_compile as bc_query_compile
 
import beancount.query.query_env as bc_query_env
 
import beancount.query.query_execute as bc_query_execute
 
import beancount.query.query_parser as bc_query_parser
 
import beancount.query.query_render as bc_query_render
 
import beancount.query.shell as bc_query_shell
 
import odf.table  # type:ignore[import]
 

	
 
from . import core
 
from . import rewrite
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 
from .. import rtutil
 

	
 
BUILTIN_FIELDS: AbstractSet[str] = frozenset(itertools.chain(
 
    bc_query_env.TargetsEnvironment.columns,  # type:ignore[has-type]
 
    bc_query_env.TargetsEnvironment.functions,  # type:ignore[has-type]
 
))
 
PROGNAME = 'query-report'
 
QUERY_PARSER = bc_query_parser.Parser()
 
logger = logging.getLogger('conservancy_beancount.reports.query')
 

	
 
CellFunc = Callable[[Any], odf.table.TableCell]
 
RowTypes = Sequence[Tuple[str, Type]]
 
Rows = Sequence[NamedTuple]
 

	
 
class BooksLoader:
 
    """Closure to load books with a zero-argument callable
 

	
 
    This matches the load interface that BQLShell expects.
 
    """
 
    def __init__(
 
            self,
 
            books_loader: Optional[books.Loader],
 
            start_date: Optional[datetime.date]=None,
 
            stop_date: Optional[datetime.date]=None,
 
            rewrite_rules: Sequence[rewrite.RewriteRuleset]=(),
 
    ) -> None:
 
        self.books_loader = books_loader
 
        self.start_date = start_date
 
        self.stop_date = stop_date
 
        self.rewrite_rules = rewrite_rules
 

	
 
    def __call__(self) -> books.LoadResult:
 
        logger.debug("BooksLoader called")
 
        result = books.Loader.dispatch(self.books_loader, self.start_date, self.stop_date)
 
        logger.debug("books loaded from Beancount")
 
        if self.rewrite_rules:
 
            for index, entry in enumerate(result.entries):
 
                # entry might not be a Transaction; we catch that later.
 
                # The type ignores are because the underlying Beancount type isn't
 
                # type-checkable.
 
                postings = data.Posting.from_txn(entry)  # type:ignore[arg-type]
 
                for ruleset in self.rewrite_rules:
 
                    postings = ruleset.rewrite(postings)
 
                try:
 
                    result.entries[index] = entry._replace(postings=list(postings))  # type:ignore[call-arg]
 
                except AttributeError:
 
                    pass
 
            logger.debug("rewrite rules applied")
 
        return result
 

	
 

	
 
class QueryODS(core.BaseODS[NamedTuple, None]):
 
    def is_empty(self) -> bool:
 
        return not self.sheet.childNodes
 

	
 
    def section_key(self, row: NamedTuple) -> None:
 
        return None
 

	
 
    def _cell_type(self, row_type: Type) -> CellFunc:
 
        if issubclass(row_type, BeancountAmount):
 
            return self.currency_cell
 
        elif issubclass(row_type, (int, float, Decimal)):
 
            return self.float_cell
 
        elif issubclass(row_type, datetime.date):
 
            return self.date_cell
 
        elif issubclass(row_type, str):
 
            return self.string_cell
 
        else:
 
            return self._generic_cell
 

	
 
    def _generic_cell(self, value: Any) -> odf.table.TableCell:
 
        return self.string_cell('' if value is None else str(value))
 

	
 
    def _link_cell(self, value: MetaValue) -> odf.table.TableCell:
 
        if isinstance(value, str):
 
            return self.meta_links_cell(value.split())
 
        else:
 
            return self._generic_cell(value)
 

	
 
    def _metadata_cell(self, value: MetaValue) -> odf.table.TableCell:
 
        return self._cell_type(type(value))(value)
 

	
 
    def _cell_types(self, row_types: RowTypes) -> Iterator[CellFunc]:
 
        for name, row_type in row_types:
 
            if row_type is object:
 
                if name.replace('_', '-') in data.LINK_METADATA:
 
                    yield self._link_cell
 
                else:
 
                    yield self._metadata_cell
 
            else:
 
                yield self._cell_type(row_type)
 

	
 
    def write_query(self, row_types: RowTypes, rows: Rows) -> None:
 
        if self.is_empty():
 
            self.sheet.setAttribute('name', "Query 1")
 
        else:
 
            self.use_sheet(f"Query {len(self.document.spreadsheet.childNodes) + 1}")
 
        for name, row_type in row_types:
 
            if row_type is object or issubclass(row_type, str):
 
                col_width = 2.0
 
            elif issubclass(row_type, BeancountAmount):
 
                col_width = 1.5
 
            else:
 
                col_width = 1.0
 
            col_style = self.column_style(col_width)
 
            self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
 
        self.add_row(*(
 
            self.string_cell(data.Metadata.human_name(name.replace('_', '-')),
 
                             stylename=self.style_bold)
 
            for name, _ in row_types
 
        ))
 
        self.lock_first_row()
 
        cell_funcs = list(self._cell_types(row_types))
 
        for row in rows:
 
            self.add_row(*(
 
                cell_func(value)
 
                for cell_func, value in zip(cell_funcs, row)
 
            ))
 

	
 

	
 
class BQLShell(bc_query_shell.BQLShell):
 
    def __init__(
 
            self,
 
            is_interactive: bool,
 
            loadfun: Callable[[], books.LoadResult],
 
            outfile: TextIO,
 
            default_format: str='text',
 
            do_numberify: bool=False,
 
            rt_wrapper: Optional[rtutil.RT]=None,
 
    ) -> None:
 
        super().__init__(is_interactive, loadfun, outfile, default_format, do_numberify)
 
        self.ods = QueryODS(rt_wrapper)
 

	
 
    def on_Select(self, statement: str) -> None:
 
        output_format: str = self.vars['format']
 
        try:
 
            render_func = getattr(self, f'_render_{output_format}')
 
        except AttributeError:
 
            logger.error("unknown output format %r", output_format)
 
            return
 

	
 
        try:
 
            logger.debug("compiling query")
 
            compiled_query = bc_query_compile.compile(
 
                statement, self.env_targets, self.env_postings, self.env_entries,
 
            )
 
            logger.debug("executing query")
 
            row_types, rows = bc_query_execute.execute_query(
 
                compiled_query, self.entries, self.options_map,
 
            )
 
            if self.vars['numberify'] and output_format != 'ods':
 
                logger.debug("numberifying query")
 
                row_types, rows = bc_query_numberify.numberify_results(
 
                    row_types, rows, self.options_map['dcontext'].build(),
 
                )
 
        except Exception as error:
 
            logger.error(str(error), exc_info=logger.isEnabledFor(logging.DEBUG))
 
            return
 

	
 
        if not rows and output_format != 'ods':
 
            print("(empty)", file=self.outfile)
 
        else:
 
            logger.debug("rendering query as %s", output_format)
 
            render_func(row_types, rows)
 

	
 
    def _render_csv(self, row_types: RowTypes, rows: Rows) -> None:
 
        bc_query_render.render_csv(
 
            row_types,
 
            rows,
 
            self.options_map['dcontext'],
 
            self.outfile,
 
            self.vars['expand'],
 
        )
 

	
 
    def _render_ods(self, row_types: RowTypes, rows: Rows) -> None:
 
        self.ods.write_query(row_types, rows)
 
        logger.info("results saved in sheet %s", self.ods.sheet.getAttribute('name'))
 

	
 
    def _render_text(self, row_types: RowTypes, rows: Rows) -> None:
 
        with contextlib.ExitStack() as stack:
 
            if self.is_interactive:
 
                output = stack.enter_context(self.get_pager())
 
            else:
 
                output = self.outfile
 
            bc_query_render.render_text(
 
                row_types,
 
                rows,
 
                self.options_map['dcontext'],
 
                output,
 
                self.vars['expand'],
 
                self.vars['boxed'],
 
                self.vars['spaced'],
 
            )
 

	
 

	
 
class JoinOperator(enum.Enum):
 
    AND = 'AND'
 
    OR = 'OR'
 

	
 
    def join(self, parts: Iterable[str]) -> str:
 
        return f' {self.value} '.join(parts)
 

	
 

	
 
class ReportFormat(enum.Enum):
 
    TEXT = 'text'
 
    TXT = TEXT
 
    CSV = 'csv'
 
    # ODS = 'ods'
 
    ODS = 'ods'
 

	
 

	
 
def _date_condition(
 
        date: Union[int, datetime.date],
 
        year_to_date: Callable[[int], datetime.date],
 
        op: str,
 
) -> str:
 
    if isinstance(date, int):
 
        date = year_to_date(date)
 
    return f'date {op} {date.isoformat()}'
 

	
 
def build_query(
 
        args: argparse.Namespace,
 
        fy: books.FiscalYear,
 
        in_file: Optional[TextIO]=None,
 
) -> Optional[str]:
 
    if not args.query:
 
        args.query = [] if in_file is None else [line[:-1] for line in in_file]
 
    plain_query = ' '.join(args.query)
 
    if not plain_query or plain_query.isspace():
 
        return None
 
    try:
 
        QUERY_PARSER.parse(plain_query)
 
    except bc_query_parser.ParseError:
 
        if args.join is None:
 
            args.join = JoinOperator.AND
 
        select = [
 
            'date',
 
            'ANY_META("entity") as entity',
 
            'narration',
 
            'position',
 
            'COST(position)',
 
            'ANY_META("entity") AS entity',
 
            'narration AS description',
 
            'COST(position) AS booked_amount',
 
            *(f'ANY_META("{field}") AS {field.replace("-", "_")}'
 
              if field not in BUILTIN_FIELDS
 
              and re.fullmatch(r'[a-z][-_A-Za-z0-9]*', field)
 
              else field
 
              for field in args.select),
 
        ]
 
        conds = [f'({args.join.join(args.query)})']
 
        if args.start_date is not None:
 
            conds.append(_date_condition(args.start_date, fy.first_date, '>='))
 
        if args.stop_date is not None:
 
            conds.append(_date_condition(args.stop_date, fy.next_fy_date, '<'))
 
        return f'SELECT {", ".join(select)} WHERE {" AND ".join(conds)}'
 
    else:
 
        if args.join:
 
            raise ValueError("cannot specify --join with a full query")
 
        if args.select:
 
            raise ValueError("cannot specify --select with a full query")
 
        return plain_query
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.year_or_date_arg,
 
        help="""Begin loading entries from this fiscal year. When query-report
 
builds the query, it will include a condition `date >= DATE`.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.year_or_date_arg,
 
        help="""End loading entries from this fiscal year. When query-report
 
builds the query, it will include a condition `date < DATE`. If you specify a
 
begin date but not an end date, the default end date will be the end of the
 
fiscal year of the begin date.
 
""")
 
    cliutil.add_rewrite_rules_argument(parser)
 
    format_arg = cliutil.EnumArgument(ReportFormat)
 
    parser.add_argument(
 
        '--report-type', '--format', '-t', '-f',
 
        metavar='TYPE',
 
        type=format_arg.enum_type,
 
        help="""Format of report to generate. Choices are
...
 
@@ -292,74 +381,88 @@ keys.
 
        metavar='COLUMN',
 
        help="""Group output by this column
 
""")
 
    # query_group.add_argument(
 
    #     '--order-by', '--sort', '-r',
 
    #     metavar='COLUMN',
 
    #     help="""Order output by this column
 
    # """),
 
    query_group.add_argument(
 
        '--join', '-j',
 
        metavar='OP',
 
        type=join_arg.enum_type,
 
        help=f"""Join your WHERE conditions with this operator.
 
Choices are {join_arg.choices_str()}. Default 'and'.
 
"""),
 
    query_group.add_argument(
 
        'query',
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Full query or WHERE conditions to run non-interactively
 
""")
 

	
 
    args = parser.parse_args(arglist)
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    fy = config.fiscal_year_begin()
 
    if args.stop_date is None and args.start_date is not None:
 
        args.stop_date = fy.next_fy_date(args.start_date)
 
    try:
 
        query = build_query(args, fy, None if sys.stdin.isatty() else sys.stdin)
 
    except ValueError as error:
 
        logger.error(error.args[0], exc_info=logger.isEnabledFor(logging.DEBUG))
 
        return 2
 
    is_interactive = query is None and sys.stdin.isatty()
 
    if args.report_type is None:
 
        try:
 
            args.report_type = ReportFormat[args.output_file.suffix[1:].upper()]
 
        except (AttributeError, KeyError):
 
            args.report_type = ReportFormat.TEXT # if is_interactive else ReportFormat.ODS
 
            args.report_type = ReportFormat.TEXT if is_interactive else ReportFormat.ODS
 

	
 
    load_func = BooksLoader(
 
        config.books_loader(),
 
        args.start_date,
 
        args.stop_date,
 
        [rewrite.RewriteRuleset.from_yaml(path) for path in args.rewrite_rules],
 
    )
 
    shell = BQLShell(
 
        is_interactive,
 
        load_func,
 
        stdout,
 
        args.report_type.value,
 
        args.numberify,
 
        config.rt_wrapper(),
 
    )
 
    shell.on_Reload()
 
    if query is None:
 
        shell.cmdloop()
 
    else:
 
        shell.onecmd(query)
 

	
 
    if not shell.ods.is_empty():
 
        shell.ods.set_common_properties(config.books_repo())
 
        shell.ods.set_custom_property('BeanQuery', query or '<interactive>')
 
        if args.output_file is None:
 
            out_dir_path = config.repository_path() or Path()
 
            args.output_file = out_dir_path / 'QueryResults_{}.ods'.format(
 
                datetime.datetime.now().isoformat(timespec='minutes'),
 
            )
 
            logger.info("Writing spreadsheet to %s", args.output_file)
 
        ods_file = cliutil.bytes_output(args.output_file, stdout)
 
        shell.ods.save_file(ods_file)
 

	
 
    return cliutil.ExitCode.OK
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
tests/test_reports_query.py
Show inline comments
 
"""test_reports_query.py - Unit tests for query report"""
 
# Copyright © 2021  Brett Smith
 
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
 
#
 
# Full copyright and licensing details can be found at toplevel file
 
# LICENSE.txt in the repository.
 

	
 
import argparse
 
import collections
 
import copy
 
import csv
 
import datetime
 
import io
 
import itertools
 
import re
 

	
 
import odf.table
 
import odf.text
 
import pytest
 

	
 
from . import testutil
 

	
 
from beancount.core import data as bc_data
 
from conservancy_beancount.books import FiscalYear
 
from conservancy_beancount.reports import query as qmod
 
from conservancy_beancount import rtutil
 

	
 
from decimal import Decimal
 

	
 
class MockRewriteRuleset:
 
    def __init__(self, multiplier=2):
 
        self.multiplier = multiplier
 

	
 
    def rewrite(self, posts):
 
        for post in posts:
 
            number, currency = post.units
 
            number *= self.multiplier
 
            yield post._replace(units=testutil.Amount(number, currency))
 

	
 

	
 
@pytest.fixture(scope='module')
 
def fy():
 
    return FiscalYear(3, 1)
 

	
 
def pipe_main(arglist, config):
 
    stdout = io.StringIO()
 
@pytest.fixture(scope='module')
 
def rt():
 
    return rtutil.RT(testutil.RTClient())
 

	
 
def pipe_main(arglist, config, stdout_type=io.StringIO):
 
    stdout = stdout_type()
 
    stderr = io.StringIO()
 
    returncode = qmod.main(arglist, stdout, stderr, config)
 
    return returncode, stdout, stderr
 

	
 
def query_args(query=None, start_date=None, stop_date=None, join=None, select=None):
 
    if isinstance(join, str):
 
        join = qmod.JoinOperator[join]
 
    if select is None:
 
        select = []
 
    elif isinstance(select, str):
 
        select = select.split(',')
 
    return argparse.Namespace(**locals())
 

	
 
def test_books_loader_empty():
 
    result = qmod.BooksLoader(None)()
 
    assert not result.entries
 
    assert len(result.errors) == 1
 

	
 
def test_books_loader_plain():
 
    books_path = testutil.test_path(f'books/books/2018.beancount')
 
    loader = testutil.TestBooksLoader(books_path)
 
    result = qmod.BooksLoader(loader)()
 
    assert not result.errors
 
    assert result.entries
 
    min_date = datetime.date(2018, 3, 1)
 
    assert all(ent.date >= min_date for ent in result.entries)
 

	
 
def test_books_loader_rewrites():
 
    rewrites = [MockRewriteRuleset()]
 
    books_path = testutil.test_path(f'books/books/2018.beancount')
 
    loader = testutil.TestBooksLoader(books_path)
 
    result = qmod.BooksLoader(loader, None, None, rewrites)()
 
    assert not result.errors
 
    assert result.entries
 
    numbers = frozenset(
 
        abs(post.units.number)
 
        for entry in result.entries
 
        for post in getattr(entry, 'postings', ())
 
    )
 
    assert numbers
 
    assert all(abs(number) >= 40 for number in numbers)
 

	
 
@pytest.mark.parametrize('file_s', [None, '', ' \n \n\n'])
 
def test_build_query_empty(fy, file_s):
 
    args = query_args()
 
    if file_s is None:
 
        query = qmod.build_query(args, fy)
 
    else:
...
 
@@ -213,48 +221,158 @@ def test_build_query_from_file_where_clauses(fy):
 
def test_text_query(arglist, fy):
 
    books_path = testutil.test_path(f'books/books/{fy}.beancount')
 
    config = testutil.TestConfig(books_path=books_path)
 
    arglist += ['select', 'date,', 'narration,', 'account,', 'position']
 
    returncode, stdout, stderr = pipe_main(arglist, config)
 
    assert returncode == 0
 
    stdout.seek(0)
 
    lines = iter(stdout)
 
    next(lines); next(lines)  # Skip header
 
    for count, line in enumerate(lines, 1):
 
        assert re.match(rf'^{fy}-\d\d-\d\d\s+{fy} ', line)
 
    assert count >= 2
 

	
 
@pytest.mark.parametrize('arglist,fy', testutil.combine_values(
 
    [['--format=csv'], ['-f', 'csv'], ['-t', 'csv']],
 
    range(2018, 2021),
 
))
 
def test_csv_query(arglist, fy):
 
    books_path = testutil.test_path(f'books/books/{fy}.beancount')
 
    config = testutil.TestConfig(books_path=books_path)
 
    arglist += ['select', 'date,', 'narration,', 'account,', 'position']
 
    returncode, stdout, stderr = pipe_main(arglist, config)
 
    assert returncode == 0
 
    stdout.seek(0)
 
    for count, row in enumerate(csv.DictReader(stdout), 1):
 
        assert re.fullmatch(rf'{fy}-\d\d-\d\d', row['date'])
 
        assert row['narration'].startswith(f'{fy} ')
 
    assert count >= 2
 

	
 
@pytest.mark.parametrize('end_index', range(3))
 
def test_rewrite_query(end_index):
 
    books_path = testutil.test_path(f'books/books/2018.beancount')
 
    config = testutil.TestConfig(books_path=books_path)
 
    accounts = ['Assets', 'Income']
 
    expected = frozenset(accounts[:end_index])
 
    rewrite_paths = [
 
        testutil.test_path(f'userconfig/Rewrite{s}.yml')
 
        for s in expected
 
    ]
 
    arglist = [f'--rewrite-rules={path}' for path in rewrite_paths]
 
    arglist.append('--format=txt')
 
    arglist.append('select any_meta("root") as root')
 
    returncode, stdout, stderr = pipe_main(arglist, config)
 
    assert returncode == 0
 
    stdout.seek(0)
 
    actual = frozenset(line.rstrip('\n') for line in stdout)
 
    assert expected.issubset(actual)
 
    assert frozenset(accounts).difference(expected).isdisjoint(actual)
 

	
 
def test_ods_amount_formatting():
 
    row_types = [('amount', bc_data.Amount)]
 
    row_source = [(testutil.Amount(12),), (testutil.Amount(1480, 'JPY'),)]
 
    ods = qmod.QueryODS()
 
    ods.write_query(row_types, row_source)
 
    actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
 
    assert next(actual)[0].text == 'Amount'
 
    assert next(actual)[0].text == '$12.00'
 
    assert next(actual)[0].text == '¥1,480'
 
    assert next(actual, None) is None
 

	
 
def test_ods_datetime_formatting():
 
    row_types = [('date', datetime.date)]
 
    row_source = [(testutil.PAST_DATE,), (testutil.FUTURE_DATE,)]
 
    ods = qmod.QueryODS()
 
    ods.write_query(row_types, row_source)
 
    actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
 
    assert next(actual)[0].text == 'Date'
 
    assert next(actual)[0].text == testutil.PAST_DATE.isoformat()
 
    assert next(actual)[0].text == testutil.FUTURE_DATE.isoformat()
 
    assert next(actual, None) is None
 

	
 
@pytest.mark.parametrize('meta_key,header_text', [
 
    ('check', 'Check'),
 
    ('purchase-order', 'Purchase Order'),
 
    ('rt-id', 'Ticket'),
 
])
 
def test_ods_link_formatting(rt, meta_key, header_text):
 
    row_types = [(meta_key.replace('-', '_'), object)]
 
    row_source = [('rt:1/5',), ('rt:3 Checks/9.pdf',)]
 
    ods = qmod.QueryODS(rt)
 
    ods.write_query(row_types, row_source)
 
    rows = iter(ods.document.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
 
    assert next(rows).text == header_text
 
    actual = iter(
 
        [link.text for link in row.getElementsByType(odf.text.A)]
 
        for row in rows
 
    )
 
    assert next(actual) == ['photo.jpg']
 
    assert next(actual) == ['rt:3', '9.pdf']
 
    assert next(actual, None) is None
 

	
 
def test_ods_meta_formatting():
 
    row_types = [('metadata', object)]
 
    row_source = [(testutil.Amount(14),), (None,), ('foo bar',)]
 
    ods = qmod.QueryODS()
 
    ods.write_query(row_types, row_source)
 
    actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
 
    assert next(actual)[0].text == 'Metadata'
 
    assert next(actual)[0].text == '$14.00'
 
    assert next(actual)[0].text == ''
 
    assert next(actual)[0].text == 'foo bar'
 
    assert next(actual, None) is None
 

	
 
def test_ods_multicolumn_write(rt):
 
    row_types = [('date', datetime.date), ('rt-id', object), ('desc', str)]
 
    row_source = [
 
        (testutil.PAST_DATE, 'rt:1', 'aaa'),
 
        (testutil.FY_START_DATE, 'rt:2', 'bbb'),
 
        (testutil.FUTURE_DATE, 'rt:3', 'ccc'),
 
    ]
 
    ods = qmod.QueryODS(rt)
 
    ods.write_query(row_types, row_source)
 
    actual = iter(
 
        cell.text
 
        for row in testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
 
        for cell in row
 
    )
 
    assert next(actual) == 'Date'
 
    assert next(actual) == 'Ticket'
 
    assert next(actual) == 'Desc'
 
    assert next(actual) == testutil.PAST_DATE.isoformat()
 
    assert next(actual) == 'rt:1'
 
    assert next(actual) == 'aaa'
 
    assert next(actual) == testutil.FY_START_DATE.isoformat()
 
    assert next(actual) == 'rt:2'
 
    assert next(actual) == 'bbb'
 
    assert next(actual) == testutil.FUTURE_DATE.isoformat()
 
    assert next(actual) == 'rt:3'
 
    assert next(actual) == 'ccc'
 
    assert next(actual, None) is None
 

	
 
def test_ods_is_empty():
 
    ods = qmod.QueryODS()
 
    assert ods.is_empty()
 
    ods.write_query([], [])
 
    assert not ods.is_empty()
 

	
 
@pytest.mark.parametrize('fy,account,amt_prefix', [
 
    (2018, 'Assets', '($'),
 
    (2019, 'Income', '$'),
 
])
 
def test_ods_output(fy, account, amt_prefix):
 
    books_path = testutil.test_path(f'books/books/{fy}.beancount')
 
    config = testutil.TestConfig(books_path=books_path)
 
    arglist = ['-O', '-', '-f', 'ods', f'account ~ "^{account}:"']
 
    returncode, stdout, stderr = pipe_main(arglist, config, io.BytesIO)
 
    assert returncode == 0
 
    stdout.seek(0)
 
    ods_doc = odf.opendocument.load(stdout)
 
    rows = iter(ods_doc.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
 
    next(rows)  # Skip header row
 
    amt_pattern = rf'^{re.escape(amt_prefix)}\d'
 
    for count, row in enumerate(rows, 1):
 
        date, entity, narration, amount = row.childNodes
 
        assert re.fullmatch(rf'{fy}-\d{{2}}-\d{{2}}', date.text)
 
        assert narration.text.startswith(f'{fy} ')
 
        assert re.match(amt_pattern, amount.text)
 
    assert count
0 comments (0 inline, 0 general)