Files @ 8af45e5f8a5b
Branch filter:

Location: NPO-Accounting/conservancy_beancount/tests/test_reports_query.py

Brett Smith
query: Add BQL functions for dealing with link metadata.

query-report was heading to a place where it was going to bifurcate. You
could structure input with its own special input switches, and with ODS
output, it would have its own dedicated grouping logic and use that.

But those things shouldn't be tied together for users. Instead, add
functions to BQL to be able to do the kind of grouping we want. This commit
adds those. Next we'll extend the ODS output to detect and format these
groups correctly.
"""test_reports_query.py - Unit tests for query report"""
# Copyright © 2021  Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.

import argparse
import collections
import copy
import csv
import datetime
import io
import itertools
import re
from decimal import Decimal

import odf.opendocument
import odf.table
import odf.text
import pytest

from . import testutil

from beancount.core import data as bc_data
from conservancy_beancount.books import FiscalYear
from conservancy_beancount.reports import query as qmod
from conservancy_beancount import rtutil

class MockRewriteRuleset:
    """Test double for a rewrite ruleset: scales the units of every posting."""

    def __init__(self, multiplier=2):
        # Factor applied to each posting's number by rewrite().
        self.multiplier = multiplier

    def rewrite(self, posts):
        """Yield a copy of each posting with its units number multiplied.

        Each posting's ``units`` must unpack to ``(number, currency)``; the
        currency is carried over unchanged into a new ``testutil.Amount``.
        """
        for posting in posts:
            number, currency = posting.units
            scaled_units = testutil.Amount(number * self.multiplier, currency)
            yield posting._replace(units=scaled_units)


@pytest.fixture(scope='module')
def rt():
    """Module-scoped fixture: an ``rtutil.RT`` wrapping a ``testutil.RTClient``."""
    client = testutil.RTClient()
    return rtutil.RT(client)

def pipe_main(arglist, config, stdout_type=io.StringIO):
    """Run ``qmod.main`` against in-memory streams.

    ``stdout_type`` is called with no arguments to create the stdout buffer
    (``io.StringIO`` by default); stderr is always an ``io.StringIO``.

    Returns a ``(returncode, stdout, stderr)`` tuple, where the streams are
    the buffer objects handed to ``qmod.main``.
    """
    out_stream, err_stream = stdout_type(), io.StringIO()
    exit_status = qmod.main(arglist, out_stream, err_stream, config)
    return exit_status, out_stream, err_stream

def test_books_loader_empty():
    """A BooksLoader built around ``None`` returns no entries and one error."""
    load_books = qmod.BooksLoader(None)
    loaded = load_books()
    assert not loaded.entries
    assert len(loaded.errors) == 1

def test_books_loader_plain():
    """Loading the 2018 books gives entries, no errors, none before 2018-03-01."""
    source = testutil.TestBooksLoader(
        testutil.test_path('books/books/2018.beancount'),
    )
    loaded = qmod.BooksLoader(source)()
    assert not loaded.errors
    assert loaded.entries
    earliest_allowed = datetime.date(2018, 3, 1)
    assert not any(entry.date < earliest_allowed for entry in loaded.entries)

def test_books_loader_rewrites():
    """Rewrite rulesets given to BooksLoader are applied to the loaded postings."""
    source = testutil.TestBooksLoader(
        testutil.test_path('books/books/2018.beancount'),
    )
    rulesets = [MockRewriteRuleset()]
    loaded = qmod.BooksLoader(source, None, None, rulesets)()
    assert not loaded.errors
    assert loaded.entries
    magnitudes = set()
    for entry in loaded.entries:
        # Entries without a `postings` attribute contribute nothing.
        for posting in getattr(entry, 'postings', ()):
            magnitudes.add(abs(posting.units.number))
    assert magnitudes
    # NOTE(review): 40 presumably is twice the smallest amount in the 2018
    # fixture (the mock ruleset doubles every number) - confirm against the
    # test books.
    assert all(abs(magnitude) >= 40 for magnitude in magnitudes)

@pytest.mark.parametrize('arglist,fy', testutil.combine_values(
    [['--report-type', 'text'], ['--format=text'], ['-f', 'txt']],
    range(2018, 2021),
))
def test_text_query(arglist, fy):
    """Each spelling of the text-format option produces rows for the given year.

    Runs a ``select date, narration, account, position`` query against the
    books for ``fy`` and checks that every data line starts with a date in
    that year followed by a field beginning with ``"<fy> "``.
    """
    books_path = testutil.test_path(f'books/books/{fy}.beancount')
    config = testutil.TestConfig(books_path=books_path)
    # Build a new list instead of `arglist += ...`: in-place extension would
    # modify the list object owned by the parametrize table above.
    arglist = [*arglist, 'select', 'date,', 'narration,', 'account,', 'position']
    returncode, stdout, stderr = pipe_main(arglist, config)
    assert returncode == 0
    stdout.seek(0)
    lines = iter(stdout)
    next(lines); next(lines)  # Skip header
    # Pre-bind count so that output with no data lines fails the final
    # assertion instead of raising NameError.
    count = 0
    for count, line in enumerate(lines, 1):
        assert re.match(rf'^{fy}-\d\d-\d\d\s+{fy} ', line)
    assert count >= 2

@pytest.mark.parametrize('arglist,fy', testutil.combine_values(
    [['--format=csv'], ['-f', 'csv'], ['-t', 'csv']],
    range(2018, 2021),
))
def test_csv_query(arglist, fy):
    """Each spelling of the CSV-format option produces rows for the given year.

    Runs a ``select date, narration, account, position`` query against the
    books for ``fy``, parses stdout with ``csv.DictReader``, and checks the
    ``date`` and ``narration`` columns of every row.
    """
    books_path = testutil.test_path(f'books/books/{fy}.beancount')
    config = testutil.TestConfig(books_path=books_path)
    # Build a new list instead of `arglist += ...`: in-place extension would
    # modify the list object owned by the parametrize table above.
    arglist = [*arglist, 'select', 'date,', 'narration,', 'account,', 'position']
    returncode, stdout, stderr = pipe_main(arglist, config)
    assert returncode == 0
    stdout.seek(0)
    # Pre-bind count so that output with no data rows fails the final
    # assertion instead of raising NameError.
    count = 0
    for count, row in enumerate(csv.DictReader(stdout), 1):
        assert re.fullmatch(rf'{fy}-\d\d-\d\d', row['date'])
        assert row['narration'].startswith(f'{fy} ')
    assert count >= 2

@pytest.mark.parametrize('end_index', range(3))
def test_rewrite_query(end_index):
    """``--rewrite-rules`` files affect the ``root`` metadata seen by queries.

    The first ``end_index`` names in ``accounts`` each select a
    ``userconfig/Rewrite<name>.yml`` rules file. The query output must
    contain every selected name and none of the unselected ones.
    """
    books_path = testutil.test_path('books/books/2018.beancount')
    config = testutil.TestConfig(books_path=books_path)
    accounts = ['Assets', 'Income']
    selected = accounts[:end_index]
    expected = frozenset(selected)
    # Iterate the ordered slice, not the frozenset, so the order of the
    # --rewrite-rules arguments is the same on every run regardless of
    # string hash randomization.
    rewrite_paths = [
        testutil.test_path(f'userconfig/Rewrite{name}.yml')
        for name in selected
    ]
    arglist = [f'--rewrite-rules={path}' for path in rewrite_paths]
    arglist.append('--format=txt')
    arglist.append('select any_meta("root") as root')
    returncode, stdout, stderr = pipe_main(arglist, config)
    assert returncode == 0
    stdout.seek(0)
    actual = frozenset(line.rstrip('\n') for line in stdout)
    assert expected.issubset(actual)
    assert frozenset(accounts).difference(expected).isdisjoint(actual)

def test_ods_amount_formatting():
    """Amount columns are rendered as currency-formatted text in the sheet."""
    column_types = [('amount', bc_data.Amount)]
    data_rows = [(testutil.Amount(12),), (testutil.Amount(1480, 'JPY'),)]
    report = qmod.QueryODS()
    report.write_query(column_types, data_rows)
    sheet_rows = testutil.ODSCell.from_sheet(report.document.spreadsheet.firstChild)
    # Header row first, then one row per source row, in order.
    for expected_text in ('Amount', '$12.00', '¥1,480'):
        assert next(sheet_rows)[0].text == expected_text
    assert next(sheet_rows, None) is None

def test_ods_datetime_formatting():
    """Date columns are rendered as ISO-format text in the sheet."""
    column_types = [('date', datetime.date)]
    dates = (testutil.PAST_DATE, testutil.FUTURE_DATE)
    data_rows = [(date,) for date in dates]
    report = qmod.QueryODS()
    report.write_query(column_types, data_rows)
    sheet_rows = testutil.ODSCell.from_sheet(report.document.spreadsheet.firstChild)
    assert next(sheet_rows)[0].text == 'Date'
    for date in dates:
        assert next(sheet_rows)[0].text == date.isoformat()
    assert next(sheet_rows, None) is None

@pytest.mark.parametrize('meta_key,header_text', [
    ('check', 'Check'),
    ('purchase-order', 'Purchase Order'),
    ('rt-id', 'Ticket'),
])
def test_ods_link_formatting(rt, meta_key, header_text):
    """Link-metadata columns get the expected header and hyperlink texts.

    The column name passed to ``write_query`` is ``meta_key`` with hyphens
    replaced by underscores.
    """
    column_name = meta_key.replace('-', '_')
    column_types = [(column_name, object)]
    data_rows = [('rt:1/5',), ('rt:3 Checks/9.pdf',)]
    report = qmod.QueryODS(rt)
    report.write_query(column_types, data_rows)
    sheet = report.document.spreadsheet.firstChild
    table_rows = iter(sheet.getElementsByType(odf.table.TableRow))
    header_row = next(table_rows)
    assert header_row.text == header_text
    # Collect the anchor texts of each remaining row, one list per row.
    link_texts = []
    for table_row in table_rows:
        anchors = table_row.getElementsByType(odf.text.A)
        link_texts.append([anchor.text for anchor in anchors])
    # NOTE(review): 'photo.jpg' presumably is the name the mock RT client
    # gives attachment rt:1/5 - confirm in testutil.RTClient.
    assert link_texts == [['photo.jpg'], ['rt:3', '9.pdf']]

def test_ods_meta_formatting():
    """Untyped (``object``) columns format Amounts, ``None``, and strings."""
    column_types = [('metadata', object)]
    data_rows = [(testutil.Amount(14),), (None,), ('foo bar',)]
    report = qmod.QueryODS()
    report.write_query(column_types, data_rows)
    sheet_rows = testutil.ODSCell.from_sheet(report.document.spreadsheet.firstChild)
    # Header, then the Amount, the None (empty text), and the plain string.
    for expected_text in ('Metadata', '$14.00', '', 'foo bar'):
        assert next(sheet_rows)[0].text == expected_text
    assert next(sheet_rows, None) is None

def test_ods_multicolumn_write(rt):
    """A three-column query writes headers and cells in row-major order."""
    column_types = [('date', datetime.date), ('rt-id', object), ('desc', str)]
    data_rows = [
        (testutil.PAST_DATE, 'rt:1', 'aaa'),
        (testutil.FY_START_DATE, 'rt:2', 'bbb'),
        (testutil.FUTURE_DATE, 'rt:3', 'ccc'),
    ]
    report = qmod.QueryODS(rt)
    report.write_query(column_types, data_rows)
    sheet = report.document.spreadsheet.firstChild
    cell_texts = [
        cell.text
        for sheet_row in testutil.ODSCell.from_sheet(sheet)
        for cell in sheet_row
    ]
    # Expected flat sequence: the header cells, then each source row with
    # its date rendered in ISO format.
    expected_texts = ['Date', 'Ticket', 'Desc']
    for date, ticket, desc in data_rows:
        expected_texts += [date.isoformat(), ticket, desc]
    assert cell_texts == expected_texts

def test_ods_is_empty():
    """A new QueryODS is empty until a query is written, even one with no rows."""
    report = qmod.QueryODS()
    assert report.is_empty()
    no_columns, no_rows = [], []
    report.write_query(no_columns, no_rows)
    assert not report.is_empty()

@pytest.mark.parametrize('fy,account,amt_prefix', [
    (2018, 'Assets', '($'),
    (2019, 'Income', '$'),
])
def test_ods_output(fy, account, amt_prefix):
    """End-to-end ODS output: run a query to stdout and inspect the sheet.

    Selects date, narration, and units for postings whose account starts
    with ``account:`` in the books for ``fy``. Every data row must carry a
    date in that year, a narration starting with ``"<fy> "``, and an amount
    whose text starts with ``amt_prefix`` followed by a digit.
    """
    books_path = testutil.test_path(f'books/books/{fy}.beancount')
    config = testutil.TestConfig(books_path=books_path)
    arglist = [
        '-O', '-',
        '-f', 'ods',
        f'SELECT date, narration, UNITS(position) WHERE account ~ "^{account}:"',
    ]
    # ODS is a binary format, so capture stdout as bytes.
    returncode, stdout, stderr = pipe_main(arglist, config, io.BytesIO)
    assert returncode == 0
    stdout.seek(0)
    ods_doc = odf.opendocument.load(stdout)
    rows = iter(ods_doc.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
    next(rows)  # Skip header row
    amt_pattern = rf'^{re.escape(amt_prefix)}\d'
    # Pre-bind count so that a sheet with only a header row fails the final
    # assertion instead of raising NameError.
    count = 0
    for count, row in enumerate(rows, 1):
        date, narration, amount = row.childNodes
        assert re.fullmatch(rf'{fy}-\d{{2}}-\d{{2}}', date.text)
        assert narration.text.startswith(f'{fy} ')
        assert re.match(amt_pattern, amount.text)
    assert count