Changeset - 6703d1af87ad
[Not reviewed]
0 2 0
Brett Smith - 3 years ago 2021-03-15 17:19:03
brettcsmith@brettcsmith.org
reports: BaseODS puts each line of strings in a P tag.

This seems to be the most straightforward way to get Calc to automatically
determine a nice row height for multi-line string cells. This has become a
lot more noticeable now that query-report supports putting postal addresses
in cells.
2 files changed with 13 insertions and 4 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/core.py
Show inline comments
...
 
@@ -1247,388 +1247,395 @@ class BaseODS(BaseSpreadsheet[RT, ST], metaclass=abc.ABCMeta):
 
        date_style.addElement(odf.number.Month(style='long'))
 
        date_style.addElement(odf.number.Text(text='-'))
 
        date_style.addElement(odf.number.Day(style='long'))
 
        self.style_date = self.ensure_child(
 
            styles,
 
            odf.style.Style,
 
            name=f'{date_style.getAttribute("name")}Cell',
 
            family='table-cell',
 
            datastylename=date_style,
 
        )
 

	
 
        self.style_starttext: odf.style.Style
 
        self.style_centertext: odf.style.Style
 
        self.style_endtext: odf.style.Style
 
        for textalign in ['start', 'center', 'end']:
 
            aligned_style = self.replace_child(
 
                styles, odf.style.Style, name=f'{textalign.title()}Text',
 
            )
 
            aligned_style.setAttribute('family', 'table-cell')
 
            aligned_style.addElement(odf.style.ParagraphProperties(textalign=textalign))
 
            setattr(self, f'style_{textalign}text', aligned_style)
 

	
 
        self.style_total = self.border_style(Border.TOP, '1pt')
 
        self.style_endtotal = self.border_style(Border.TOP | Border.BOTTOM, '1pt')
 
        self.style_bottomline = self.merge_styles(
 
            self.style_total,
 
            self.border_style(Border.BOTTOM, '2pt', 'double'),
 
        )
 

	
 
    ### Properties
 

	
 
    def set_common_properties(self,
 
                              repo: Optional[git.Repo]=None,
 
                              command: Optional[Sequence[str]]=sys.argv,
 
    ) -> None:
 
        if repo is None:
 
            git_shahex = '<none>'
 
            git_dirty = True
 
        else:
 
            git_shahex = repo.head.commit.hexsha
 
            git_dirty = repo.is_dirty()
 
        self.set_custom_property('GitSHA', git_shahex)
 
        self.set_custom_property('GitDirty', git_dirty, 'boolean')
 
        if command is not None:
 
            command_s = ' '.join(shlex.quote(s) for s in command)
 
            self.set_custom_property('ReportCommand', command_s)
 

	
 
    def set_custom_property(self,
 
                            name: str,
 
                            value: Any,
 
                            valuetype: Optional[str]=None,
 
    ) -> odf.meta.UserDefined:
 
        if valuetype is None:
 
            if isinstance(value, bool):
 
                valuetype = 'boolean'
 
            elif isinstance(value, (datetime.date, datetime.datetime)):
 
                valuetype = 'date'
 
            elif isinstance(value, (int, float, Decimal)):
 
                valuetype = 'float'
 
        if not isinstance(value, str):
 
            if valuetype == 'boolean':
 
                value = 'true' if value else 'false'
 
            elif valuetype == 'date':
 
                value = value.isoformat()
 
            else:
 
                value = str(value)
 
        retval = self.ensure_child(self.document.meta, odf.meta.UserDefined, name=name)
 
        if valuetype is None:
 
            try:
 
                retval.removeAttribute('valuetype')
 
            except KeyError:
 
                pass
 
        else:
 
            retval.setAttribute('valuetype', valuetype)
 
        retval.childNodes.clear()
 
        retval.addText(value)
 
        return retval
 

	
 
    def set_properties(self, *,
 
                       created: Optional[datetime.datetime]=None,
 
                       generator: str='conservancy_beancount',
 
    ) -> None:
 
        if created is None:
 
            created = datetime.datetime.now()
 
        created_elem = self.ensure_child(self.document.meta, odf.meta.CreationDate)
 
        created_elem.childNodes.clear()
 
        created_elem.addText(created.isoformat())
 
        generator_elem = self.ensure_child(self.document.meta, odf.meta.Generator)
 
        generator_elem.childNodes.clear()
 
        generator_elem.addText(f'{generator}/{cliutil.VERSION} {TOOLSVERSION}')
 

	
 
    ### Rows and cells
 

	
 
    def add_annotation(
 
            self,
 
            text: Optional[str]=None,
 
            when: Optional[datetime.datetime]=None,
 
            parent: Optional[odf.table.TableCell]=None,
 
    ) -> odf.office.Annotation:
 
        if when is None:
 
            when = datetime.datetime.now()
 
        retval = odf.office.Annotation()
 
        retval.appendChild(odf.dc.Date(text=when.isoformat(timespec='seconds')))
 
        if text is not None:
 
            retval.appendChild(odf.text.P(text=text))
 
        if parent is not None:
 
            parent.appendChild(retval)
 
        return retval
 

	
 
    def add_row(self, *cells: odf.table.TableCell, **attrs: Any) -> odf.table.TableRow:
 
        row = odf.table.TableRow(**attrs)
 
        for cell in cells:
 
            row.addElement(cell)
 
        self.sheet.addElement(row)
 
        return row
 

	
 
    def row_count(self, sheet: Optional[odf.table.Table]=None) -> int:
 
        if sheet is None:
 
            sheet = self.sheet
 
        TableRow = odf.table.TableRow
 
        return sum(1 for cell in sheet.childNodes if cell.isInstanceOf(TableRow))
 

	
 
    def balance_cell(self, balance: Balance, **attrs: Any) -> odf.table.TableCell:
 
        balance = balance.clean_copy() or balance
 
        balance_currency_count = len(balance)
 
        if balance_currency_count == 0:
 
            return self.float_cell(0, **attrs)
 
        elif balance_currency_count == 1:
 
            amount = next(iter(balance.values()))
 
            attrs['stylename'] = self.merge_styles(
 
                attrs.get('stylename'), self.currency_style(amount.currency),
 
            )
 
            return self.currency_cell(amount, **attrs)
 
        else:
 
            lines = [babel.numbers.format_currency(number, currency, get_commodity_format(
 
                self.locale, currency, None, self.currency_fmt_key,
 
            )) for number, currency in balance.values()]
 
            attrs['stylename'] = self.merge_styles(
 
                attrs.get('stylename'), self.style_endtext,
 
            )
 
            return self.multiline_cell(lines, **attrs)
 

	
 
    def currency_cell(self, amount: bc_amount._Amount, **attrs: Any) -> odf.table.TableCell:
 
        if 'stylename' not in attrs:
 
            attrs['stylename'] = self.currency_style(amount.currency)
 
        number, currency = amount
 
        cell = odf.table.TableCell(valuetype='currency', value=number, **attrs)
 
        cell.addElement(odf.text.P(text=babel.numbers.format_currency(
 
            number, currency, locale=self.locale, format_type=self.currency_fmt_key,
 
        )))
 
        return cell
 

	
 
    def date_cell(self, date: datetime.date, **attrs: Any) -> odf.table.TableCell:
 
        attrs.setdefault('stylename', self.style_date)
 
        cell = odf.table.TableCell(valuetype='date', datevalue=date, **attrs)
 
        cell.addElement(odf.text.P(text=date.isoformat()))
 
        return cell
 

	
 
    def float_cell(self, value: Union[int, float, Decimal], **attrs: Any) -> odf.table.TableCell:
 
        cell = odf.table.TableCell(valuetype='float', value=value, **attrs)
 
        cell.addElement(odf.text.P(text=str(value)))
 
        return cell
 

	
 
    def _meta_link_pairs(self, links: Iterable[Optional[str]]) -> Iterator[Tuple[str, str]]:
 
        for href in links:
 
            if href is None:
 
                continue
 
            elif self.rt_wrapper is not None:
 
                rt_ids = self.rt_wrapper.parse(href)
 
                rt_href = rt_ids and self.rt_wrapper.url(*rt_ids)
 
            else:
 
                rt_ids = None
 
                rt_href = None
 
            if rt_ids is None or rt_href is None:
 
                # '..' pops the ODS filename off the link path. In other words,
 
                # make the link relative to the directory the ODS is in.
 
                href_path = Path('..', href)
 
                href = urlparse.quote(str(href_path))
 
                text = href_path.name
 
            else:
 
                rt_path = urlparse.urlparse(rt_href).path
 
                if rt_path.endswith('/Ticket/Display.html'):
 
                    text = rtutil.RT.unparse(*rt_ids)
 
                else:
 
                    text = urlparse.unquote(Path(rt_path).name)
 
                href = rt_href
 
            yield (href, text)
 

	
 
    def meta_links_cell(self, links: Iterable[Optional[str]], **attrs: Any) -> odf.table.TableCell:
 
        return self.multilink_cell(self._meta_link_pairs(links), **attrs)
 

	
 
    def multiline_cell(self, lines: Iterable[Any], **attrs: Any) -> odf.table.TableCell:
 
        item_lines = [str(item).splitlines() for item in lines if item is not None]
 
        if any(len(seq) > 1 for seq in item_lines):
 
            for seq in item_lines:
 
                seq.append('')
 
            seq.pop()
 
        cell = odf.table.TableCell(valuetype='string', **attrs)
 
        for line in lines:
 
            cell.addElement(odf.text.P(text=str(line)))
 
        for seq in item_lines:
 
            for line in seq:
 
                cell.addElement(odf.text.P(text=line))
 
        return cell
 

	
 
    def multilink_cell(self, links: Iterable[LinkType], **attrs: Any) -> odf.table.TableCell:
 
        cell = odf.table.TableCell(valuetype='string', **attrs)
 
        for link in links:
 
            if isinstance(link, tuple):
 
                href, text = link
 
            else:
 
                href = link
 
                text = None
 
            cell.addElement(odf.text.P())
 
            cell.lastChild.addElement(odf.text.A(
 
                type='simple', href=href, text=text or href,
 
            ))
 
        return cell
 

	
 
    def string_cell(self, text: str, **attrs: Any) -> odf.table.TableCell:
 
        cell = odf.table.TableCell(valuetype='string', **attrs)
 
        cell.addElement(odf.text.P(text=text))
 
        for line in text.splitlines():
 
            cell.addElement(odf.text.P(text=line))
 
        return cell
 

	
 
    def write_row(self, row: RT) -> None:
 
        """Write a single row of input data to the spreadsheet
 

	
 
        This default implementation adds a single row to the spreadsheet,
 
        with one cell per element of the row. The type of each element
 
        determines what kind of cell is created.
 

	
 
        This implementation will help get you started, but you'll probably
 
        want to override it to specify styles.
 
        """
 
        out_row = odf.table.TableRow()
 
        for cell_source in row:
 
            if isinstance(cell_source, (int, float, Decimal)):
 
                cell = self.float_cell(cell_source)
 
            else:
 
                cell = self.string_cell(cell_source)
 
            out_row.addElement(cell)
 
        self.sheet.addElement(out_row)
 

	
 
    def save_file(self, out_file: BinaryIO) -> None:
 
        self.document.write(out_file)
 

	
 
    def save_path(self, path: Path, mode: str='w') -> None:
 
        with path.open(f'{mode}b') as out_file:
 
            out_file = cast(BinaryIO, out_file)
 
            self.save_file(out_file)
 

	
 

	
 
def account_balances(
 
        groups: Mapping[data.Account, PeriodPostings],
 
        order: Optional[Sequence[str]]=None,
 
) -> Iterator[Tuple[str, Balance]]:
 
    """Iterate account balances over a date range
 

	
 
    1. ``subclass = PeriodPostings.with_start_date(start_date)``
 
    2. ``groups = dict(subclass.group_by_account(postings))``
 
    3. ``for acct, bal in account_balances(groups, [optional ordering]): ...``
 

	
 
    This function returns an iterator of 2-tuples ``(account, balance)``
 
    that you can use to generate a report in the style of ``ledger balance``.
 
    The accounts are accounts in ``groups`` that appeared under one of the
 
    account name strings in ``order``. ``balance`` is the corresponding
 
    balance over the time period (``groups[key].period_bal``). Accounts are
 
    iterated in the order provided by ``sort_and_filter_accounts()``.
 

	
 
    The first 2-tuple is ``(OPENING_BALANCE_NAME, balance)`` with the balance of
 
    all these accounts as of ``start_date``.
 
    The final 2-tuple is ``(ENDING_BALANCE_NAME, balance)`` with the final
 
    balance of all these accounts as of ``start_date``.
 
    The iterator will always yield these special 2-tuples, even when there are
 
    no accounts in the input or to report.
 
    """
 
    if order is None:
 
        order = ['Equity', 'Income', 'Expenses']
 
    acct_seq = [account for _, account in sort_and_filter_accounts(groups, order)]
 
    yield (OPENING_BALANCE_NAME, sum(
 
        (groups[key].start_bal for key in acct_seq),
 
        MutableBalance(),
 
    ))
 
    for key in acct_seq:
 
        postings = groups[key]
 
        try:
 
            in_date_range = postings[-1].meta.date >= postings.START_DATE
 
        except IndexError:
 
            in_date_range = False
 
        if in_date_range:
 
            yield (key, groups[key].period_bal)
 
    yield (ENDING_BALANCE_NAME, sum(
 
        (groups[key].stop_bal for key in acct_seq),
 
        MutableBalance(),
 
    ))
 

	
 
def get_commodity_format(locale: babel.core.Locale,
 
                         code: str,
 
                         amount: Optional[DecimalCompat]=None,
 
                         format_type: str='accounting',
 
) -> str:
 
    """Return a format string for a commodity
 

	
 
    Typical use looks like::
 

	
 
      number, code = post.units
 
      fmt = get_commodity_format(locale, code)
 
      units_s = babel.numbers.format_currency(number, code, fmt)
 

	
 
    When the commodity code refers to a real currency, you get the same format
 
    string provided by Babel.
 

	
 
    For other commodities like stock, you get a format code built from the
 
    locale's currency unit pattern.
 

	
 
    If ``amount`` is defined, the format string will be specifically for that
 
    number, whether positive or negative. Otherwise, the format string may
 
    define both positive and negative formats.
 
    """
 
    fmt: str = locale.currency_formats[format_type].pattern
 
    if amount is not None:
 
        fmt, _, neg_fmt = fmt.partition(';')
 
        if amount < 0 and neg_fmt:
 
            fmt = neg_fmt
 
    symbol = babel.numbers.get_currency_symbol(code, locale)
 
    if symbol != code:
 
        return fmt
 
    else:
 
        long_fmt: str = babel.numbers.get_currency_unit_pattern(code, locale=locale)
 
        return re.sub(
 
            r'[#0,.\s¤]+',
 
            lambda match: long_fmt.format(
 
                match.group(0).replace('¤', '').strip(), '¤¤',
 
            ),
 
            fmt,
 
        )
 

	
 
def normalize_amount_func(account_name: str) -> Callable[[T], T]:
 
    """Get a function to normalize amounts for reporting
 

	
 
    Given an account name, return a function that can be used on "amounts"
 
    under that account (including numbers, Amount objects, and Balance objects)
 
    to normalize them for reporting. Right now that means make flipping the
 
    sign for accounts where "normal" postings are negative.
 
    """
 
    if account_name.startswith(('Assets:', 'Expenses:')):
 
        # We can't just return operator.pos because Beancount's Amount class
 
        # doesn't implement __pos__.
 
        return lambda amt: amt
 
    elif account_name.startswith(('Equity:', 'Income:', 'Liabilities:')):
 
        return operator.neg
 
    else:
 
        raise ValueError(f"unrecognized account name {account_name!r}")
 

	
 
def sort_and_filter_accounts(
 
        accounts: Iterable[data.Account],
 
        order: Sequence[str],
 
) -> Iterator[Tuple[int, data.Account]]:
 
    """Reorganize accounts based on an ordered set of names
 

	
 
    This function takes a iterable of Account objects, and a sequence of
 
    account names. Usually the account names are higher parts of the account
 
    hierarchy like Income, Equity, or Assets:Receivable.
 

	
 
    It returns an iterator of 2-tuples, ``(index, account)`` where ``index`` is
 
    an index into the ordering sequence, and ``account`` is one of the input
 
    Account objects that's under the account name ``order[index]``. Tuples are
 
    sorted, so ``index`` increases monotonically, and Account objects using the
 
    same index are yielded sorted by name.
 

	
 
    For example, if your order is
 
    ``['Liabilities:Payable', 'Assets:Receivable']``, the return value will
 
    first yield zero or more results with index 0 and an account under
 
    Liabilities:Payable, then zero or more results with index 1 and an account
 
    under Accounts:Receivable.
 

	
 
    Input Accounts that are not under any of the account names in ``order`` do
 
    not appear in the output iterator. That's the filtering part.
 

	
 
    Note that if none of the input Accounts are under one of the ordering
 
    sequence accounts, its index will never appear in the results. This is why
 
    the 2-tuples include an index rather than the original account name string,
 
    to make it easier for callers to know when this happens and do something
 
    with unused ordering accounts.
 
    """
 
    index_map = {s: ii for ii, s in enumerate(order)}
 
    retval: Mapping[int, List[data.Account]] = collections.defaultdict(list)
 
    for account in accounts:
 
        acct_key = account.is_under(*order)
 
        if acct_key is not None:
 
            retval[index_map[acct_key]].append(account)
 
    return (
 
        (key, account)
 
        for key in sorted(retval)
 
        for account in sorted(retval[key])
 
    )
tests/test_reports_fund.py
Show inline comments
...
 
@@ -4,311 +4,313 @@
 
#
 
# Full copyright and licensing details can be found at toplevel file
 
# LICENSE.txt in the repository.
 

	
 
import collections
 
import copy
 
import datetime
 
import io
 
import itertools
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
import babel.numbers
 
import odf.opendocument
 
import odf.table
 

	
 
from beancount import loader as bc_loader
 
from conservancy_beancount import data
 
from conservancy_beancount.reports import core
 
from conservancy_beancount.reports import fund
 

	
 
from decimal import Decimal
 

	
 
_ledger_load = bc_loader.load_file(testutil.test_path('books/fund.beancount'))
 
START_DATE = datetime.date(2018, 3, 1)
 
MID_DATE = datetime.date(2019, 3, 1)
 
STOP_DATE = datetime.date(2020, 3, 1)
 

	
 
EQUITY_ROOT_ACCOUNTS = ('Expenses:', 'Equity:', 'Income:')
 

	
 
OPENING_BALANCES = {
 
    'Alpha': 3000,
 
    'Bravo': 2000,
 
    'Charlie': 1000,
 
    'Conservancy': 4000,
 
    'Delta': 0,
 
}
 

	
 
BALANCES_BY_YEAR = {
 
    ('Conservancy', 2018): [
 
        ('Income:Other', 40),
 
        ('Expenses:Other', -4),
 
        ('Assets:Receivable:Accounts', 40),
 
        ('Liabilities:Payable:Accounts', 4),
 
    ],
 
    ('Conservancy', 2019): [
 
        ('Income:Other', 42),
 
        ('Expenses:Other', Decimal('-4.20')),
 
        ('Equity:Funds:Unrestricted', 100),
 
        ('Equity:Realized:CurrencyConversion', Decimal('6.20')),
 
        ('Assets:Receivable:Accounts', -40),
 
        ('Liabilities:Payable:Accounts', -4),
 
    ],
 
    ('Alpha', 2018): [
 
        ('Income:Other', 60),
 
        ('Liabilities:UnearnedIncome', 30),
 
        ('Assets:Prepaid:Expenses', 20),
 
    ],
 
    ('Alpha', 2019): [
 
        ('Income:Other', 30),
 
        ('Expenses:Other', -26),
 
        ('Assets:Prepaid:Expenses', -20),
 
        ('Liabilities:UnearnedIncome', -30),
 
    ],
 
    ('Bravo', 2018): [
 
        ('Expenses:Other', -20),
 
    ],
 
    ('Bravo', 2019): [
 
        ('Income:Other', 200),
 
    ],
 
    ('Charlie', 2019): [
 
        ('Equity:Funds:Restricted', -100),
 
    ],
 
    ('Delta', 2018): [
 
        ('Income:Other', Decimal('.40')),
 
    ],
 
    ('Delta', 2019): [
 
        ('Income:Other', Decimal('4.60')),
 
    ],
 
}
 

	
 
clean_account_meta = pytest.fixture(autouse=True)(testutil.clean_account_meta)
 

	
 
@pytest.fixture
 
def fund_entries():
 
    return copy.deepcopy(_ledger_load[0])
 

	
 
def split_text_lines(output):
 
    for line in output:
 
        account, amount = line.rsplit(None, 1)
 
        yield account.strip(), amount
 

	
 
def format_amount(amount, currency='USD'):
 
    return babel.numbers.format_currency(
 
        amount, currency, format_type='accounting',
 
    )
 

	
 
def check_text_balances(actual, expected, *expect_accounts):
 
    balance = Decimal()
 
    for expect_account in expect_accounts:
 
        expect_amount = expected[expect_account]
 
        balance += expect_amount
 
        if expect_account.startswith('Expenses:'):
 
            expect_amount *= -1
 
        if expect_amount:
 
            actual_account, actual_amount = next(actual)
 
            assert actual_account == expect_account
 
            assert actual_amount == format_amount(expect_amount)
 
    return balance
 

	
 
def check_text_report(output, project, start_date, stop_date):
 
    _, _, project = project.rpartition('=')
 
    balance_amount = Decimal(OPENING_BALANCES[project])
 
    expected = collections.defaultdict(Decimal)
 
    for year in range(2018, stop_date.year):
 
        try:
 
            amounts = BALANCES_BY_YEAR[(project, year)]
 
        except KeyError:
 
            pass
 
        else:
 
            for account, amount in amounts:
 
                if year < start_date.year and account.startswith(EQUITY_ROOT_ACCOUNTS):
 
                    balance_amount += amount
 
                else:
 
                    expected[account] += amount
 
    actual = split_text_lines(output)
 
    next(actual); next(actual)  # Discard headers
 
    open_acct, open_amt = next(actual)
 
    assert open_acct == "{} balance as of {}".format(
 
        project, start_date.isoformat(),
 
    )
 
    assert open_amt == format_amount(balance_amount)
 
    balance_amount += check_text_balances(
 
        actual, expected,
 
        'Income:Other',
 
        'Expenses:Other',
 
        'Equity:Funds:Restricted',
 
        'Equity:Funds:Unrestricted',
 
        'Equity:Realized:CurrencyConversion',
 
    )
 
    next(actual)
 
    end_acct, end_amt = next(actual)
 
    assert end_acct == "{} balance as of {}".format(
 
        project, stop_date.isoformat(),
 
    )
 
    assert end_amt == format_amount(balance_amount)
 
    next(actual)
 
    balance_amount += check_text_balances(
 
        actual, expected,
 
        'Assets:Receivable:Accounts',
 
        'Assets:Prepaid:Expenses',
 
        'Liabilities:Payable:Accounts',
 
        'Liabilities:UnearnedIncome',
 
    )
 
    assert next(actual, None) is None
 

	
 
def check_cell_balance(cell, balance):
 
    if balance:
 
        assert cell.value == balance
 
    else:
 
        assert not cell.value
 

	
 
def check_ods_sheet(sheet, account_balances, *, full):
 
    total_keys = ['opening', 'Income', 'Expenses', 'Equity']
 
    if full:
 
        account_bals = account_balances.copy()
 
        unrestricted = account_bals.pop('Conservancy')
 
        total_keys += [
 
            'Assets:Receivable',
 
            'Assets:Prepaid',
 
            'Liabilities',
 
            'Liabilities:Payable',
 
        ]
 
    else:
 
        account_bals = {
 
            key: balances
 
            for key, balances in account_balances.items()
 
            if key != 'Conservancy' and any(v >= .5 for v in balances.values())
 
        }
 
    totals = {key: Decimal() for key in total_keys}
 
    for fund, balances in account_bals.items():
 
        for key in totals:
 
            totals[key] += balances[key]
 
    account_bals[''] = totals
 
    if full:
 
        account_bals['Unrestricted'] = unrestricted
 
    for row in itertools.islice(sheet.getElementsByType(odf.table.TableRow), 4, None):
 
        cells = iter(testutil.ODSCell.from_row(row))
 
        try:
 
            fund = next(cells).firstChild.text
 
        except (AttributeError, StopIteration):
 
        except AttributeError:
 
            fund = ''
 
        except StopIteration:
 
            continue
 
        try:
 
            balances = account_bals.pop(fund)
 
        except KeyError:
 
            pytest.fail(f"report included unexpected fund {fund}")
 
        check_cell_balance(next(cells), balances['opening'])
 
        check_cell_balance(next(cells), balances['Income'])
 
        if full:
 
            check_cell_balance(next(cells), -balances['Expenses'])
 
            check_cell_balance(next(cells), balances['Equity'])
 
        else:
 
            check_cell_balance(
 
                next(cells), -sum(balances[key] for key in ['Expenses', 'Equity']),
 
            )
 
        check_cell_balance(next(cells), sum(balances[key] for key in [
 
            'opening', 'Income', 'Expenses', 'Equity',
 
        ]))
 
        if full:
 
            check_cell_balance(next(cells), balances['Assets:Receivable'])
 
            check_cell_balance(next(cells), balances['Assets:Prepaid'])
 
            check_cell_balance(next(cells), balances['Liabilities'])
 
            check_cell_balance(next(cells), balances['Liabilities:Payable'])
 
        assert next(cells, None) is None
 
        if full and fund == 'Unrestricted':
 
            assert '' not in account_bals, "Unrestricted funds reported before subtotals"
 
            for key, bal in balances.items():
 
                totals[key] += bal
 
            account_bals[''] = totals
 
    assert not account_bals, "did not see all funds in report"
 

	
 
def check_ods_report(ods, start_date, stop_date):
 
    account_bals = collections.OrderedDict((key, {
 
        'opening': Decimal(amount),
 
        'Income': Decimal(0),
 
        'Expenses': Decimal(0),
 
        'Equity': Decimal(0),
 
        'Assets:Receivable': Decimal(0),
 
        'Assets:Prepaid': Decimal(0),
 
        'Liabilities:Payable': Decimal(0),
 
        'Liabilities': Decimal(0),  # UnearnedIncome
 
    }) for key, amount in sorted(OPENING_BALANCES.items()))
 
    for fund, year in itertools.product(account_bals, range(2018, stop_date.year)):
 
        try:
 
            amounts = BALANCES_BY_YEAR[(fund, year)]
 
        except KeyError:
 
            pass
 
        else:
 
            for account, amount in amounts:
 
                if account.startswith(EQUITY_ROOT_ACCOUNTS):
 
                    if year < start_date.year:
 
                        acct_key = 'opening'
 
                    else:
 
                        acct_key, _, _ = account.partition(':')
 
                else:
 
                    acct_key, _, _ = account.rpartition(':')
 
                account_bals[fund][acct_key] += amount
 
    sheets = iter(ods.getElementsByType(odf.table.Table))
 
    check_ods_sheet(next(sheets), account_bals, full=False)
 
    check_ods_sheet(next(sheets), account_bals, full=True)
 
    assert next(sheets, None) is None, "found unexpected sheet"
 

	
 
def run_main(out_type, arglist, config=None):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/fund.beancount'),
 
        )
 
    arglist.insert(0, '--output-file=-')
 
    output = out_type()
 
    errors = io.StringIO()
 
    retcode = fund.main(arglist, output, errors, config)
 
    output.seek(0)
 
    return retcode, output, errors
 

	
 
@pytest.mark.parametrize('project,start_date,stop_date', [
 
    ('Conservancy', START_DATE, STOP_DATE),
 
    ('project=Conservancy', MID_DATE, STOP_DATE),
 
    ('Conservancy', START_DATE, MID_DATE),
 
    ('Alpha', START_DATE, STOP_DATE),
 
    ('project=Alpha', MID_DATE, STOP_DATE),
 
    ('Alpha', START_DATE, MID_DATE),
 
    ('Bravo', START_DATE, STOP_DATE),
 
    ('project=Bravo', MID_DATE, STOP_DATE),
 
    ('Bravo', START_DATE, MID_DATE),
 
    ('project=Charlie', START_DATE, STOP_DATE),
 
])
 
def test_text_report(project, start_date, stop_date):
 
    retcode, output, errors = run_main(io.StringIO, [
 
        '-b', start_date.isoformat(), '-e', stop_date.isoformat(), project,
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_text_report(output, project, start_date, stop_date)
 

	
 
def test_text_report_empty_balances():
 
    retcode, output, errors = run_main(io.StringIO, [
 
        '-t', 'text', '-b', '2018-01-01',
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 

	
 
@pytest.mark.parametrize('start_date,stop_date', [
 
    (START_DATE, STOP_DATE),
 
    (MID_DATE, STOP_DATE),
 
    (START_DATE, MID_DATE),
 
])
 
def test_ods_report(start_date, stop_date):
 
    retcode, output, errors = run_main(io.BytesIO, [
 
        '--begin', start_date.isoformat(), '--end', stop_date.isoformat(),
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    check_ods_report(ods, start_date, stop_date)
 

	
 
def test_main_no_postings(caplog):
 
    retcode, output, errors = run_main(io.StringIO, ['NonexistentProject'])
 
    assert retcode == 65
 
    assert any(log.levelname == 'WARNING' for log in caplog.records)
0 comments (0 inline, 0 general)