"""test_reports_query.py - Unit tests for query report"""
# Copyright © 2021 Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.

import argparse
import collections
import copy
import csv
import datetime
import io
import itertools
import re

# odf.opendocument is imported explicitly because tests in this module call
# odf.opendocument.load(); previously it was only reachable through
# whatever other modules happened to import it first.
import odf.opendocument
import odf.table
import odf.text

import pytest

from . import testutil

from beancount.core import data as bc_data
from beancount.query import query_parser as bc_query_parser
from conservancy_beancount.books import FiscalYear
from conservancy_beancount.reports import query as qmod
from conservancy_beancount import rtutil

from decimal import Decimal


class MockRewriteRuleset:
    """Stand-in rewrite ruleset that scales every posting by a multiplier."""

    def __init__(self, multiplier=2):
        self.multiplier = multiplier

    def rewrite(self, posts):
        """Yield a copy of each posting with its units number multiplied."""
        for post in posts:
            number, currency = post.units
            number *= self.multiplier
            yield post._replace(units=testutil.Amount(number, currency))


@pytest.fixture(scope='module')
def qparser():
    """Return one beancount query parser shared by the whole module."""
    return bc_query_parser.Parser()


@pytest.fixture(scope='module')
def rt():
    """Return an rtutil.RT wrapper around the testutil RT client."""
    return rtutil.RT(testutil.RTClient())


def pipe_main(arglist, config, stdout_type=io.StringIO):
    """Run the query report's main() against in-memory streams.

    stdout_type is called with no arguments to build the stdout stream;
    pass io.BytesIO for binary output formats. stderr is always an
    io.StringIO.

    Returns a (returncode, stdout, stderr) tuple. The streams are returned
    as main() left them, so callers must seek(0) before reading.
    """
    stdout = stdout_type()
    stderr = io.StringIO()
    returncode = qmod.main(arglist, stdout, stderr, config)
    return returncode, stdout, stderr


def test_books_loader_empty():
    """A BooksLoader without an underlying loader reports exactly one error."""
    result = qmod.BooksLoader(None)()
    assert not result.entries
    assert len(result.errors) == 1


def test_books_loader_plain():
    """Loading the 2018 books yields entries and no errors."""
    books_path = testutil.test_path('books/books/2018.beancount')
    loader = testutil.TestBooksLoader(books_path)
    result = qmod.BooksLoader(loader)()
    assert not result.errors
    assert result.entries
    # NOTE(review): 2018-03-01 is presumably the fiscal year start used by
    # the test books - confirm against the fixture.
    min_date = datetime.date(2018, 3, 1)
    assert all(ent.date >= min_date for ent in result.entries)


def test_books_loader_rewrites():
    """Rewrite rulesets passed to BooksLoader are applied to the postings."""
    rewrites = [MockRewriteRuleset()]
    books_path = testutil.test_path('books/books/2018.beancount')
    loader = testutil.TestBooksLoader(books_path)
    result = qmod.BooksLoader(loader, None, None, rewrites)()
    assert not result.errors
    assert result.entries
    numbers = frozenset(
        abs(post.units.number)
        for entry in result.entries
        # Not every entry type has postings; treat those as empty.
        for post in getattr(entry, 'postings', ())
    )
    assert numbers
    # The mock ruleset doubles every amount.
    # NOTE(review): the threshold of 40 assumes the unrewritten fixture
    # amounts are all at least 20 - confirm against the 2018 books.
    assert all(abs(number) >= 40 for number in numbers)


@pytest.mark.parametrize('arglist,fy', testutil.combine_values(
    [['--report-type', 'text'], ['--format=text'], ['-f', 'txt']],
    range(2018, 2021),
))
def test_text_query(arglist, fy):
    """Every spelling of the text format option produces text output."""
    books_path = testutil.test_path(f'books/books/{fy}.beancount')
    config = testutil.TestConfig(books_path=books_path)
    # Build a new list instead of extending in place: the parametrized
    # lists are shared objects and must not accumulate arguments.
    arglist = arglist + ['select', 'date,', 'narration,', 'account,', 'position']
    returncode, stdout, stderr = pipe_main(arglist, config)
    assert returncode == 0
    stdout.seek(0)
    # Skip the two header lines without raising if the output is short.
    lines = itertools.islice(stdout, 2, None)
    line_pattern = re.compile(rf'^{fy}-\d\d-\d\d\s+{fy} ')
    # Start at zero so empty output fails the assertion below rather than
    # raising NameError on an unbound loop variable.
    count = 0
    for count, line in enumerate(lines, 1):
        assert line_pattern.match(line)
    assert count >= 2


@pytest.mark.parametrize('arglist,fy', testutil.combine_values(
    [['--format=csv'], ['-f', 'csv'], ['-t', 'csv']],
    range(2018, 2021),
))
def test_csv_query(arglist, fy):
    """Every spelling of the CSV format option produces parseable CSV."""
    books_path = testutil.test_path(f'books/books/{fy}.beancount')
    config = testutil.TestConfig(books_path=books_path)
    # Build a new list instead of extending in place: the parametrized
    # lists are shared objects and must not accumulate arguments.
    arglist = arglist + ['select', 'date,', 'narration,', 'account,', 'position']
    returncode, stdout, stderr = pipe_main(arglist, config)
    assert returncode == 0
    stdout.seek(0)
    date_pattern = re.compile(rf'{fy}-\d\d-\d\d')
    narration_prefix = f'{fy} '
    # Start at zero so empty output fails the assertion below rather than
    # raising NameError on an unbound loop variable.
    count = 0
    for count, row in enumerate(csv.DictReader(stdout), 1):
        assert date_pattern.fullmatch(row['date'])
        assert row['narration'].startswith(narration_prefix)
    assert count >= 2


@pytest.mark.parametrize('end_index', range(3))
def test_rewrite_query(end_index):
    """--rewrite-rules files are applied before the query runs.

    Each run loads the rewrite files for the first end_index accounts and
    checks that exactly those account names appear as "root" metadata.
    """
    books_path = testutil.test_path('books/books/2018.beancount')
    config = testutil.TestConfig(books_path=books_path)
    accounts = ['Assets', 'Income']
    expected = frozenset(accounts[:end_index])
    rewrite_paths = [
        testutil.test_path(f'userconfig/Rewrite{s}.yml')
        for s in expected
    ]
    arglist = [f'--rewrite-rules={path}' for path in rewrite_paths]
    arglist.append('--format=txt')
    arglist.append('select any_meta("root") as root')
    returncode, stdout, stderr = pipe_main(arglist, config)
    assert returncode == 0
    stdout.seek(0)
    actual = frozenset(line.rstrip('\n') for line in stdout)
    assert expected.issubset(actual)
    # Accounts whose rewrite file was not loaded must not show up.
    assert frozenset(accounts).difference(expected).isdisjoint(actual)


def test_ods_amount_formatting(qparser):
    """Amount cells are rendered with currency-specific formatting."""
    statement = qparser.parse('SELECT UNITS(position)')
    row_types = [('amount', bc_data.Amount)]
    row_source = [(testutil.Amount(12),), (testutil.Amount(1480, 'JPY'),)]
    ods = qmod.QueryODS()
    ods.write_query(statement, row_types, row_source)
    actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
    assert next(actual)[0].text == 'Amount'
    assert next(actual)[0].text == '$12.00'
    assert next(actual)[0].text == '¥1,480'
    assert next(actual, None) is None


def test_ods_datetime_formatting(qparser):
    """Date cells are rendered in ISO format."""
    statement = qparser.parse('SELECT date')
    row_types = [('date', datetime.date)]
    row_source = [(testutil.PAST_DATE,), (testutil.FUTURE_DATE,)]
    ods = qmod.QueryODS()
    ods.write_query(statement, row_types, row_source)
    actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
    assert next(actual)[0].text == 'Date'
    assert next(actual)[0].text == testutil.PAST_DATE.isoformat()
    assert next(actual)[0].text == testutil.FUTURE_DATE.isoformat()
    assert next(actual, None) is None


@pytest.mark.parametrize('meta_key,meta_func', [
    ('check', 'ANY_META'),
    ('purchase-order', 'META'),
    ('rt-id', 'META_DOCS'),
])
def test_ods_link_formatting(qparser, rt, meta_key, meta_func):
    """Link metadata is rendered as hyperlinks, one per referenced document."""
    # META_DOCS yields a list of links; the other functions yield one string.
    meta_func_returns_list = meta_func == 'META_DOCS'
    statement = qparser.parse(f'SELECT {meta_func}({meta_key!r}) AS docs')
    row_types = [('docs', list if meta_func_returns_list else str)]
    row_source = [
        (s.split() if meta_func_returns_list else s,)
        for s in ['rt:1/5', 'rt:3 Checks/9.pdf']
    ]
    ods = qmod.QueryODS(rt)
    ods.write_query(statement, row_types, row_source)
    rows = iter(ods.document.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
    assert next(rows).text == 'Docs'
    actual = iter(
        [link.text for link in row.getElementsByType(odf.text.A)]
        for row in rows
    )
    # NOTE(review): 'photo.jpg' is presumably the attachment name the test
    # RT client reports for rt:1/5 - confirm against testutil.RTClient.
    assert next(actual) == ['photo.jpg']
    assert next(actual) == ['rt:3', '9.pdf']
    assert next(actual, None) is None


def test_ods_meta_formatting(qparser):
    """Untyped metadata cells are formatted according to each value."""
    statement = qparser.parse('SELECT ANY_META("entity") AS entity')
    row_types = [('entity', object)]
    row_source = [(testutil.Amount(14),), (None,), ('foo bar',)]
    ods = qmod.QueryODS()
    ods.write_query(statement, row_types, row_source)
    actual = testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
    assert next(actual)[0].text == 'Entity'
    assert next(actual)[0].text == '$14.00'
    assert next(actual)[0].text == ''
    assert next(actual)[0].text == 'foo bar'
    assert next(actual, None) is None


def test_ods_multicolumn_write(qparser, rt):
    """Rows with several differently-typed columns are written in order."""
    statement = qparser.parse(
        'SELECT MIN(date) AS date, SET(META_DOCS("rt-id")) AS tix, STR_META("entity") AS entity',
    )
    row_types = [('date', datetime.date), ('tix', set), ('entity', str)]
    row_source = [
        (testutil.PAST_DATE, {'rt:1'}, 'AA'),
        (testutil.FY_START_DATE, {'rt:2'}, 'BB'),
        (testutil.FUTURE_DATE, {'rt:3', 'rt:4'}, 'CC'),
    ]
    ods = qmod.QueryODS(rt)
    # Pass copies so the expectations below are unaffected by anything
    # write_query() might do to its arguments.
    ods.write_query(statement, list(row_types), list(row_source))
    actual = iter(
        cell.text
        for row in testutil.ODSCell.from_sheet(ods.document.spreadsheet.firstChild)
        for cell in row
    )
    # Header row: one title-cased cell per column name.
    for expected, _ in row_types:
        assert next(actual) == expected.title()
    assert next(actual) == testutil.PAST_DATE.isoformat()
    assert next(actual) == 'rt:1'
    assert next(actual) == 'AA'
    assert next(actual) == testutil.FY_START_DATE.isoformat()
    assert next(actual) == 'rt:2'
    assert next(actual) == 'BB'
    assert next(actual) == testutil.FUTURE_DATE.isoformat()
    # Set members come out in arbitrary order, so the cell text is split
    # on NUL and compared as a set.
    assert frozenset(next(actual).split('\0')) == row_source[-1][1]
    assert next(actual) == 'CC'
    assert next(actual, None) is None


def test_ods_is_empty(qparser):
    """is_empty() turns false once a query is written, even with no rows."""
    statement = qparser.parse('SELECT * WHERE date < 1900-01-01')
    ods = qmod.QueryODS()
    assert ods.is_empty()
    ods.write_query(statement, [], [])
    assert not ods.is_empty()
@pytest.mark.parametrize('fy,account,amt_prefix', [
    (2018, 'Assets', '($'),
    (2019, 'Income', '$'),
])
def test_ods_output(fy, account, amt_prefix):
    """An ODS report written to stdout loads as a well-formed spreadsheet.

    Every data row must have a date in the fiscal year, a narration that
    starts with the year, and an amount that starts with amt_prefix.
    """
    books_path = testutil.test_path(f'books/books/{fy}.beancount')
    config = testutil.TestConfig(books_path=books_path)
    arglist = [
        '-O', '-',
        '-f', 'ods',
        f'SELECT date, narration, UNITS(position) WHERE account ~ "^{account}:"',
    ]
    # ODS is a binary format, so stdout must be a bytes stream.
    returncode, stdout, stderr = pipe_main(arglist, config, io.BytesIO)
    assert returncode == 0
    with stdout:
        stdout.seek(0)
        ods_doc = odf.opendocument.load(stdout)
    rows = iter(ods_doc.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
    next(rows)  # Skip header row
    # Both patterns are loop-invariant, so build them once.
    date_pattern = rf'{fy}-\d{{2}}-\d{{2}}'
    amt_pattern = rf'^{re.escape(amt_prefix)}\d'
    narration_prefix = f'{fy} '
    # Start at zero so a report with no data rows fails the final assertion
    # rather than raising NameError on an unbound loop variable.
    count = 0
    for count, row in enumerate(rows, 1):
        date, narration, amount = row.childNodes
        assert re.fullmatch(date_pattern, date.text)
        assert narration.text.startswith(narration_prefix)
        assert re.match(amt_pattern, amount.text)
    assert count


def test_ods_aggregate_output():
    """Aggregate queries (SET, SUM, GROUP BY) render correctly in ODS."""
    books_path = testutil.test_path('books/books/2020.beancount')
    config = testutil.TestConfig(books_path=books_path)
    arglist = [
        '-O', '-',
        '-f', 'ods',
        'SELECT account, SET(narration), SUM(UNITS(position))',
        'WHERE date >= 2020-04-01 AND date <= 2020-04-02',
        'GROUP BY account ORDER BY account ASC',
    ]
    # ODS is a binary format, so stdout must be a bytes stream.
    returncode, stdout, stderr = pipe_main(arglist, config, io.BytesIO)
    assert returncode == 0
    with stdout:
        stdout.seek(0)
        ods_doc = odf.opendocument.load(stdout)
    rows = iter(ods_doc.spreadsheet.firstChild.getElementsByType(odf.table.TableRow))
    next(rows)  # Skip header row
    # Map each account to (set of narrations, formatted balance).
    actual = {}
    for row in rows:
        acct, descs, balance = row.childNodes
        # Set members are joined with NUL in the cell text; split them back
        # apart so the comparison does not depend on their order.
        actual[acct.text] = (frozenset(descs.text.split('\0')), balance.text)
    in_desc = {'2020 donation'}
    ex_desc = {'2020 bank maintenance fee'}
    assert actual['Income:Donations'] == (in_desc, '$20.20')
    assert actual['Expenses:BankingFees'] == (ex_desc, '$1.00')
    assert actual['Assets:Checking'] == (in_desc | ex_desc, '($21.20)')