"""test_reports_query.py - Unit tests for query report""" # Copyright © 2021 Brett Smith # License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0 # # Full copyright and licensing details can be found at toplevel file # LICENSE.txt in the repository. import argparse import collections import copy import csv import datetime import io import itertools import re import odf.table import odf.text import pytest from . import testutil from beancount.core import data as bc_data from beancount.query import query_compile as bc_query_compile from beancount.query import query_execute as bc_query_execute from beancount.query import query_parser as bc_query_parser from conservancy_beancount.books import FiscalYear from conservancy_beancount.reports import query as qmod from conservancy_beancount import rtutil from decimal import Decimal UTC = datetime.timezone.utc class MockRewriteRuleset: def __init__(self, multiplier=2): self.multiplier = multiplier def rewrite(self, posts): for post in posts: number, currency = post.units number *= self.multiplier yield post._replace(units=testutil.Amount(number, currency)) class RowContext(bc_query_execute.RowContext): def __init__(self, entry, posting=None): super().__init__() self.entry = entry self.posting = posting @pytest.fixture(scope='module') def qparser(): return bc_query_parser.Parser() @pytest.fixture(scope='module') def rt(): return rtutil.RT(testutil.RTClient()) @pytest.fixture(scope='module') def ticket_query(): return qmod.RTTicket.with_client(testutil.RTClient(), 'testfixture') def const_operands(*args): return [bc_query_compile.EvalConstant(v) for v in args] def pipe_main(arglist, config, stdout_type=io.StringIO): stdout = stdout_type() stderr = io.StringIO() returncode = qmod.main(arglist, stdout, stderr, config) return returncode, stdout, stderr def test_rt_ticket_unconfigured(): with pytest.raises(RuntimeError): qmod.RTTicket(const_operands('id', 'rt-id')) @pytest.mark.parametrize('field_name', ['foo', 'bar']) def test_rt_ticket_bad_field(ticket_query, field_name): with pytest.raises(ValueError): ticket_query(const_operands(field_name, 'rt-id')) @pytest.mark.parametrize('meta_name', ['foo', 'bar']) def test_rt_ticket_bad_metadata(ticket_query, meta_name): with pytest.raises(ValueError): ticket_query(const_operands('id', meta_name)) @pytest.mark.parametrize('field_name,meta_name,expected', [ ('id', 'rt-id', {1}), ('Queue', 'approval', {'general'}), ('Requestors', 'invoice', {'mx1@example.org', 'requestor2@example.org'}), ('Due', 'tax-reporting', {datetime.datetime(2017, 1, 14, 12, 1, 0, tzinfo=UTC)}), ]) def test_rt_ticket_from_txn(ticket_query, field_name, meta_name, expected): func = ticket_query(const_operands(field_name, meta_name)) txn = testutil.Transaction(**{meta_name: 'rt:1'}, postings=[ ('Assets:Cash', 80), ]) context = RowContext(txn, txn.postings[0]) assert func(context) == expected @pytest.mark.parametrize('field_name,meta_name,expected', [ ('id', 'rt-id', {2}), ('Queue', 'approval', {'general'}), ('Requestors', 'invoice', {'mx2@example.org', 'requestor2@example.org'}), ('Due', 'tax-reporting', {datetime.datetime(2017, 1, 14, 12, 2, 0, tzinfo=UTC)}), ]) def test_rt_ticket_from_post(ticket_query, field_name, meta_name, expected): func = ticket_query(const_operands(field_name, meta_name)) txn = testutil.Transaction(**{meta_name: 'rt:1'}, postings=[ ('Assets:Cash', 110, {meta_name: 'rt:2/8'}), ]) context = RowContext(txn, txn.postings[0]) assert func(context) == expected @pytest.mark.parametrize('field_name,meta_name,expected,on_txn', [ ('id', 'approval', {1, 2}, True), ('Queue', 'check', {'general'}, False), ('Requestors', 'invoice', { 'mx1@example.org', 'mx2@example.org', 'requestor2@example.org', }, False), ]) def test_rt_ticket_multi_results(ticket_query, field_name, meta_name, expected, on_txn): func = ticket_query(const_operands(field_name, meta_name)) txn = testutil.Transaction(**{'rt-id': 'rt:1'}, postings=[ ('Assets:Cash', 110, {'rt-id': 'rt:2'}), ]) post = txn.postings[0] meta = txn.meta if on_txn else post.meta meta[meta_name] = 'rt:1/2 Docs/12.pdf rt:2/8' context = RowContext(txn, post) assert func(context) == expected @pytest.mark.parametrize('meta_value,on_txn', testutil.combine_values( ['', 'Docs/34.pdf', 'Docs/100.pdf Docs/120.pdf'], [True, False], )) def test_rt_ticket_no_results(ticket_query, meta_value, on_txn): func = ticket_query(const_operands('Queue', 'check')) txn = testutil.Transaction(**{'rt-id': 'rt:1'}, postings=[ ('Assets:Cash', 110, {'rt-id': 'rt:2'}), ]) post = txn.postings[0] meta = txn.meta if on_txn else post.meta meta['check'] = meta_value context = RowContext(txn, post) assert func(context) == set() def test_rt_ticket_caches_tickets(): rt_client = testutil.RTClient() rt_client.TICKET_DATA = testutil.RTClient.TICKET_DATA.copy() ticket_query = qmod.RTTicket.with_client(rt_client, 'cachetestA') func = ticket_query(const_operands('id', 'rt-id')) txn = testutil.Transaction(postings=[ ('Assets:Cash', 160, {'rt-id': 'rt:3'}), ]) context = RowContext(txn, txn.postings[0]) assert func(context) == {3} del rt_client.TICKET_DATA['3'] assert func(context) == {3} def test_rt_ticket_caches_tickets_not_found(): rt_client = testutil.RTClient() rt_client.TICKET_DATA = testutil.RTClient.TICKET_DATA.copy() rt3 = rt_client.TICKET_DATA.pop('3') ticket_query = qmod.RTTicket.with_client(rt_client, 'cachetestB') func = ticket_query(const_operands('id', 'rt-id')) txn = testutil.Transaction(postings=[ ('Assets:Cash', 160, {'rt-id': 'rt:3'}), ]) context = RowContext(txn, txn.postings[0]) assert func(context) == set() rt_client.TICKET_DATA['3'] = rt3 assert func(context) == set() def test_books_loader_empty(): result = qmod.BooksLoader(None)() assert not result.entries assert len(result.errors) == 1 def test_books_loader_plain(): books_path = testutil.test_path(f'books/books/2018.beancount') loader = testutil.TestBooksLoader(books_path) result = qmod.BooksLoader(loader)() assert not result.errors assert result.entries min_date = datetime.date(2018, 3, 1) assert all(ent.date >= min_date for ent in result.entries) def test_books_loader_rewrites(): rewrites = [MockRewriteRuleset()] books_path = testutil.test_path(f'books/books/2018.beancount') loader = testutil.TestBooksLoader(books_path) result = qmod.BooksLoader(loader, None, None, rewrites)() assert not result.errors assert result.entries numbers = frozenset( abs(post.units.number) for entry in result.entries for post in getattr(entry, 'postings', ()) ) assert numbers assert all(abs(number) >= 40 for number in numbers) @pytest.mark.parametrize('arglist,fy', testutil.combine_values( [['--report-type', 'text'], ['--format=text'], ['-f', 'txt']], range(2018, 2021), )) def test_text_query(arglist, fy): books_path = testutil.test_path(f'books/books/{fy}.beancount') config = testutil.TestConfig(books_path=books_path) arglist += ['select', 'date,', 'narration,', 'account,', 'position'] returncode, stdout, stderr = pipe_main(arglist, config) assert returncode == 0 stdout.seek(0) lines = iter(stdout) next(lines); next(lines) # Skip header for count, line in enumerate(lines, 1): assert re.match(rf'^{fy}-\d\d-\d\d\s+{fy} ', line) assert count >= 2 @pytest.mark.parametrize('arglist,fy', testutil.combine_values( [['--format=csv'], ['-f', 'csv'], ['-t', 'csv']], range(2018, 2021), )) def test_csv_query(arglist, fy): books_path = testutil.test_path(f'books/books/{fy}.beancount') config = testutil.TestConfig(books_path=books_path) arglist += ['select', 'date,', 'narration,', 'account,', 'position'] returncode, stdout, stderr = pipe_main(arglist, config) assert returncode == 0 stdout.seek(0) for count, row in enumerate(csv.DictReader(stdout), 1): assert re.fullmatch(rf'{fy}-\d\d-\d\d', row['date']) assert row['narration'].startswith(f'{fy} ') assert count >= 2 @pytest.mark.parametrize('end_index', range(3)) def test_rewrite_query(end_index): books_path = testutil.test_path(f'books/books/2018.beancount') config = testutil.TestConfig(books_path=books_path) accounts = ['Assets', 'Income'] expected = frozenset(accounts[:end_index]) rewrite_paths = [ testutil.test_path(f'userconfig/Rewrite{s}.yml') for s in expected ] arglist = [f'--rewrite-rules={path}' for path in rewrite_paths] arglist.append('--format=txt') arglist.append('select any_meta("root") as root') returncode, stdout, stderr = pipe_main(arglist, config) assert returncode == 0 stdout.seek(0) actual = frozenset(line.rstrip('\n') for line in stdout) assert expected.issubset(actual) assert frozenset(accounts).difference(expected).isdisjoint(actual) def test_ods_amount_formatting(qparser): statement = qparser.parse('SELECT UNITS(position)') row_types = [('amount', bc_data.Amount)] row_source = [(testutil.Amount(12),), (testutil.Amount(1480, 'JPY'),)] ods = qmod.QueryODS() ods.write_query(statement, row_types, row_source) actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild) assert next(actual)[0].text == 'Amount' assert next(actual)[0].text == '$12.00' assert next(actual)[0].text == '¥1,480' assert next(actual, None) is None def test_ods_datetime_formatting(qparser): statement = qparser.parse('SELECT date') row_types = [('date', datetime.date)] row_source = [(testutil.PAST_DATE,), (testutil.FUTURE_DATE,)] ods = qmod.QueryODS() ods.write_query(statement, row_types, row_source) actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild) assert next(actual)[0].text == 'Date' assert next(actual)[0].text == testutil.PAST_DATE.isoformat() assert next(actual)[0].text == testutil.FUTURE_DATE.isoformat() assert next(actual, None) is None @pytest.mark.parametrize('meta_key,meta_func', [ ('check', 'ANY_META'), ('purchase-order', 'META'), ('rt-id', 'META_DOCS'), ]) def test_ods_link_formatting(qparser, rt, meta_key, meta_func): meta_func_returns_list = meta_func == 'META_DOCS' statement = qparser.parse(f'SELECT {meta_func}({meta_key!r}) AS docs') row_types = [('docs', list if meta_func_returns_list else str)] row_source = [ (s.split() if meta_func_returns_list else s,) for s in ['rt:1/5', 'rt:3 Checks/9.pdf'] ] ods = qmod.QueryODS(rt) ods.write_query(statement, row_types, row_source) rows = iter(ods.document.spreadsheet.firstChild.getElementsByType(odf.table.TableRow)) assert next(rows).text == 'Docs' actual = iter( [link.text for link in row.getElementsByType(odf.text.A)] for row in rows ) assert next(actual) == ['photo.jpg'] assert next(actual) == ['rt:3', '9.pdf'] assert next(actual, None) is None def test_ods_meta_formatting(qparser): statement = qparser.parse('SELECT ANY_META("entity") AS entity') row_types = [('entity', object)] row_source = [(testutil.Amount(14),), (None,), ('foo bar',)] ods = qmod.QueryODS() ods.write_query(statement, row_types, row_source) actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild) assert next(actual)[0].text == 'Entity' assert next(actual)[0].text == '$14.00' assert next(actual)[0].text == '' assert next(actual)[0].text == 'foo bar' assert next(actual, None) is None def test_ods_multicolumn_write(qparser, rt): statement = qparser.parse( 'SELECT MIN(date) AS date, SET(META_DOCS("rt-id")) AS tix, STR_META("entity") AS entity', ) row_types = [('date', datetime.date), ('tix', set), ('entity', str)] row_source = [ (testutil.PAST_DATE, {'rt:1'}, 'AA'), (testutil.FY_START_DATE, {'rt:2'}, 'BB'), (testutil.FUTURE_DATE, {'rt:3', 'rt:4'}, 'CC'), ] ods = qmod.QueryODS(rt) ods.write_query(statement, list(row_types), list(row_source)) actual = iter( cell.text for row in testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild) for cell in row ) for expected, _ in row_types: assert next(actual) == expected.title() assert next(actual) == testutil.PAST_DATE.isoformat() assert next(actual) == 'rt:1' assert next(actual) == 'AA' assert next(actual) == testutil.FY_START_DATE.isoformat() assert next(actual) == 'rt:2' assert next(actual) == 'BB' assert next(actual) == testutil.FUTURE_DATE.isoformat() assert frozenset(next(actual).split('\0')) == row_source[-1][1] assert next(actual) == 'CC' assert next(actual, None) is None def test_ods_is_empty(qparser): statement = qparser.parse('SELECT * WHERE date < 1900-01-01') ods = qmod.QueryODS() assert ods.is_empty() ods.write_query(statement, [], []) assert not ods.is_empty() @pytest.mark.parametrize('fy,account,amt_prefix', [ (2018, 'Assets', '($'), (2019, 'Income', '$'), ]) def test_ods_output(fy, account, amt_prefix): books_path = testutil.test_path(f'books/books/{fy}.beancount') config = testutil.TestConfig(books_path=books_path) arglist = [ '-O', '-', '-f', 'ods', f'SELECT date, narration, UNITS(position) WHERE account ~ "^{account}:"', ] returncode, stdout, stderr = pipe_main(arglist, config, io.BytesIO) assert returncode == 0 with stdout: stdout.seek(0) ods_doc = odf.opendocument.load(stdout) rows = iter(ods_doc.spreadsheet.firstChild.getElementsByType(odf.table.TableRow)) next(rows) # Skip header row amt_pattern = rf'^{re.escape(amt_prefix)}\d' for count, row in enumerate(rows, 1): date, narration, amount = row.childNodes assert re.fullmatch(rf'{fy}-\d{{2}}-\d{{2}}', date.text) assert narration.text.startswith(f'{fy} ') assert re.match(amt_pattern, amount.text) assert count def test_ods_aggregate_output(): books_path = testutil.test_path(f'books/books/2020.beancount') config = testutil.TestConfig(books_path=books_path) arglist = [ '-O', '-', '-f', 'ods', 'SELECT account, SET(narration), SUM(UNITS(position))', 'WHERE date >= 2020-04-01 AND date <= 2020-04-02', 'GROUP BY account ORDER BY account ASC', ] returncode, stdout, stderr = pipe_main(arglist, config, io.BytesIO) assert returncode == 0 with stdout: stdout.seek(0) ods_doc = odf.opendocument.load(stdout) rows = iter(ods_doc.spreadsheet.firstChild.getElementsByType(odf.table.TableRow)) next(rows) # Skip header row actual = {} for row in rows: acct, descs, balance = row.childNodes actual[acct.text] = (frozenset(descs.text.split('\0')), balance.text) in_desc = {'2020 donation'} ex_desc = {'2020 bank maintenance fee'} assert actual['Income:Donations'] == (in_desc, '$20.20') assert actual['Expenses:BankingFees'] == (ex_desc, '$1.00') assert actual['Assets:Checking'] == (in_desc | ex_desc, '($21.20)')