Changeset - 0e8745bf512d
Brett Smith - 2020-07-15 14:14:46
brettcsmith@brettcsmith.org
ledger: Include all fund accounts in default project ledger reports.

This brings the reporting into sync with the fund report.
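Concretely: when a project search term is given and no --account option is passed, parse_arguments() now defaults to six sheets instead of four, adding the prepaid-expense and unearned-income fund accounts. A minimal sketch of the before/after defaults (the OLD_/NEW_ names are illustrative, not part of the code):

OLD_PROJECT_SHEETS = ['Income', 'Expenses', 'Assets:Receivable', 'Liabilities:Payable']
NEW_PROJECT_SHEETS = [
    'Income',
    'Expenses',
    'Assets:Receivable',
    'Assets:Prepaid',              # fund account: prepaid expenses
    'Liabilities:UnearnedIncome',  # fund account: unearned income
    'Liabilities:Payable',
]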
3 files changed with 11 insertions and 2 deletions:
conservancy_beancount/reports/ledger.py
...
 
@@ -185,334 +185,341 @@ class LedgerODS(core.BaseODS[data.Posting, data.Account]):
 
                maybe_split.get(subkey, []) + must_split_tally, sheet_size, subkey,
 
            )
 
            # We must be willing to split out at least as many sheets as there
 
            # are accounts that didn't fit. Do that first.
 
            yield from itertools.islice(split_names, len(must_split_tally))
 
            # After that, we can be in one of two cases:
 
            # 1. There is no next sheet. All the accounts, including the
 
            #    maybe_splits and must_splits, fit on planned subsheets.
 
            #    Update state to note we don't need a sheet for them anymore.
 
            # 2. The next sheet is named `subkey`, and is planned to include
 
            #    all of our maybe_split accounts. However, we don't need to
 
            #    yield that sheet name, because those accounts already fit in
 
            #    the sheet we're planning, and it would be a needless split.
 
            next_sheet_name = next(split_names, None)
 
            if next_sheet_name is None:
 
                maybe_split.pop(subkey, None)
 
            else:
 
                assert next_sheet_name == subkey
 
                assert not any(split_names)
 
        if maybe_split:
 
            yield sheet_name
 

	
 
    @classmethod
 
    def plan_sheets(
 
            cls,
 
            tally_by_account: Mapping[data.Account, int],
 
            base_sheets: Sequence[str],
 
            sheet_size: int,
 
    ) -> Sequence[str]:
 
        sorted_tally: PostTally = [
 
            (count, account)
 
            for account, count in tally_by_account.items()
 
        ]
 
        sorted_tally.sort()
 
        split_tally = cls._group_tally(
 
            sorted_tally,
 
            operator.methodcaller('is_under', *base_sheets),
 
        )
 
        return [
 
            sheet_name
 
            for key in base_sheets
 
            for sheet_name in cls._split_sheet(split_tally[key], sheet_size, key)
 
        ]
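        # Worked example (mirrors test_plan_sheets_single_split in the test
        # suite): with sheet_size=100 and base_sheets=['Assets', 'Income',
        # 'Expenses'], a tally of Assets:Cash=60, Assets:Checking=80,
        # Income:Donations=50, Expenses:Travel=90, Expenses:FilingFees=25
        # returns ['Assets:Checking', 'Assets', 'Income', 'Expenses:Travel',
        # 'Expenses']: here the largest account under each oversized base sheet
        # gets its own sheet, and the remaining accounts stay on the base sheet.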
 

	
 
    def section_key(self, row: data.Posting) -> data.Account:
 
        return row.account
 

	
 
    def start_sheet(self, sheet_name: str) -> None:
 
        self.use_sheet(sheet_name.replace(':', ' '))
 
        columns_key = data.Account(sheet_name).is_under(*self.ACCOUNT_COLUMNS)
 
        # columns_key must not be None because ACCOUNT_COLUMNS has an entry
 
        # for all five root accounts.
 
        assert columns_key is not None
 
        self.metadata_columns = self.ACCOUNT_COLUMNS[columns_key]
 
        self.sheet_columns: Sequence[str] = [
 
            *self.CORE_COLUMNS,
 
            *(data.Metadata.human_name(meta_key) for meta_key in self.metadata_columns),
 
        ]
 
        for col_name in self.sheet_columns:
 
            self.sheet.addElement(odf.table.TableColumn(
 
                stylename=self.column_styles.get(col_name, self.default_column),
 
            ))
 
        self.add_row(*(
 
            self.string_cell(col_name, stylename=self.style_bold)
 
            for col_name in self.sheet_columns
 
        ))
 
        self.lock_first_row()
 

	
 
    def _report_section_balance(self, key: data.Account, date_key: str) -> None:
 
        uses_opening = key.is_under('Assets', 'Equity', 'Liabilities')
 
        related = self.account_groups[key]
 
        if date_key == 'start':
 
            if not uses_opening:
 
                return
 
            date = self.date_range.start
 
            balance = related.start_bal
 
            description = "Opening Balance"
 
        else:
 
            date = self.date_range.stop
 
            if uses_opening:
 
                balance = related.stop_bal
 
                description = "Ending Balance"
 
            else:
 
                balance = related.period_bal
 
                description = "Period Total"
 
        self.add_row(
 
            self.date_cell(date, stylename=self.merge_styles(
 
                self.style_bold, self.style_date,
 
            )),
 
            odf.table.TableCell(),
 
            self.string_cell(description, stylename=self.style_bold),
 
            odf.table.TableCell(),
 
            self.balance_cell(self.norm_func(balance), stylename=self.style_bold),
 
        )
 

	
 
    def start_section(self, key: data.Account) -> None:
 
        self.add_row()
 
        self.add_row(
 
            odf.table.TableCell(),
 
            self.string_cell(
 
                f"{key} Ledger"
 
                f" From {self.date_range.start.isoformat()}"
 
                f" To {self.date_range.stop.isoformat()}",
 
                stylename=self.style_bold,
 
                numbercolumnsspanned=len(self.sheet_columns) - 1,
 
            ),
 
        )
 
        self.norm_func = core.normalize_amount_func(key)
 
        self._report_section_balance(key, 'start')
 

	
 
    def end_section(self, key: data.Account) -> None:
 
        self._report_section_balance(key, 'stop')
 

	
 
    def write_row(self, row: data.Posting) -> None:
 
        if row.cost is None:
 
            amount_cell = odf.table.TableCell()
 
        else:
 
            amount_cell = self.currency_cell(self.norm_func(row.units))
 
        self.add_row(
 
            self.date_cell(row.meta.date),
 
            self.string_cell(row.meta.get('entity') or ''),
 
            self.string_cell(row.meta.txn.narration),
 
            amount_cell,
 
            self.currency_cell(self.norm_func(row.at_cost())),
 
            *(self.meta_links_cell(row.meta.report_links(key))
 
              if key in data.LINK_METADATA
 
              else self.string_cell(row.meta.get(key, ''))
 
              for key in self.metadata_columns),
 
        )
 

	
 
    def write_balance_sheet(self) -> None:
 
        self.use_sheet("Balance")
 
        self.sheet.addElement(odf.table.TableColumn(stylename=self.column_style(3)))
 
        self.sheet.addElement(odf.table.TableColumn(stylename=self.column_style(1.5)))
 
        self.add_row(
 
            self.string_cell("Account", stylename=self.style_bold),
 
            self.string_cell("Balance", stylename=self.style_bold),
 
        )
 
        self.lock_first_row()
 
        self.add_row()
 
        self.add_row(self.string_cell(
 
            f"Ledger From {self.date_range.start.isoformat()}"
 
            f" To {self.date_range.stop.isoformat()}",
 
            stylename=self.merge_styles(self.style_centertext, self.style_bold),
 
            numbercolumnsspanned=2,
 
        ))
 
        self.add_row()
 
        for account, balance in core.account_balances(self.account_groups):
 
            if account is core.OPENING_BALANCE_NAME:
 
                text = f"Balance as of {self.date_range.start.isoformat()}"
 
                style = self.merge_styles(self.style_bold, self.style_endtext)
 
            elif account is core.ENDING_BALANCE_NAME:
 
                text = f"Balance as of {self.date_range.stop.isoformat()}"
 
                style = self.merge_styles(self.style_bold, self.style_endtext)
 
            else:
 
                text = account
 
                style = self.style_endtext
 
            self.add_row(
 
                self.string_cell(text, stylename=style),
 
                self.balance_cell(-balance, stylename=style),
 
            )
 

	
 
    def write(self, rows: Iterable[data.Posting]) -> None:
 
        related_cls = core.PeriodPostings.with_start_date(self.date_range.start)
 
        self.account_groups = dict(related_cls.group_by_account(
 
            post for post in rows if post.meta.date < self.date_range.stop
 
        ))
 
        self.write_balance_sheet()
 
        tally_by_account_iter = (
 
            (account, len(related))
 
            for account, related in self.account_groups.items()
 
        )
 
        tally_by_account = {
 
            account: count
 
            for account, count in tally_by_account_iter
 
            if count
 
        }
 
        sheet_names = self.plan_sheets(
 
            tally_by_account, self.required_sheet_names, self.sheet_size,
 
        )
 
        using_sheet_index = -1
 
        for sheet_index, account in core.sort_and_filter_accounts(
 
                tally_by_account, sheet_names,
 
        ):
 
            while using_sheet_index < sheet_index:
 
                using_sheet_index += 1
 
                self.start_sheet(sheet_names[using_sheet_index])
 
            super().write(self.account_groups[account])
 
        for index in range(using_sheet_index + 1, len(sheet_names)):
 
            self.start_sheet(sheet_names[index])
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    LOAD_ERRORS = 1
 
    NOTHING_TO_REPORT = 8
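    # main() maps these flags onto its exit status: 0 on success, otherwise
    # 16 plus the accumulated flags (17 for load errors only, 24 when nothing
    # matched the search, 25 when both occurred).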
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
 
The default is one year ago.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
 
The default is a year after the start date, or 30 days from today if the start
 
date was also not specified.
 
""")
 
    parser.add_argument(
 
        '--account', '-a',
 
        dest='sheet_names',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""Show this account in the report. You can specify this option
 
multiple times. If not specified, the default set adapts to your search
 
criteria.
 
""")
 
    parser.add_argument(
 
        '--sheet-size', '--size',
 
        metavar='SIZE',
 
        type=int,
 
        default=LedgerODS.SHEET_SIZE,
 
        help="""Try to limit sheets to this many rows. The report will
 
automatically create new sheets to make this happen. When that's not possible,
 
it will issue a warning.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is `LedgerReport_<StartDate>_<StopDate>.ods`.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('project', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on postings that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other word is a shortcut for `project=TERM`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.sheet_names is None:
 
        if any(term.meta_key == 'project' for term in args.search_terms):
 
-            args.sheet_names = ['Income', 'Expenses', 'Assets:Receivable', 'Liabilities:Payable']
 
+            args.sheet_names = [

+                'Income',

+                'Expenses',

+                'Assets:Receivable',

+                'Assets:Prepaid',

+                'Liabilities:UnearnedIncome',

+                'Liabilities:Payable',

+            ]
 
        else:
 
            args.sheet_names = list(LedgerODS.ACCOUNT_COLUMNS)
 
    return args
 

	
 
def diff_year(date: datetime.date, diff: int) -> datetime.date:
 
    new_year = date.year + diff
 
    try:
 
        return date.replace(year=new_year)
 
    except ValueError:
 
        # The original date is Feb 29, which doesn't exist in the new year.
 
        if diff < 0:
 
            return datetime.date(new_year, 2, 28)
 
        else:
 
            return datetime.date(new_year, 3, 1)
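    # For example, diff_year(datetime.date(2020, 2, 29), -1) returns
    # datetime.date(2019, 2, 28), while diff_year(datetime.date(2020, 2, 29), 1)
    # returns datetime.date(2021, 3, 1).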
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    today = datetime.date.today()
 
    if args.start_date is None:
 
        args.start_date = diff_year(today, -1)
 
        if args.stop_date is None:
 
            args.stop_date = today + datetime.timedelta(days=30)
 
    elif args.stop_date is None:
 
        args.stop_date = diff_year(args.start_date, 1)
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
    else:
 
        entries, load_errors, _ = books_loader.load_fy_range(args.start_date, args.stop_date)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
        returncode |= ReturnFlag.LOAD_ERRORS
 

	
 
    postings = data.Posting.from_entries(entries)
 
    for search_term in args.search_terms:
 
        postings = search_term.filter_postings(postings)
 

	
 
    rt_wrapper = config.rt_wrapper()
 
    if rt_wrapper is None:
 
        logger.warning("could not initialize RT client; spreadsheet links will be broken")
 
    report = LedgerODS(
 
        args.start_date,
 
        args.stop_date,
 
        args.sheet_names,
 
        rt_wrapper,
 
        args.sheet_size,
 
    )
 
    report.write(postings)
 
    if not report.account_groups:
 
        logger.warning("no matching postings found to report")
 
        returncode |= ReturnFlag.NOTHING_TO_REPORT
 

	
 
    if args.output_file is None:
 
        out_dir_path = config.repository_path() or Path()
 
        args.output_file = out_dir_path / 'LedgerReport_{}_{}.ods'.format(
 
            args.start_date.isoformat(), args.stop_date.isoformat(),
 
        )
 
        logger.info("Writing report to %s", args.output_file)
 
    ods_file = cliutil.bytes_output(args.output_file, stdout)
 
    report.save_file(ods_file)
 
    return 0 if returncode == 0 else 16 + returncode
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
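With the new defaults, a project ledger report needs no --account options. A minimal sketch driving the module's main() entry point (the project name and date range are borrowed from the test fixtures below, and a working books configuration is assumed):

# ledger_report_example.py - hypothetical driver script, for illustration only.
import sys

from conservancy_beancount.reports import ledger

if __name__ == '__main__':
    sys.exit(ledger.main([
        '--begin=2019-03-01',
        '--end=2020-03-01',
        'nineteen',  # a bare word is shorthand for project=nineteen
    ]))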
setup.py
 
#!/usr/bin/env python3
 

	
 
from setuptools import setup
 

	
 
setup(
 
    name='conservancy_beancount',
 
    description="Plugin, library, and reports for reading Conservancy's books",
 
-    version='1.5.8',

+    version='1.5.9',
 
    author='Software Freedom Conservancy',
 
    author_email='info@sfconservancy.org',
 
    license='GNU AGPLv3+',
 

	
 
    install_requires=[
 
        'babel>=2.6',  # Debian:python3-babel
 
        'beancount>=2.2',  # Debian:beancount
 
        # 1.4.1 crashes when trying to save some documents.
 
        'odfpy>=1.4.0,!=1.4.1',  # Debian:python3-odf
 
        'PyYAML>=3.0',  # Debian:python3-yaml
 
        'regex',  # Debian:python3-regex
 
        'rt>=2.0',
 
    ],
 
    setup_requires=[
 
        'pytest-mypy',
 
        'pytest-runner',  # Debian:python3-pytest-runner
 
    ],
 
    tests_require=[
 
        'mypy>=0.770',  # Debian:python3-mypy
 
        'pytest',  # Debian:python3-pytest
 
    ],
 

	
 
    packages=[
 
        'conservancy_beancount',
 
        'conservancy_beancount.plugin',
 
        'conservancy_beancount.reports',
 
    ],
 
    entry_points={
 
        'console_scripts': [
 
            'accrual-report = conservancy_beancount.reports.accrual:entry_point',
 
            'fund-report = conservancy_beancount.reports.fund:entry_point',
 
            'ledger-report = conservancy_beancount.reports.ledger:entry_point',
 
            'opening-balances = conservancy_beancount.tools.opening_balances:entry_point',
 
        ],
 
    },
 
)
tests/test_reports_ledger.py
 
"""test_reports_ledger.py - Unit tests for general ledger report"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import collections
 
import copy
 
import datetime
 
import io
 
import re
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
import odf.table
 
import odf.text
 

	
 
from beancount.core import data as bc_data
 
from beancount import loader as bc_loader
 
from conservancy_beancount import data
 
from conservancy_beancount.reports import core
 
from conservancy_beancount.reports import ledger
 

	
 
Acct = data.Account
 

	
 
_ledger_load = bc_loader.load_file(testutil.test_path('books/ledger.beancount'))
 
DEFAULT_REPORT_SHEETS = [
 
    'Balance',
 
    'Income',
 
    'Expenses',
 
    'Equity',
 
    'Assets:Receivable',
 
    'Liabilities:Payable',
 
    'Assets:PayPal',
 
    'Assets',
 
    'Liabilities',
 
]
 
PROJECT_REPORT_SHEETS = [
 
    'Balance',
 
    'Income',
 
    'Expenses',
 
    'Assets:Receivable',
 
+    'Assets:Prepaid',

+    'Liabilities:UnearnedIncome',
 
    'Liabilities:Payable',
 
]
 
OVERSIZE_RE = re.compile(
 
    r'^([A-Za-z0-9:]+) has ([0-9,]+) rows, over size ([0-9,]+)$'
 
)
 
START_DATE = datetime.date(2018, 3, 1)
 
MID_DATE = datetime.date(2019, 3, 1)
 
STOP_DATE = datetime.date(2020, 3, 1)
 

	
 
@pytest.fixture
 
def ledger_entries():
 
    return copy.deepcopy(_ledger_load[0])
 

	
 
class NotFound(Exception): pass
 
class NoSheet(NotFound): pass
 
class NoHeader(NotFound): pass
 

	
 
class ExpectedPostings(core.RelatedPostings):
 
    def slice_date_range(self, start_date, end_date):
 
        postings = enumerate(self)
 
        for start_index, post in postings:
 
            if start_date <= post.meta.date:
 
                break
 
        else:
 
            start_index += 1
 
        if end_date <= post.meta.date:
 
            end_index = start_index
 
        else:
 
            for end_index, post in postings:
 
                if end_date <= post.meta.date:
 
                    break
 
            else:
 
                end_index = None
 
        return (self[:start_index].balance_at_cost(),
 
                self[start_index:end_index])
 

	
 
    def check_report(self, ods, start_date, end_date):
 
        account = self[0].account
 
        norm_func = core.normalize_amount_func(account)
 
        open_bal, expect_posts = self.slice_date_range(start_date, end_date)
 
        open_bal = norm_func(open_bal)
 
        for sheet in ods.getElementsByType(odf.table.Table):
 
            sheet_account = sheet.getAttribute('name').replace(' ', ':')
 
            if sheet_account and account.is_under(sheet_account):
 
                break
 
        else:
 
            raise NoSheet(account)
 
        rows = iter(sheet.getElementsByType(odf.table.TableRow))
 
        for row in rows:
 
            cells = row.childNodes
 
            if len(cells) == 2 and cells[-1].text.startswith(f'{account} '):
 
                break
 
        else:
 
            if expect_posts:
 
                raise NoHeader(account)
 
            else:
 
                return
 
        closing_bal = norm_func(expect_posts.balance_at_cost())
 
        if account.is_under('Assets', 'Equity', 'Liabilities'):
 
            opening_row = testutil.ODSCell.from_row(next(rows))
 
            assert opening_row[0].value == start_date
 
            assert opening_row[4].text == open_bal.format(None, empty='0', sep='\0')
 
            closing_bal += open_bal
 
        for expected in expect_posts:
 
            cells = iter(testutil.ODSCell.from_row(next(rows)))
 
            assert next(cells).value == expected.meta.date
 
            assert next(cells).text == (expected.meta.get('entity') or '')
 
            assert next(cells).text == (expected.meta.txn.narration or '')
 
            if expected.cost is None:
 
                assert not next(cells).text
 
                assert next(cells).value == norm_func(expected.units.number)
 
            else:
 
                assert next(cells).value == norm_func(expected.units.number)
 
                assert next(cells).value == norm_func(expected.at_cost().number)
 
        closing_row = testutil.ODSCell.from_row(next(rows))
 
        assert closing_row[0].value == end_date
 
        assert closing_row[4].text == closing_bal.format(None, empty='$0.00', sep='\0')
 

	
 

	
 
def get_sheet_names(ods):
 
    return [sheet.getAttribute('name').replace(' ', ':')
 
            for sheet in ods.getElementsByType(odf.table.Table)]
 

	
 
def check_oversize_logs(caplog, accounts, sheet_size):
 
    actual = {}
 
    for log in caplog.records:
 
        match = OVERSIZE_RE.match(log.message)
 
        if match:
 
            assert int(match.group(3).replace(',', '')) == sheet_size
 
            actual[match.group(1)] = int(match.group(2).replace(',', ''))
 
    expected = {name: size for name, size in accounts.items() if size > sheet_size}
 
    assert actual == expected
 

	
 
def test_plan_sheets_no_change():
 
    have = {
 
        Acct('Assets:Cash'): 10,
 
        Acct('Income:Donations'): 20,
 
    }
 
    want = ['Assets', 'Income']
 
    actual = ledger.LedgerODS.plan_sheets(have, want.copy(), 100)
 
    assert actual == want
 

	
 
@pytest.mark.parametrize('have', [
 
    {},
 
    {Acct('Income:Other'): 10},
 
    {Acct('Assets:Checking'): 20, Acct('Expenses:Other'): 15},
 
])
 
def test_plan_sheets_includes_accounts_without_transactions(have):
 
    want = ['Assets', 'Income', 'Expenses']
 
    actual = ledger.LedgerODS.plan_sheets(have, want.copy(), 100)
 
    assert actual == want
 

	
 
def test_plan_sheets_single_split():
 
    have = {
 
        Acct('Assets:Cash'): 60,
 
        Acct('Assets:Checking'): 80,
 
        Acct('Income:Donations'): 50,
 
        Acct('Expenses:Travel'): 90,
 
        Acct('Expenses:FilingFees'): 25,
 
    }
 
    want = ['Assets', 'Income', 'Expenses']
 
    actual = ledger.LedgerODS.plan_sheets(have, want, 100)
 
    assert actual == [
 
        'Assets:Checking',
 
        'Assets',
 
        'Income',
 
        'Expenses:Travel',
 
        'Expenses',
 
    ]
 

	
 
def test_plan_sheets_split_subtree():
 
    have = {
 
        Acct('Assets:Bank1:Checking'): 80,
 
        Acct('Assets:Bank1:Savings'): 10,
 
        Acct('Assets:Cash:USD'): 20,
 
        Acct('Assets:Cash:EUR'): 15,
 
    }
 
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
 
    assert actual == ['Assets:Bank1', 'Assets']
 

	
 
def test_plan_sheets_ambiguous_split():
 
    have = {
 
        Acct('Assets:Bank1:Checking'): 80,
 
        Acct('Assets:Bank1:Savings'): 40,
 
        Acct('Assets:Receivable:Accounts'): 40,
 
        Acct('Assets:Cash'): 10,
 
    }
 
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
 
    # :Savings cannot fit with :Checking, so it's important that the return
 
    # value disambiguate that.
 
    assert actual == ['Assets:Bank1:Checking', 'Assets']
 

	
 
def test_plan_sheets_oversize(caplog):
 
    have = {
 
        Acct('Assets:Checking'): 150,
 
        Acct('Assets:Cash'): 50,
 
    }
 
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
 
    assert actual == ['Assets:Checking', 'Assets']
 
    check_oversize_logs(caplog, have, 100)
 

	
 
def test_plan_sheets_all_oversize(caplog):
 
    have = {
 
        Acct('Assets:Checking'): 150,
 
        Acct('Assets:Cash'): 150,
 
    }
 
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
 
    # In this case, each account should appear in alphabetical order.
 
    assert actual == ['Assets:Cash', 'Assets:Checking']
 
    check_oversize_logs(caplog, have, 100)
 

	
 
def test_plan_sheets_full_split_required(caplog):
 
    have = {
 
        Acct('Assets:Bank:Savings'): 98,
 
        Acct('Assets:Bank:Checking'): 96,
 
        Acct('Assets:Bank:Investment'): 94,
 
    }
 
    actual = ledger.LedgerODS.plan_sheets(have, ['Assets'], 100)
 
    assert actual == ['Assets:Bank:Checking', 'Assets:Bank:Savings', 'Assets']
 
    assert not caplog.records
 

	
 
@pytest.mark.parametrize('start_date,stop_date', [
 
    (START_DATE, STOP_DATE),
 
    (START_DATE, MID_DATE),
 
    (MID_DATE, STOP_DATE),
 
    (START_DATE.replace(month=6), START_DATE.replace(month=12)),
 
    (STOP_DATE, STOP_DATE.replace(month=12)),
 
])
 
def test_date_range_report(ledger_entries, start_date, stop_date):
 
    postings = list(data.Posting.from_entries(ledger_entries))
 
    report = ledger.LedgerODS(start_date, stop_date)
 
    report.write(iter(postings))
 
    for _, expected in ExpectedPostings.group_by_account(postings):
 
        expected.check_report(report.document, start_date, stop_date)
 

	
 
@pytest.mark.parametrize('sheet_names', [
 
    ('Income', 'Expenses'),
 
    ('Assets:Receivable', 'Liabilities:Payable'),
 
])
 
def test_account_names_report(ledger_entries, sheet_names):
 
    postings = list(data.Posting.from_entries(ledger_entries))
 
    report = ledger.LedgerODS(START_DATE, STOP_DATE, sheet_names=sheet_names)
 
    report.write(iter(postings))
 
    for key, expected in ExpectedPostings.group_by_account(postings):
 
        should_find = key.startswith(sheet_names)
 
        try:
 
            expected.check_report(report.document, START_DATE, STOP_DATE)
 
        except NotFound:
 
            assert not should_find
 
        else:
 
            assert should_find
 

	
 
def run_main(arglist, config=None):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/ledger.beancount'),
 
            rt_client=testutil.RTClient(),
 
        )
 
    arglist.insert(0, '--output-file=-')
 
    output = io.BytesIO()
 
    errors = io.StringIO()
 
    retcode = ledger.main(arglist, output, errors, config)
 
    output.seek(0)
 
    return retcode, output, errors
 

	
 
def test_main(ledger_entries):
 
    retcode, output, errors = run_main([
 
        '-b', START_DATE.isoformat(),
 
        '-e', STOP_DATE.isoformat(),
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    assert get_sheet_names(ods) == DEFAULT_REPORT_SHEETS[:]
 
    postings = data.Posting.from_entries(ledger_entries)
 
    for _, expected in ExpectedPostings.group_by_account(postings):
 
        expected.check_report(ods, START_DATE, STOP_DATE)
 

	
 
@pytest.mark.parametrize('project,start_date,stop_date', [
 
    ('eighteen', START_DATE, MID_DATE.replace(day=30)),
 
    ('nineteen', MID_DATE, STOP_DATE),
 
])
 
def test_main_project_report(ledger_entries, project, start_date, stop_date):
 
    postings = data.Posting.from_entries(ledger_entries)
 
    for key, related in ExpectedPostings.group_by_meta(postings, 'project'):
 
        if key == project:
 
            break
 
    assert key == project
 
    retcode, output, errors = run_main([
 
        f'--begin={start_date.isoformat()}',
 
        f'--end={stop_date.isoformat()}',
 
        project,
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)