Changeset - 461536453835
[Not reviewed]
Brett Smith - 2020-07-27 14:54:04
brettcsmith@brettcsmith.org
cliutil: Add ReturnFlag.

Take this opportunity to re-standardize flag values now that it's clear
what's most common.
8 files changed with 37 insertions and 38 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/cliutil.py
 
"""cliutil - Utilities for CLI tools"""
 
PKGNAME = 'conservancy_beancount'
 
LICENSE = """
 
Copyright © 2020  Brett Smith
 

	
 
This program is free software: you can redistribute it and/or modify
 
it under the terms of the GNU Affero General Public License as published by
 
the Free Software Foundation, either version 3 of the License, or
 
(at your option) any later version.
 

	
 
This program is distributed in the hope that it will be useful,
 
but WITHOUT ANY WARRANTY; without even the implied warranty of
 
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
GNU Affero General Public License for more details.
 

	
 
You should have received a copy of the GNU Affero General Public License
 
along with this program.  If not, see <https://www.gnu.org/licenses/>."""
 

	
 
import argparse
 
import datetime
 
import enum
 
import io
 
import logging
 
import operator
 
import os
 
import pkg_resources
 
import re
 
import signal
 
import sys
 
import traceback
 
import types
 

	
 
from pathlib import Path
 

	
 
from . import data
 
from . import filters
 
from . import rtutil
 

	
 
from typing import (
 
    cast,
 
    Any,
 
    BinaryIO,
 
    Callable,
 
    IO,
 
    Iterable,
 
    NamedTuple,
 
    NoReturn,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Type,
 
    Union,
 
)
 
from .beancount_types import (
 
    MetaKey,
 
)
 

	
 
OutputFile = Union[int, IO]
 

	
 
STDSTREAM_PATH = Path('-')
 
VERSION = pkg_resources.require(PKGNAME)[0].version
 

	
 
class ExceptHook:
    def __init__(self,
                 logger: Optional[logging.Logger]=None,
                 default_exitcode: int=3,
    ) -> None:
        if logger is None:
            logger = logging.getLogger()
        self.logger = logger
        self.default_exitcode = default_exitcode

    def __call__(self,
                 exc_type: Type[BaseException],
                 exc_value: BaseException,
                 exc_tb: types.TracebackType,
    ) -> NoReturn:
        exitcode = self.default_exitcode
        if isinstance(exc_value, KeyboardInterrupt):
            signal.signal(signal.SIGINT, signal.SIG_DFL)
            os.kill(0, signal.SIGINT)
            signal.pause()
        elif isinstance(exc_value, OSError):
            exitcode += 1
            msg = "I/O error: {e.filename}: {e.strerror}".format(e=exc_value)
        else:
            parts = [type(exc_value).__name__, *exc_value.args]
            msg = "internal " + ": ".join(parts)
        self.logger.critical(msg)
        self.logger.debug(
            ''.join(traceback.format_exception(exc_type, exc_value, exc_tb)),
        )
        raise SystemExit(exitcode)
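
# Illustrative sketch (hypothetical logger name): wiring ExceptHook into a
# one-off script by hand. make_entry_point() below does this automatically,
# so most tools do not need to call ExceptHook directly.
def _example_excepthook_setup() -> None:
    logger = logging.getLogger('example-tool')
    # From here on an uncaught OSError exits with status 4 and a short
    # "I/O error" message, any other uncaught exception exits with the
    # default status 3, and the full traceback is only logged at DEBUG level.
    sys.excepthook = ExceptHook(logger)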
 

	
 

	
 
class InfoAction(argparse.Action):
    def __call__(self,
                 parser: argparse.ArgumentParser,
                 namespace: argparse.Namespace,
                 values: Union[Sequence[Any], str, None]=None,
                 option_string: Optional[str]=None,
    ) -> NoReturn:
        if isinstance(self.const, str):
            info = self.const
            exitcode = 0
        else:
            info, exitcode = self.const
        print(info)
        raise SystemExit(exitcode)


class LogLevel(enum.IntEnum):
    DEBUG = logging.DEBUG
    INFO = logging.INFO
    WARNING = logging.WARNING
    ERROR = logging.ERROR
    CRITICAL = logging.CRITICAL
    WARN = WARNING
    ERR = ERROR
    CRIT = CRITICAL

    @classmethod
    def from_arg(cls, arg: str) -> int:
        try:
            return cls[arg.upper()].value
        except KeyError:
            raise ValueError(f"unknown loglevel {arg!r}") from None

    @classmethod
    def choices(cls) -> Iterable[str]:
        for level in sorted(cls, key=operator.attrgetter('value')):
            yield level.name.lower()


class ReturnFlag(enum.IntFlag):
    """Common return codes for tools

    Tools should combine these flags to report different errors, and then use
    ReturnFlag.returncode(flags) to report their final exit status code.

    Values 1, 2, 4, and 8 should be reserved for this class to be shared across
    all tools. Flags 16, 32, and 64 are available for tools to report their own
    specific errors.
    """
    LOAD_ERRORS = 1
    NOTHING_TO_REPORT = 2
    _RESERVED4 = 4
    _RESERVED8 = 8

    @classmethod
    def returncode(cls, flags: int) -> int:
        return 0 if flags == 0 else 16 + flags
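
# Illustrative sketch (hypothetical main body): how a reporting tool combines
# these flags and converts them to a final exit status with returncode().
def _example_returnflag_usage(load_errors: Sequence[Any], postings: Sequence[Any]) -> int:
    returncode = 0
    if load_errors:
        returncode |= ReturnFlag.LOAD_ERRORS
    if not postings:
        returncode |= ReturnFlag.NOTHING_TO_REPORT
    # 0 stays 0; any non-zero combination is reported as 16 + flags, which
    # keeps these statuses above the small codes used elsewhere (argparse's 2,
    # ExceptHook's 3 and 4).
    return ReturnFlag.returncode(returncode)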
 

	
 

	
 
class SearchTerm(NamedTuple):
    """NamedTuple representing a user's metadata filter

    SearchTerm knows how to parse and store posting metadata filters provided
    by the user in `key=value` format. Reporting tools can use this to filter
    postings that match the user's criteria, to report on subsets of the books.

    Typical usage looks like::

      argument_parser.add_argument(
        'search_terms',
        type=SearchTerm.arg_parser(),
        …,
      )

      args = argument_parser.parse_args(…)
      for query in args.search_terms:
        postings = query.filter_postings(postings)
    """
    meta_key: MetaKey
    pattern: str

    @classmethod
    def arg_parser(cls,
                   default_key: Optional[str]=None,
                   ticket_default_key: Optional[str]=None,
    ) -> Callable[[str], 'SearchTerm']:
        """Build a SearchTerm parser

        This method returns a function that can parse strings in ``key=value``
        format and return a corresponding SearchTerm.

        If you specify a default key, then strings that just specify a ``value``
        will be parsed as if they said ``default_key=value``. Otherwise,
        parsing strings without a metadata key will raise a ValueError.

        If you specify a default key for ticket links, then values in the format
        ``number``, ``rt:number``, or ``rt://ticket/number`` will be parsed as
        if they said ``ticket_default_key=value``.
        """
        if ticket_default_key is None:
            ticket_default_key = default_key
        def parse_search_term(arg: str) -> 'SearchTerm':
            key: Optional[str] = None
            if re.match(r'^[a-z][-\w]*=', arg):
                key, _, raw_link = arg.partition('=')
            else:
                raw_link = arg
            rt_ids = rtutil.RT.parse(raw_link)
            if rt_ids is None:
                rt_ids = rtutil.RT.parse('rt:' + raw_link)
            if rt_ids is None:
                if key is None:
                    key = default_key
                pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link))
            else:
                ticket_id, attachment_id = rt_ids
                if key is None:
                    if attachment_id is None:
                        key = ticket_default_key
                    else:
                        key = default_key
                pattern = rtutil.RT.metadata_regexp(
                    ticket_id,
                    attachment_id,
                    first_link_only=key == 'rt-id' and attachment_id is None,
                )
            if key is None:
                raise ValueError(f"invalid search term {arg!r}: no metadata key")
            return cls(key, pattern)
        return parse_search_term

    def filter_postings(self, postings: Iterable[data.Posting]) -> Iterable[data.Posting]:
        return filters.filter_meta_match(
            postings, self.meta_key, re.compile(self.pattern),
        )
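
# Illustrative sketch (hypothetical argument values): what arg_parser()
# returns for the shortcut syntaxes described above, using the same defaults
# the accrual report passes ('invoice', plus 'rt-id' for bare ticket numbers).
def _example_search_terms() -> None:
    parse = SearchTerm.arg_parser('invoice', 'rt-id')
    explicit = parse('project=Umbrella')  # explicit key, word-match pattern
    ticket = parse('220')                 # bare ticket number -> meta_key 'rt-id'
    attachment = parse('220/5010')        # ticket/attachment link -> meta_key 'invoice'
    assert [term.meta_key for term in (explicit, ticket, attachment)] == [
        'project', 'rt-id', 'invoice',
    ]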
 

	
 

	
 
def add_loglevel_argument(parser: argparse.ArgumentParser,
                          default: LogLevel=LogLevel.INFO) -> argparse.Action:
    return parser.add_argument(
        '--loglevel',
        metavar='LEVEL',
        default=default.value,
        type=LogLevel.from_arg,
        help="Show logs at this level and above."
        f" Specify one of {', '.join(LogLevel.choices())}."
        f" Default {default.name.lower()}.",
    )

def add_version_argument(parser: argparse.ArgumentParser) -> argparse.Action:
    progname = parser.prog or sys.argv[0]
    return parser.add_argument(
        '--version', '--copyright', '--license',
        action=InfoAction,
        nargs=0,
        const=f"{progname} version {VERSION}\n{LICENSE}",
        help="Show program version and license information",
    )
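
# Illustrative sketch (hypothetical program name): every reporting tool builds
# its parser with these two helpers plus its own options.
def _example_common_arguments() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog='example-report')
    add_version_argument(parser)   # --version/--copyright/--license print and exit
    add_loglevel_argument(parser)  # --loglevel LEVEL, defaults to info
    return parser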
 

	
 
def date_arg(arg: str) -> datetime.date:
    return datetime.datetime.strptime(arg, '%Y-%m-%d').date()

def year_or_date_arg(arg: str) -> Union[int, datetime.date]:
    """Get either a date or a year (int) from an argument string

    This is a useful argument type for arguments that will be passed into
    Books loader methods which can accept either a fiscal year or a full date.
    """
    try:
        year = int(arg, 10)
    except ValueError:
        ok = False
    else:
        ok = datetime.MINYEAR <= year <= datetime.MAXYEAR
    if ok:
        return year
    else:
        return date_arg(arg)
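
# Illustrative sketch: both argument types accept ISO dates; year_or_date_arg()
# additionally passes through bare years for loaders that take a fiscal year.
def _example_date_arguments() -> None:
    assert date_arg('2020-03-01') == datetime.date(2020, 3, 1)
    assert year_or_date_arg('2020') == 2020
    assert year_or_date_arg('2020-03-01') == datetime.date(2020, 3, 1)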
 

	
 
def make_entry_point(mod_name: str, prog_name: str=sys.argv[0]) -> Callable[[], int]:
    """Create an entry_point function for a tool

    The returned function is suitable for use as an entry_point in setup.py.
    It sets up the root logger and excepthook, then calls the module's main
    function.
    """
    def entry_point():  # type:ignore
        prog_mod = sys.modules[mod_name]
        setup_logger()
        prog_mod.logger = logging.getLogger(prog_name)
        sys.excepthook = ExceptHook(prog_mod.logger)
        return prog_mod.main()
    return entry_point
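
# Illustrative sketch (hypothetical module and console script names): the
# pattern each tool module follows, matching the `entry_point =` lines in the
# report modules below.
#
#     # mytool.py
#     PROGNAME = 'example-tool'
#     logger = logging.getLogger(PROGNAME)
#     def main(arglist=None) -> int: ...
#     entry_point = cliutil.make_entry_point(__name__, PROGNAME)
#     if __name__ == '__main__':
#         exit(entry_point())
#
#     # setup.py: entry_points={'console_scripts': ['example-tool = mytool:entry_point']}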
 

	
 
def setup_logger(logger: Union[str, logging.Logger]='',
                 stream: TextIO=sys.stderr,
                 fmt: str='%(name)s: %(levelname)s: %(message)s',
) -> logging.Logger:
    """Set up a logger with a StreamHandler with the given format"""
    if isinstance(logger, str):
        logger = logging.getLogger(logger)
    formatter = logging.Formatter(fmt)
    handler = logging.StreamHandler(stream)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger

def set_loglevel(logger: logging.Logger, loglevel: int=logging.INFO) -> None:
    """Set the loglevel for a tool or module

    If the given logger is not under a hierarchy, this function sets the
    loglevel for the root logger, along with some specific levels for libraries
    used by reporting tools. Otherwise, it's the same as
    ``logger.setLevel(loglevel)``.
    """
    if '.' not in logger.name:
        logger = logging.getLogger()
        if loglevel <= logging.DEBUG:
            # At the debug level, the rt module logs the full body of every
            # request and response. That's too much.
            logging.getLogger('rt.rt').setLevel(logging.INFO)
    logger.setLevel(loglevel)
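
# Illustrative sketch (hypothetical logger name): the logging wiring that
# make_entry_point() and each tool's main() perform between them: a stderr
# handler on the root logger, then a level taken from the --loglevel argument.
def _example_logging_setup(loglevel: int=logging.INFO) -> logging.Logger:
    setup_logger()  # root logger gets a stderr StreamHandler
    logger = logging.getLogger('example-report')
    set_loglevel(logger, loglevel)  # top-level name, so this sets the root level
    return logger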
 

	
 
def bytes_output(path: Optional[Path]=None,
                 default: OutputFile=sys.stdout,
                 mode: str='w',
) -> BinaryIO:
    """Get a file-like object suitable for binary output

    If ``path`` is ``None`` or ``-``, returns a file-like object backed by
    ``default``. If ``default`` is a file descriptor or text IO object, this
    method returns a file-like object that writes to the same place.

    Otherwise, returns ``path.open(mode)``.
    """
    mode = f'{mode}b'
    if path is None or path == STDSTREAM_PATH:
        if isinstance(default, int):
            retval = open(default, mode)
        elif isinstance(default, TextIO):
            retval = default.buffer
        else:
            retval = default
    else:
        retval = path.open(mode)
    return cast(BinaryIO, retval)
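
# Illustrative sketch (hypothetical path): how the report tools pick their
# output stream. None or `-` reuses the default; any other path is opened fresh.
def _example_bytes_output(output_file: Optional[Path]=None) -> BinaryIO:
    # output_file=Path('Report.ods') -> Path('Report.ods').open('wb')
    # output_file=None or Path('-')  -> a binary stream backed by sys.stdout
    return bytes_output(output_file, sys.stdout)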
 

	
 
def text_output(path: Optional[Path]=None,
                default: OutputFile=sys.stdout,
                mode: str='w',
                encoding: Optional[str]=None,
conservancy_beancount/reports/accrual.py
...
 
@@ -446,348 +446,341 @@ class OutgoingReport(BaseReport):
 
        e_check = echeck
 
        paypal = 'PayPal'
 
        pay_pal = paypal
 
        vendorportal = 'Vendor Portal'
 
        vendor_portal = vendorportal
 
        wire = 'Wire'
 
        fxwire = wire
 
        fx_wire = fxwire
 
        uswire = wire
 
        us_wire = uswire
 

	
 

	
 
    def __init__(self, rt_wrapper: rtutil.RT, out_file: TextIO) -> None:
 
        super().__init__(out_file)
 
        self.rt_wrapper = rt_wrapper
 
        self.rt_client = rt_wrapper.rt
 

	
 
    def _primary_rt_id(self, posts: AccrualPostings) -> rtutil.TicketAttachmentIds:
 
        rt_ids = posts.first_meta_links('rt-id')
 
        rt_id = next(rt_ids, None)
 
        rt_id2 = next(rt_ids, None)
 
        if rt_id is None:
 
            raise ValueError("no rt-id links found")
 
        elif rt_id2 is not None:
 
            raise ValueError("multiple rt-id links found")
 
        parsed = rtutil.RT.parse(rt_id)
 
        if parsed is None:
 
            raise ValueError("rt-id is not a valid RT reference")
 
        else:
 
            return parsed
 

	
 
    def _get_payment_method(self, posts: AccrualPostings, ticket_id: str) -> Optional[str]:
 
        payment_methods = posts.meta_values('payment-method')
 
        payment_methods.discard(None)
 
        if all(isinstance(s, str) for s in payment_methods):
 
            # type ignore for <https://github.com/python/mypy/issues/7853>
 
            payment_methods = {s.strip().lower() for s in payment_methods}  # type:ignore[union-attr]
 
        log_prefix = f"cannot set payment-method for rt:{ticket_id}:"
 
        payment_method_count = len(payment_methods)
 
        if payment_method_count != 1:
 
            self.logger.warning("%s %s metadata values found",
 
                                log_prefix, payment_method_count)
 
            return None
 
        payment_method = payment_methods.pop()
 
        if not isinstance(payment_method, str):
 
            self.logger.warning("%s %r is not a string value",
 
                                log_prefix, payment_method)
 
            return None
 
        try:
 
            currency, method_key = payment_method.split(None, 1)
 
        except ValueError:
 
            self.logger.warning("%s no method specified in %r",
 
                                log_prefix, payment_method)
 
            return None
 
        curr_match = re.fullmatch(r'[a-z]{3}', currency)
 
        if curr_match is None:
 
            self.logger.warning("%s invalid currency %r",
 
                                log_prefix, currency)
 
        try:
 
            method_enum = self.PaymentMethods[re.sub(r'[- ]', '_', method_key)]
 
        except KeyError:
 
            self.logger.warning("%s invalid method %r",
 
                                log_prefix, method_key)
 
            curr_match = None
 
        if curr_match is None:
 
            return None
 
        else:
 
            return f'{currency.upper()} {method_enum.value}'
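
    # Illustrative examples (hypothetical metadata values): _get_payment_method()
    # accepts strings like "USD paypal" -> "USD PayPal", "eur wire" -> "EUR Wire",
    # or "usd vendor portal" -> "USD Vendor Portal". Anything without a
    # three-letter currency prefix or with an unknown method name is logged
    # and skipped.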
 

	
 
    def _report(self, posts: AccrualPostings, index: int) -> Iterable[str]:
 
        try:
 
            ticket_id, _ = self._primary_rt_id(posts)
 
            ticket = self.rt_client.get_ticket(ticket_id)
 
            # Note we only use this when ticket is None.
 
            errmsg = f"ticket {ticket_id} not found"
 
        except (ValueError, rt.RtError) as error:
 
            ticket = None
 
            errmsg = error.args[0]
 
        if ticket is None:
 
            meta = posts[0].meta
 
            self.logger.error(
 
                "can't generate outgoings report for %s %s %s because no RT ticket available: %s",
 
                meta.date.isoformat(),
 
                meta.get('entity', '<no entity>'),
 
                meta.get('invoice', '<no invoice>'),
 
                errmsg,
 
            )
 
            return
 

	
 
        try:
 
            rt_requestor = self.rt_client.get_user(ticket['Requestors'][0])
 
        except (IndexError, rt.RtError):
 
            rt_requestor = None
 
        if rt_requestor is None:
 
            requestor = ''
 
            requestor_name = ''
 
        else:
 
            requestor_name = (
 
                rt_requestor.get('RealName')
 
                or ticket.get('CF.{payment-to}')
 
                or ''
 
            )
 
            requestor = f'{requestor_name} <{rt_requestor["EmailAddress"]}>'.strip()
 

	
 
        last_zero_index = -1
 
        for index, (post, balance) in enumerate(posts.iter_with_balance()):
 
            if balance.is_zero():
 
                prior_zero_index = last_zero_index
 
                last_zero_index = index
 
        if last_zero_index == index:
 
            last_zero_index = prior_zero_index
 
        posts = posts[last_zero_index + 1:]
 

	
 
        balance = -posts.balance_at_cost()
 
        balance_s = balance.format(None)
 
        raw_balance = -posts.balance()
 
        payment_amount = raw_balance.format('¤¤ #,##0.00')
 
        if raw_balance != balance:
 
            payment_amount += f' ({balance_s})'
 
            balance_s = f'{raw_balance} ({balance_s})'
 

	
 
        payment_to = ticket.get('CF.{payment-to}') or requestor_name
 
        contract_links = list(posts.all_meta_links('contract'))
 
        if contract_links:
 
            contract_s = ' , '.join(self.rt_wrapper.iter_urls(
 
                contract_links, missing_fmt='<BROKEN RT LINK: {}>',
 
            ))
 
        else:
 
            contract_s = "NO CONTRACT GOVERNS THIS TRANSACTION"
 
        projects = [v for v in posts.meta_values('project')
 
                    if isinstance(v, str)]
 

	
 
        yield "PAYMENT FOR APPROVAL:"
 
        yield f"REQUESTOR: {requestor}"
 
        yield f"PAYMENT TO: {payment_to}"
 
        yield f"TOTAL TO PAY: {balance_s}"
 
        yield f"AGREEMENT: {contract_s}"
 
        yield f"PROJECT: {', '.join(projects)}"
 
        yield "\nBEANCOUNT ENTRIES:\n"
 

	
 
        last_txn: Optional[Transaction] = None
 
        for post in posts:
 
            txn = post.meta.txn
 
            if txn is not last_txn:
 
                last_txn = txn
 
                txn = self.rt_wrapper.txn_with_urls(txn, '{}')
 
                # Suppress payment-method metadata from the report.
 
                txn.meta.pop('payment-method', None)
 
                for txn_post in txn.postings:
 
                    if txn_post.meta:
 
                        txn_post.meta.pop('payment-method', None)
 
                yield bc_printer.format_entry(txn)
 

	
 
        cf_targets = {
 
            'payment-amount': payment_amount,
 
            'payment-method': (self._get_payment_method(posts, ticket_id)
 
                               or ticket.get('CF.{payment-method}')),
 
            'payment-to': payment_to,
 
        }
 

	
 
        cf_updates = {
 
            f'CF_{key}': value
 
            for key, value in cf_targets.items()
 
            if ticket.get(f'CF.{{{key}}}') != value
 
        }
 
        if cf_updates:
 
            try:
 
                ok = self.rt_client.edit_ticket(ticket_id, **cf_updates)
 
            except rt.RtError:
 
                self.logger.debug("RT exception on edit_ticket", exc_info=True)
 
                ok = False
 
            if not ok:
 
                self.logger.warning("failed to set custom fields for rt:%s", ticket_id)
 

	
 

	
 
class ReportType(enum.Enum):
 
    AGING = AgingReport
 
    BALANCE = BalanceReport
 
    OUTGOING = OutgoingReport
 
    AGE = AGING
 
    BAL = BALANCE
 
    OUT = OUTGOING
 
    OUTGOINGS = OUTGOING
 

	
 
    @classmethod
 
    def by_name(cls, name: str) -> 'ReportType':
 
        try:
 
            return cls[name.upper()]
 
        except KeyError:
 
            raise ValueError(f"unknown report type {name!r}") from None
 

	
 

	
 
-class ReturnFlag(enum.IntFlag):
-    LOAD_ERRORS = 1
-    # 2 was used in the past, it can probably be reclaimed.
-    REPORT_ERRORS = 4
-    NOTHING_TO_REPORT = 8
 

	
 

	
 
def filter_search(postings: Iterable[data.Posting],
 
                  search_terms: Iterable[cliutil.SearchTerm],
 
) -> Iterable[data.Posting]:
 
    accounts = tuple(AccrualAccount.account_names())
 
    postings = (post for post in postings if post.account.is_under(*accounts))
 
    for query in search_terms:
 
        postings = query.filter_postings(postings)
 
    return postings
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='NAME',
 
        type=ReportType.by_name,
 
        help="""The type of report to generate, one of `aging`, `balance`, or
 
`outgoing`. If not specified, the default is `aging` when no search terms are
 
given, `outgoing` for search terms that return a single outstanding payable,
 
and `balance` any other time.
 
""")
 
    parser.add_argument(
 
        '--since',
 
        metavar='YEAR',
 
        type=int,
 
        default=0,
 
        help="""How far back to search the books for related transactions.
 
You can either specify a fiscal year, or a negative offset from the current
 
fiscal year, to start loading entries from. The default is to load the current,
 
unaudited books.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is stdout for the balance and outgoing reports, and a generated
 
filename for other reports.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('invoice', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on accruals that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other link, including an RT attachment link in
 
`TIK/ATT` format, is a shortcut for `invoice=LINK`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.report_type is None and not any(
 
            term.meta_key == 'invoice' or term.meta_key == 'rt-id'
 
            for term in args.search_terms
 
    ):
 
        args.report_type = ReportType.AGING
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
    else:
 
        load_since = None if args.report_type == ReportType.AGING else args.since
 
        entries, load_errors, _ = books_loader.load_all(load_since)
 
    filters.remove_opening_balance_txn(entries)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
-        returncode |= ReturnFlag.LOAD_ERRORS
+        returncode |= cliutil.ReturnFlag.LOAD_ERRORS
 

	
 
    postings = list(filter_search(
 
        data.Posting.from_entries(entries), args.search_terms,
 
    ))
 
    if not postings:
 
        logger.warning("no matching entries found to report")
 
-        returncode |= ReturnFlag.NOTHING_TO_REPORT
+        returncode |= cliutil.ReturnFlag.NOTHING_TO_REPORT
 
    # groups is a mapping of metadata value strings to AccrualPostings.
 
    # The keys are basically arbitrary, the report classes don't rely on them,
 
    # but they do help symbolize what's being grouped.
 
    # For the outgoing approval report, groups maps rt-id link strings to
 
    # associated accruals.
 
    # For all other reports, groups comes from AccrualReport.make_consistent().
 
    groups: PostGroups
 
    if args.report_type is None or args.report_type is ReportType.OUTGOING:
 
        groups = dict(AccrualPostings.group_by_first_meta_link(postings, 'rt-id'))
 
        if args.report_type is None and len(groups) == 1:
 
            key = next(iter(groups))
 
            group = groups[key]
 
            account = group[0].account
 
            if (AccrualAccount.by_account(account) is AccrualAccount.PAYABLE
 
                and all(post.account == account for post in group)
 
                and not group.balance().ge_zero()
 
                and key):  # Make sure we have a usable rt-id
 
                args.report_type = ReportType.OUTGOING
 
    if args.report_type is not ReportType.OUTGOING:
 
        groups = dict(AccrualPostings.make_consistent(postings))
 
    if args.report_type is not ReportType.AGING:
 
        groups = {
 
            key: posts for key, posts in groups.items() if not posts.is_paid()
 
        } or groups
 
    del postings
 

	
 
    report: Optional[BaseReport] = None
 
    output_path: Optional[Path] = None
 
    if args.report_type is ReportType.AGING:
 
        rt_wrapper = config.rt_wrapper()
 
        if rt_wrapper is None:
 
            logger.error("unable to generate aging report: RT client is required")
 
        else:
 
            now = datetime.datetime.now()
 
            if args.output_file is None:
 
                out_dir_path = config.repository_path() or Path()
 
                args.output_file = out_dir_path / now.strftime('AgingReport_%Y-%m-%d_%H:%M.ods')
 
                logger.info("Writing report to %s", args.output_file)
 
            out_bin = cliutil.bytes_output(args.output_file, stdout)
 
            report = AgingReport(rt_wrapper, out_bin)
 
    elif args.report_type is ReportType.OUTGOING:
 
        rt_wrapper = config.rt_wrapper()
 
        if rt_wrapper is None:
 
            logger.error("unable to generate outgoing report: RT client is required")
 
        else:
 
            out_file = cliutil.text_output(args.output_file, stdout)
 
            report = OutgoingReport(rt_wrapper, out_file)
 
    else:
 
        out_file = cliutil.text_output(args.output_file, stdout)
 
        report = BalanceReport(out_file)
 

	
 
    if report is None:
 
-        returncode |= ReturnFlag.REPORT_ERRORS
+        returncode |= 16
 
    else:
 
        report.run(groups)
 
-    return 0 if returncode == 0 else 16 + returncode
+    return cliutil.ReturnFlag.returncode(returncode)
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
conservancy_beancount/reports/fund.py
...
 
@@ -94,334 +94,329 @@ UNRESTRICTED_FUND = 'Conservancy'
 
logger = logging.getLogger('conservancy_beancount.reports.fund')
 

	
 
class ODSReport(core.BaseODS[FundPosts, None]):
 
    def __init__(self, start_date: datetime.date, stop_date: datetime.date) -> None:
 
        super().__init__()
 
        self.start_date = start_date
 
        self.stop_date = stop_date
 

	
 
    def section_key(self, row: FundPosts) -> None:
 
        return None
 

	
 
    def start_spreadsheet(self, *, expanded: bool=True) -> None:
 
        headers = [["Fund"], ["Balance as of", self.start_date.isoformat()]]
 
        if expanded:
 
            sheet_name = "With Breakdowns"
 
            headers += [["Income"], ["Expenses"], ["Equity"]]
 
        else:
 
            sheet_name = "Fund Report"
 
            headers += [["Additions"], ["Releases from", "Restrictions"]]
 
        headers.append(["Balance as of", self.stop_date.isoformat()])
 
        if expanded:
 
            headers += [
 
                ["Of which", "Receivable"],
 
                ["Of which", "Prepaid Expenses"],
 
                ["Of which", "Payable"],
 
                ["Of which", "Unearned Income"],
 
            ]
 

	
 
        self.use_sheet(sheet_name)
 
        for header in headers:
 
            first_line = header[0]
 
            if first_line == 'Fund':
 
                width = 2.0
 
            elif first_line == 'Balance as of':
 
                width = 1.5
 
            elif first_line == 'Of which':
 
                width = 1.3
 
            else:
 
                width = 1.2
 
            col_style = self.column_style(width)
 
            self.sheet.addElement(odf.table.TableColumn(stylename=col_style))
 

	
 
        center_bold = self.merge_styles(self.style_centertext, self.style_bold)
 
        row = self.add_row(*(
 
            self.multiline_cell(header, stylename=center_bold)
 
            for header in headers
 
        ))
 
        row.firstChild.setAttribute(
 
            'stylename', self.merge_styles(self.style_endtext, self.style_bold),
 
        )
 
        self.lock_first_row()
 
        self.lock_first_column()
 
        self.add_row()
 
        self.add_row(self.string_cell(
 
            f"Fund Report From {self.start_date.isoformat()} To {self.stop_date.isoformat()}",
 
            stylename=center_bold,
 
            numbercolumnsspanned=6,
 
        ))
 
        self.add_row()
 

	
 
    def end_spreadsheet(self) -> None:
 
        start_sheet = self.sheet
 
        self.set_open_sheet(self.sheet)
 
        self.start_spreadsheet(expanded=False)
 
        bal_indexes = [0, 1, 2, 4]
 
        totals = [core.MutableBalance() for _ in bal_indexes]
 
        threshold = Decimal('.5')
 
        for fund, balances in self.balances.items():
 
            balances = [balances[index] for index in bal_indexes]
 
            if (not all(bal.clean_copy(threshold).le_zero() for bal in balances)
 
                and fund != UNRESTRICTED_FUND):
 
                self.write_balances(fund, balances)
 
                for total, bal in zip(totals, balances):
 
                    total += bal
 
        self.write_balances('', totals, self.merge_styles(
 
            self.border_style(core.Border.TOP, '.75pt'),
 
            self.border_style(core.Border.BOTTOM, '1.5pt', 'double'),
 
        ))
 
        self.document.spreadsheet.childNodes.reverse()
 
        self.sheet = start_sheet
 

	
 
    def _row_balances(self, accounts_map: AccountsMap) -> Iterator[core.Balance]:
 
        acct_order = ['Income', 'Expenses', 'Equity']
 
        key_order = [core.OPENING_BALANCE_NAME, *acct_order, core.ENDING_BALANCE_NAME]
 
        balances: Dict[str, core.Balance] = {key: core.MutableBalance() for key in key_order}
 
        for acct_s, balance in core.account_balances(accounts_map, acct_order):
 
            if acct_s in balances:
 
                balances[acct_s] = balance
 
            else:
 
                acct_root, _, _ = acct_s.partition(':')
 
                balances[acct_root] += balance
 
        for key in key_order:
 
            if key == 'Expenses':
 
                yield balances[key]
 
            else:
 
                yield -balances[key]
 
        for info_key in INFO_ACCOUNTS:
 
            for _, balance in core.account_balances(accounts_map, [info_key]):
 
                pass
 
            yield core.normalize_amount_func(info_key)(balance)
 

	
 
    def write_balances(self,
 
                       fund: str,
 
                       balances: Iterable[core.Balance],
 
                       style: Union[None, str, odf.style.Style]=None,
 
    ) -> odf.table.TableRow:
 
        return self.add_row(
 
            self.string_cell(fund, stylename=self.style_endtext),
 
            *(self.balance_cell(bal, stylename=style) for bal in balances),
 
        )
 

	
 
    def write_row(self, row: FundPosts) -> None:
 
        fund, accounts_map = row
 
        self.balances[fund] = list(self._row_balances(accounts_map))
 
        if fund != UNRESTRICTED_FUND:
 
            self.write_balances(fund, self.balances[fund])
 

	
 
    def write(self, rows: Iterable[FundPosts]) -> None:
 
        self.balances: Dict[str, Sequence[core.Balance]] = collections.OrderedDict()
 
        super().write(rows)
 
        try:
 
            unrestricted = self.balances[UNRESTRICTED_FUND]
 
        except KeyError:
 
            pass
 
        else:
 
            self.add_row()
 
            self.write_balances("Unrestricted", unrestricted)
 

	
 

	
 
class TextReport:
 
    def __init__(self,
 
                 start_date: datetime.date,
 
                 stop_date: datetime.date,
 
                 out_file: TextIO) -> None:
 
        self.start_date = start_date
 
        self.stop_date = stop_date
 
        self.out_file = out_file
 

	
 
    def _account_balances(self,
 
                          fund: str,
 
                          account_map: AccountsMap,
 
    ) -> Iterator[Tuple[str, Sequence[str]]]:
 
        total_fmt = f'{fund} balance as of {{}}'
 
        for acct_s, balance in core.account_balances(account_map, EQUITY_ACCOUNTS):
 
            if acct_s is core.OPENING_BALANCE_NAME:
 
                acct_s = total_fmt.format(self.start_date.isoformat())
 
            elif acct_s is core.ENDING_BALANCE_NAME:
 
                acct_s = total_fmt.format(self.stop_date.isoformat())
 
            yield acct_s, (-balance).format(None, sep='\0').split('\0')
 
        for _, account in core.sort_and_filter_accounts(account_map, INFO_ACCOUNTS):
 
            balance = account_map[account].stop_bal
 
            if not balance.is_zero():
 
                balance = core.normalize_amount_func(account)(balance)
 
                yield account, balance.format(None, sep='\0').split('\0')
 

	
 
    def write(self, rows: Iterable[FundPosts]) -> None:
 
        output = [
 
            line
 
            for fund, account_map in rows
 
            for line in self._account_balances(fund, account_map)
 
        ]
 
        acct_width = max(len(acct_s) for acct_s, _ in output) + 2
 
        bal_width = max(len(s) for _, bal_s in output for s in bal_s)
 
        bal_width = max(bal_width, 8)
 
        line_fmt = f'{{:>{acct_width}}}  {{:>{bal_width}}}'
 
        print(line_fmt.replace('{:>', '{:^').format("ACCOUNT", "BALANCE"),
 
              file=self.out_file)
 
        fund_start = f' balance as of {self.start_date.isoformat()}'
 
        for acct_s, bal_seq in output:
 
            if acct_s.endswith(fund_start):
 
                print(line_fmt.format('―' * acct_width, '―' * bal_width),
 
                      file=self.out_file)
 
            bal_iter = iter(bal_seq)
 
            print(line_fmt.format(acct_s, next(bal_iter)), file=self.out_file)
 
            for bal_s in bal_iter:
 
                print(line_fmt.format('', bal_s), file=self.out_file)
 

	
 

	
 
class ReportType(enum.Enum):
 
    TEXT = TextReport
 
    ODS = ODSReport
 
    TXT = TEXT
 
    SPREADSHEET = ODS
 

	
 
    @classmethod
 
    def from_arg(cls, s: str) -> 'ReportType':
 
        try:
 
            return cls[s.upper()]
 
        except KeyError:
 
            raise ValueError(f"no report type matches {s!r}") from None
 

	
 

	
 
-class ReturnFlag(enum.IntFlag):
-    LOAD_ERRORS = 1
-    NOTHING_TO_REPORT = 8
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
 
The default is one year ago.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
 
The default is a year after the start date.
 
""")
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='TYPE',
 
        type=ReportType.from_arg,
 
        help="""Type of report to generate. `text` gives a plain two-column text
 
report listing accounts and balances over the period, and is the default when
 
you search for a specific project/fund. `ods` produces a higher-level
 
spreadsheet, meant to provide an overview of all funds, and is the default when
 
you don't specify a project/fund.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is stdout for text reports, and a generated filename for ODS
 
reports.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('project', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on postings that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other word is a shortcut for `project=TERM`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.report_type is None:
 
        if any(term.meta_key == 'project' for term in args.search_terms):
 
            args.report_type = ReportType.TEXT
 
        else:
 
            args.report_type = ReportType.ODS
 
    return args
 

	
 
def diff_year(date: datetime.date, diff: int) -> datetime.date:
 
    new_year = date.year + diff
 
    try:
 
        return date.replace(year=new_year)
 
    except ValueError:
 
        # The original date is Feb 29, which doesn't exist in the new year.
 
        if diff < 0:
 
            return datetime.date(new_year, 2, 28)
 
        else:
 
            return datetime.date(new_year, 3, 1)
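
# Illustrative examples: diff_year() keeps the same calendar date when it can
# and only adjusts Feb 29, e.g. diff_year(datetime.date(2020, 2, 29), -1) is
# datetime.date(2019, 2, 28) while diff_year(datetime.date(2020, 2, 29), 1) is
# datetime.date(2021, 3, 1).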
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    if args.stop_date is None:
 
        if args.start_date is None:
 
            args.stop_date = datetime.date.today()
 
        else:
 
            args.stop_date = diff_year(args.start_date, 1)
 
    if args.start_date is None:
 
        args.start_date = diff_year(args.stop_date, -1)
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
    else:
 
        entries, load_errors, _ = books_loader.load_fy_range(args.start_date, args.stop_date)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
-        returncode |= ReturnFlag.LOAD_ERRORS
+        returncode |= cliutil.ReturnFlag.LOAD_ERRORS
 

	
 
    postings = (
 
        post
 
        for post in data.Posting.from_entries(entries)
 
        if post.meta.date < args.stop_date
 
    )
 
    for search_term in args.search_terms:
 
        postings = search_term.filter_postings(postings)
 
    fund_postings = {
 
        key: related
 
        for key, related in core.RelatedPostings.group_by_meta(postings, 'project')
 
        if isinstance(key, str)
 
    }
 
    period_cls = core.PeriodPostings.with_start_date(args.start_date)
 
    fund_map = collections.OrderedDict(
 
        (fund, dict(period_cls.group_by_account(fund_postings[fund])))
 
        for fund in sorted(fund_postings, key=lambda s: locale.strxfrm(s.casefold()))
 
    )
 
    if not fund_map:
 
        logger.warning("no matching postings found to report")
 
-        returncode |= ReturnFlag.NOTHING_TO_REPORT
+        returncode |= cliutil.ReturnFlag.NOTHING_TO_REPORT
 
    elif args.report_type is ReportType.TEXT:
 
        out_file = cliutil.text_output(args.output_file, stdout)
 
        report = TextReport(args.start_date, args.stop_date, out_file)
 
        report.write(fund_map.items())
 
    else:
 
        ods_report = ODSReport(args.start_date, args.stop_date)
 
        ods_report.write(fund_map.items())
 
        if args.output_file is None:
 
            out_dir_path = config.repository_path() or Path()
 
            args.output_file = out_dir_path / 'FundReport_{}_{}.ods'.format(
 
                args.start_date.isoformat(), args.stop_date.isoformat(),
 
            )
 
            logger.info("Writing report to %s", args.output_file)
 
        ods_file = cliutil.bytes_output(args.output_file, stdout)
 
        ods_report.save_file(ods_file)
 
-    return 0 if returncode == 0 else 16 + returncode
+    return cliutil.ReturnFlag.returncode(returncode)
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
conservancy_beancount/reports/ledger.py
...
 
@@ -426,423 +426,418 @@ class LedgerODS(core.BaseODS[data.Posting, None]):
 
            account: count + 3
 
            for account, count in tally_by_account_iter
 
        }
 
        sheet_names = self.plan_sheets(
 
            tally_by_account, self.required_sheet_names, self.sheet_size,
 
        )
 
        using_sheet_index = -1
 
        for sheet_index, account in core.sort_and_filter_accounts(
 
                tally_by_account, sheet_names,
 
        ):
 
            if not account.is_open_on_date(self.date_range.start):
 
                continue
 
            while using_sheet_index < sheet_index:
 
                using_sheet_index += 1
 
                self.start_sheet(sheet_names[using_sheet_index])
 
            self.norm_func = core.normalize_amount_func(account)
 
            postings = self.account_groups[account]
 
            if postings:
 
                totals_set = self.totals_with_entries
 
            else:
 
                totals_set = self.totals_without_entries
 
            want_totals = account.is_under(*totals_set) is not None
 
            if postings or want_totals:
 
                self.write_header(account)
 
            if want_totals:
 
                self._report_section_balance(account, 'start')
 
            self.write_entries(account, postings)
 
            if want_totals:
 
                self._report_section_balance(account, 'stop')
 
        for index in range(using_sheet_index + 1, len(sheet_names)):
 
            self.start_sheet(sheet_names[index])
 

	
 

	
 
class TransactionFilter(enum.IntFlag):
 
    ZERO = 1
 
    CREDIT = 2
 
    DEBIT = 4
 
    ALL = ZERO | CREDIT | DEBIT
 

	
 
    @classmethod
 
    def from_arg(cls, s: str) -> 'TransactionFilter':
 
        try:
 
            return cls[s.upper()]
 
        except KeyError:
 
            raise ValueError(f"unknown transaction filter {s!r}")
 

	
 
    @classmethod
 
    def post_flag(cls, post: data.Posting) -> int:
 
        norm_func = core.normalize_amount_func(post.account)
 
        number = norm_func(post.units.number)
 
        if not number:
 
            return cls.ZERO
 
        elif number > 0:
 
            return cls.CREDIT
 
        else:
 
            return cls.DEBIT
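
# Illustrative examples: TransactionFilter.from_arg('credit') is
# TransactionFilter.CREDIT, and because ALL == ZERO | CREDIT | DEBIT the check
# `TransactionFilter.post_flag(post) & self.txn_filter` in _wanted_txns()
# passes every posting when the filter is ALL and only sign-matching postings
# otherwise.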
 

	
 

	
 
class TransactionODS(LedgerODS):
 
    CORE_COLUMNS: Sequence[str] = [
 
        'Date',
 
        'Description',
 
        'Account',
 
        data.Metadata.human_name('entity'),
 
        'Original Amount',
 
        'Booked Amount',
 
    ]
 
    METADATA_COLUMNS: Sequence[str] = [
 
        'project',
 
        'rt-id',
 
        'receipt',
 
        'check',
 
        'invoice',
 
        'contract',
 
        'approval',
 
        'paypal-id',
 
        'check-number',
 
        'bank-statement',
 
    ]
 

	
 
    def __init__(self,
 
                 start_date: datetime.date,
 
                 stop_date: datetime.date,
 
                 accounts: Optional[Sequence[str]]=None,
 
                 rt_wrapper: Optional[rtutil.RT]=None,
 
                 sheet_size: Optional[int]=None,
 
                 totals_with_entries: Optional[Sequence[str]]=None,
 
                 totals_without_entries: Optional[Sequence[str]]=None,
 
                 txn_filter: int=TransactionFilter.ALL,
 
    ) -> None:
 
        super().__init__(
 
            start_date,
 
            stop_date,
 
            accounts,
 
            rt_wrapper,
 
            sheet_size,
 
            totals_with_entries,
 
            totals_without_entries,
 
        )
 
        self.txn_filter = txn_filter
 
        if self.txn_filter == TransactionFilter.CREDIT:
 
            self.report_name = "Receipts"
 
        elif self.txn_filter == TransactionFilter.DEBIT:
 
            self.report_name = "Disbursements"
 
        else:
 
            self.report_name = "Transactions"
 

	
 
    def _wanted_txns(self, postings: Iterable[data.Posting]) -> Iterator[Transaction]:
 
        last_txn: Optional[Transaction] = None
 
        for post in postings:
 
            txn = post.meta.txn
 
            if (txn is not last_txn
 
                and TransactionFilter.post_flag(post) & self.txn_filter):
 
                yield txn
 
                last_txn = txn
 

	
 
    def metadata_columns_for(self, sheet_name: str) -> Sequence[str]:
 
        return self.METADATA_COLUMNS
 

	
 
    def write_balance_sheet(self) -> None:
 
        return
 

	
 
    def _report_section_balance(self, key: data.Account, date_key: str) -> None:
 
        if self.txn_filter == TransactionFilter.ALL:
 
            super()._report_section_balance(key, date_key)
 
        elif date_key == 'stop':
 
            balance = core.Balance(
 
                post.at_cost()
 
                for txn in self._wanted_txns(self.account_groups[key])
 
                for post in data.Posting.from_txn(txn)
 
                if post.account == key
 
            )
 
            self._write_total_row(self.date_range.stop, "Period Activity", balance)
 

	
 
    def _account_tally(self, account: data.Account) -> int:
 
        return sum(len(txn.postings)
 
                   for txn in self._wanted_txns(self.account_groups[account]))
 

	
 
    def write_entries(self, account: data.Account, rows: Iterable[data.Posting]) -> None:
 
        for txn in self._wanted_txns(rows):
 
            post_list = list(data.Posting.from_txn(txn))
 
            post_list.sort(key=lambda post: (
 
                0 if post.account == account else 1,
 
                -abs(post.at_cost().number),
 
            ))
 
            postings = iter(post_list)
 
            post1 = next(postings)
 
            if post1.cost is None:
 
                amount_cell = odf.table.TableCell()
 
            else:
 
                amount_cell = self.currency_cell(self.norm_func(post1.units))
 
            self.add_row(
 
                self.date_cell(txn.date),
 
                self.string_cell(txn.narration),
 
                self.string_cell(post1.account),
 
                self.string_cell(post1.meta.get('entity') or ''),
 
                amount_cell,
 
                self.currency_cell(self.norm_func(post1.at_cost())),
 
                *(self.meta_links_cell(post1.meta.report_links(key))
 
                  if key in data.LINK_METADATA
 
                  else self.string_cell(post1.meta.get(key, ''))
 
                  for key in self.metadata_columns),
 
            )
 
            for post in postings:
 
                meta_cells: List[odf.table.TableCell] = []
 
                for meta_key in self.metadata_columns:
 
                    try:
 
                        dup = post.meta[meta_key] is txn.meta[meta_key]
 
                    except KeyError:
 
                        dup = False
 
                    if dup:
 
                        meta_cell = odf.table.TableCell()
 
                    elif meta_key in data.LINK_METADATA:
 
                        meta_cell = self.meta_links_cell(post.meta.report_links(meta_key))
 
                    else:
 
                        meta_cell = self.string_cell(post.meta.get(meta_key, ''))
 
                    meta_cells.append(meta_cell)
 
                if post.cost is None:
 
                    amount_cell = odf.table.TableCell()
 
                else:
 
                    amount_cell = self.currency_cell(self.norm_func(post.units))
 
                self.add_row(
 
                    odf.table.TableCell(),
 
                    odf.table.TableCell(),
 
                    self.string_cell(post.account),
 
                    self.string_cell(post.meta.get('entity') or ''),
 
                    amount_cell,
 
                    self.currency_cell(self.norm_func(post.at_cost())),
 
                    *meta_cells,
 
                )
 

	
 

	
 
-class ReturnFlag(enum.IntFlag):
-    LOAD_ERRORS = 1
-    NOTHING_TO_REPORT = 8
 

	
 

	
 
class CashReportAction(argparse.Action):
 
    def __call__(self,
 
                 parser: argparse.ArgumentParser,
 
                 namespace: argparse.Namespace,
 
                 values: Union[Sequence[Any], str, None]=None,
 
                 option_string: Optional[str]=None,
 
    ) -> None:
 
        namespace.txn_filter = self.const
 
        if namespace.accounts is None:
 
            namespace.accounts = []
 
        namespace.accounts.append('Assets:PayPal')
 
        namespace.accounts.append('Cash')
 
        if namespace.stop_date is None:
 
            namespace.stop_date = datetime.date.today()
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--disbursements',
 
        action=CashReportAction,
 
        const=TransactionFilter.DEBIT,
 
        nargs=0,
 
        help="""Shortcut to set all the necessary options to generate a cash
 
disbursements report.
 
""")
 
    parser.add_argument(
 
        '--receipts',
 
        action=CashReportAction,
 
        const=TransactionFilter.CREDIT,
 
        nargs=0,
 
        help="""Shortcut to set all the necessary options to generate a cash
 
receipts report.
 
""")
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
 
The default is one year ago.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
 
The default is a year after the start date, or 30 days from today if the start
 
date was also not specified.
 
""")
 
    parser.add_argument(
 
        '--transactions', '-t',
 
        dest='txn_filter',
 
        metavar='TYPE',
 
        type=TransactionFilter.from_arg,
 
        help="""Report whole transactions rather than individual postings.
 
The type argument selects which type of transactions to report. Choices are
 
credit, debit, or all.
 
""")
 
    parser.add_argument(
 
        '--account', '-a',
 
        dest='accounts',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""Show this account in the report. You can specify this option
 
multiple times. You can specify a part of the account hierarchy, or an account
 
classification from metadata. If not specified, the default set adapts to your
 
search criteria.
 
""")
 
    parser.add_argument(
 
        '--show-totals', '-S',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""When entries for this account appear in the report, include
 
account balance(s) as well. You can specify this option multiple times. Pass in
 
a part of the account hierarchy. The default is all accounts.
 
""")
 
    parser.add_argument(
 
        '--add-totals', '-T',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""When an account could be included in the report but does not
 
have any entries in the date range, include a header and account balance(s) for
 
it. You can specify this option multiple times. Pass in a part of the account
 
hierarchy. The default set adapts to your search criteria.
 
""")
 
    parser.add_argument(
 
        '--sheet-size', '--size',
 
        metavar='SIZE',
 
        type=int,
 
        default=LedgerODS.SHEET_SIZE,
 
        help="""Try to limit sheets to this many rows. The report will
 
automatically create new sheets to make this happen. When that's not possible,
 
it will issue a warning.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is `LedgerReport_<StartDate>_<StopDate>.ods`.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('project', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on postings that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other word is a shortcut for `project=TERM`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.add_totals is None and args.search_terms:
 
        args.add_totals = []
 
    if args.accounts is None:
 
        if any(term.meta_key == 'project' for term in args.search_terms):
 
            args.accounts = [
 
                'Income',
 
                'Expenses',
 
                'Assets:Receivable',
 
                'Assets:Prepaid',
 
                'Liabilities:UnearnedIncome',
 
                'Liabilities:Payable',
 
            ]
 
        else:
 
            args.accounts = list(LedgerODS.ACCOUNT_COLUMNS)
 
    return args
 

	
 
def diff_year(date: datetime.date, diff: int) -> datetime.date:
 
    new_year = date.year + diff
 
    try:
 
        return date.replace(year=new_year)
 
    except ValueError:
 
        # The original date is Feb 29, which doesn't exist in the new year.
 
        if diff < 0:
 
            return datetime.date(new_year, 2, 28)
 
        else:
 
            return datetime.date(new_year, 3, 1)
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    today = datetime.date.today()
 
    if args.start_date is None:
 
        args.start_date = diff_year(today, -1)
 
        if args.stop_date is None:
 
            args.stop_date = today + datetime.timedelta(days=30)
 
    elif args.stop_date is None:
 
        args.stop_date = diff_year(args.start_date, 1)
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, options = books.Loader.load_none(config.config_file_path())
 
    else:
 
        entries, load_errors, options = books_loader.load_fy_range(args.start_date, args.stop_date)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
-        returncode |= ReturnFlag.LOAD_ERRORS
+        returncode |= cliutil.ReturnFlag.LOAD_ERRORS
 

	
 
    data.Account.load_from_books(entries, options)
 
    postings = data.Posting.from_entries(entries)
 
    for search_term in args.search_terms:
 
        postings = search_term.filter_postings(postings)
 

	
 
    rt_wrapper = config.rt_wrapper()
 
    if rt_wrapper is None:
 
        logger.warning("could not initialize RT client; spreadsheet links will be broken")
 
    try:
 
        if args.txn_filter is None:
 
            report = LedgerODS(
 
                args.start_date,
 
                args.stop_date,
 
                args.accounts,
 
                rt_wrapper,
 
                args.sheet_size,
 
                args.show_totals,
 
                args.add_totals,
 
            )
 
        else:
 
            report = TransactionODS(
 
                args.start_date,
 
                args.stop_date,
 
                args.accounts,
 
                rt_wrapper,
 
                args.sheet_size,
 
                args.show_totals,
 
                args.add_totals,
 
                args.txn_filter,
 
            )
 
    except ValueError as error:
 
        logger.error("%s: %r", *error.args)
 
        return 2
 
    report.write(postings)
 
    if not any(report.account_groups.values()):
 
        logger.warning("no matching postings found to report")
 
        returncode |= ReturnFlag.NOTHING_TO_REPORT
 
        returncode |= cliutil.ReturnFlag.NOTHING_TO_REPORT
 

	
 
    if args.output_file is None:
 
        out_dir_path = config.repository_path() or Path()
 
        args.output_file = out_dir_path / '{}Report_{}_{}.ods'.format(
 
            report.report_name,
 
            args.start_date.isoformat(),
 
            args.stop_date.isoformat(),
 
        )
 
        logger.info("Writing report to %s", args.output_file)
 
    ods_file = cliutil.bytes_output(args.output_file, stdout)
 
    report.save_file(ods_file)
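    # cliutil.ReturnFlag.returncode() appears to keep the convention of the
    # line it replaces below: 0 on success, otherwise 16 plus the OR of the
    # flags set above (check_main_fails() in the accrual tests relies on this,
    # asserting retcode > 16 and testing (retcode - 16) against the expected
    # flags).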
 
    return 0 if returncode == 0 else 16 + returncode
 
    return cliutil.ReturnFlag.returncode(returncode)
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
conservancy_beancount/tools/opening_balances.py
Show inline comments
 
#!/usr/bin/env python3
 
"""opening_balances.py - Tool to generate opening balances transactions
 

	
 
This tool generates an opening balances transaction for a given date and writes
 
it to stdout. Use this when you close the books for a year to record the final
 
balances for that year.
 

	
 
Run it without arguments to generate opening balances for the current fiscal
 
year. You can also specify a fiscal year, or
 
even a specific date (which can be helpful for testing or debugging).
 
"""
 
# SPDX-FileCopyrightText: © 2020 Martin Michlmayr <tbm@cyrius.com>
 
# SPDX-FileCopyrightText: © 2020 Brett Smith
 
# SPDX-License-Identifier: AGPL-3.0-or-later
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import collections
 
import copy
 
import datetime
 
import enum
 
import locale
 
import logging
 
import sys
 

	
 
from typing import (
 
    Dict,
 
    Hashable,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    NamedTuple,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Tuple,
 
)
 
from ..beancount_types import (
 
    Error,
 
    MetaKey,
 
    MetaValue,
 
    Transaction,
 
)
 

	
 
from decimal import Decimal, ROUND_HALF_EVEN, ROUND_HALF_UP
 

	
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 
from ..reports.core import Balance
 

	
 
from beancount.core import data as bc_data
 
from beancount.core import display_context as bc_dcontext
 
from beancount.parser import printer as bc_printer
 

	
 
from beancount.core.convert import get_cost
 
from beancount.core.inventory import Inventory
 
from beancount.core.position import Position, get_position
 

	
 
EQUITY_ACCOUNTS = frozenset([
 
    'Equity',
 
    'Expenses',
 
    'Income',
 
])
 
FUND_ACCOUNTS = frozenset([
 
    'Assets:Prepaid',
 
    'Assets:Receivable',
 
    'Equity:Funds',
 
    'Equity:Realized',
 
    'Expenses',
 
    'Income',
 
    'Liabilities:Payable',
 
    'Liabilities:UnearnedIncome',
 
])
 
RESTRICTED_ACCOUNT = data.Account('Equity:Funds:Restricted')
 
UNRESTRICTED_ACCOUNT = data.Account('Equity:Funds:Unrestricted')
 
PROGNAME = 'opening-balances'
 
logger = logging.getLogger('conservancy_beancount.tools.opening_balances')
 

	
 
def quantize_amount(
 
        amount: data.Amount,
 
        exp: Decimal=Decimal('.01'),
 
        rounding: str=ROUND_HALF_EVEN,
 
) -> data.Amount:
 
    return amount._replace(number=amount.number.quantize(exp, rounding=rounding))
 

	
 
class AccountWithFund(NamedTuple):
 
    account: data.Account
 
    fund: Optional[MetaValue]
 

	
 
    def sortkey(self) -> Hashable:
 
        account, fund = self
 
        return (
 
            0 if fund is None else 1,
 
            locale.strxfrm(account),
 
            locale.strxfrm(str(fund).casefold()),
 
        )
 

	
 

	
 
class Posting(data.Posting):
 
    @staticmethod
 
    def _position_sortkey(position: Position) -> str:
 
        units, cost = position
 
        if cost is None:
 
            # Beancount type-declares that position.cost must be a Cost, but
 
            # in practice that's not true. Call get_position(post) on any
 
            # post without a cost and see what it returns. Hence the ignore.
 
            return units.currency  # type:ignore[unreachable]
 
        else:
 
            return f'{units.currency} {cost.currency} {cost.date.isoformat()}'
 

	
 
    @classmethod
 
    def build_opening(
 
            cls,
 
            key: AccountWithFund,
 
            meta_key: MetaKey,
 
            inventory: Inventory,
 
    ) -> Iterator[bc_data.Posting]:
 
        account, project = key
 
        if project is None:
 
            meta: Optional[Dict[MetaKey, MetaValue]] = None
 
        else:
 
            meta = {meta_key: project}
 
        for units, cost in sorted(inventory, key=cls._position_sortkey):
 
            if cost is None:
 
                units = quantize_amount(units)
 
            yield bc_data.Posting(
 
                account, units, cost, None, None, copy.copy(meta),
 
            )
 

	
 

	
 
class ReturnFlag(enum.IntFlag):
 
    LOAD_ERRORS = 1
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        '--fund-metadata-key', '-m',
 
        metavar='KEY',
 
        dest='meta_key',
 
        default='project',
 
        help="""Name of the fund metadata key. Default %(default)s.
 
""")
 
    parser.add_argument(
 
        '--unrestricted-fund', '-u',
 
        metavar='PROJECT',
 
        default='Conservancy',
 
        help="""Name of the unrestricted fund. Default %(default)s.
 
""")
 
    parser.add_argument(
 
        'as_of_date',
 
        metavar='YEAR_OR_DATE',
 
        type=cliutil.year_or_date_arg,
 
        nargs='?',
 
        help="""Date to generate opening balances for. You can provide just
 
a year to generate balances for the start of that fiscal year. Defaults to the
 
current fiscal year.
 
""")
 
    return parser.parse_args(arglist)
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    fy = config.fiscal_year_begin()
 
    if args.as_of_date is None:
 
        args.as_of_date = fy.for_date()
 
    if isinstance(args.as_of_date, int):
 
        args.as_of_date = fy.first_date(args.as_of_date)
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
    else:
 
        entries, load_errors, _ = books_loader.load_fy_range(0, args.as_of_date)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 
        returncode |= ReturnFlag.LOAD_ERRORS
 
        returncode |= cliutil.ReturnFlag.LOAD_ERRORS
 

	
 
    inventories: Mapping[AccountWithFund, Inventory] = collections.defaultdict(Inventory)
 
    for post in Posting.from_entries(entries):
 
        if post.meta.date >= args.as_of_date:
 
            continue
 
        account = post.account
 
        fund_acct_match = post.account.is_under(*FUND_ACCOUNTS)
 
        is_equity = account.root_part() in EQUITY_ACCOUNTS
 
        if fund_acct_match is None:
 
            project: MetaValue = None
 
        else:
 
            project = post.meta.get(args.meta_key)
 
            if project is None:
 
                bc_printer.print_error(Error(
 
                    post.meta, "no fund specified", post.meta.txn,
 
                ), file=stderr)
 
                project = args.unrestricted_fund
 
            if is_equity:
 
                if project == args.unrestricted_fund:
 
                    account = UNRESTRICTED_ACCOUNT
 
                else:
 
                    account = RESTRICTED_ACCOUNT
 
        inventory = inventories[AccountWithFund(account, project)]
 
        if is_equity:
 
            inventory.add_amount(post.at_cost())
 
        else:
 
            inventory.add_position(get_position(post))
 

	
 
    opening_date = args.as_of_date - datetime.timedelta(1)
 
    opening = bc_data.Transaction(  # type:ignore[operator]
 
        None,  # meta
 
        opening_date,
 
        '*',
 
        None,  # payee
 
        f"Opening balances for FY{fy.for_date(args.as_of_date)}",
 
        frozenset(),  # tags
 
        frozenset(),  # links
 
        [post
 
         for key in sorted(inventories, key=AccountWithFund.sortkey)
 
         for post in Posting.build_opening(key, args.meta_key, inventories[key])
 
        ])
 
    balance = Balance(get_cost(get_position(post))
 
                      for post in opening.postings)
 
    for amount in balance.clean_copy().values():
 
        opening.postings.append(bc_data.Posting(
 
            UNRESTRICTED_ACCOUNT, quantize_amount(-amount), None, None, None,
 
            {args.meta_key: args.unrestricted_fund},
 
        ))
 
    dcontext = bc_dcontext.DisplayContext()
 
    dcontext.set_commas(True)
 
    bc_printer.print_entry(opening, dcontext, file=stdout)
 
    return 0 if returncode == 0 else 16 + returncode
 
    return cliutil.ReturnFlag.returncode(returncode)
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
tests/test_reports_accrual.py
Show inline comments
...
 
@@ -586,216 +586,216 @@ def test_outgoing_report_good_payment_method(caplog, accrual_postings, arg):
 
        ('Liabilities:Payable:Accounts', -100, meta),
 
    ])
 
    rt_client = RTClient()
 
    run_outgoing(rt_id, data.Posting.from_txn(txn), rt_client)
 
    assert not caplog.records
 
    cf_values = rt_client.edits[rt_id[3:]]['CF_payment-method'].split()
 
    assert cf_values[0] == arg.split()[0].upper()
 
    assert len(cf_values) > 1
 

	
 
@pytest.mark.parametrize('arg', [
 
    '',
 
    'usd',
 
    'usd nonexistent',
 
    'check',
 
    'us check',
 
    *testutil.NON_STRING_METADATA_VALUES,
 
])
 
def test_outgoing_report_bad_payment_method(caplog, accrual_postings, arg):
 
    rt_id = 'rt:40'
 
    meta = {'rt-id': rt_id, 'invoice': 'rt:40/100', 'payment-method': arg}
 
    txn = testutil.Transaction(postings=[
 
        ('Liabilities:Payable:Accounts', -100, meta),
 
    ])
 
    rt_client = RTClient()
 
    run_outgoing(rt_id, data.Posting.from_txn(txn), rt_client)
 
    assert caplog.records
 
    for log in caplog.records:
 
        assert log.levelname == 'WARNING'
 
        assert log.message.startswith(f'cannot set payment-method for {rt_id}: ')
 
    assert 'CF_payment-method' not in rt_client.edits[rt_id[3:]]
 

	
 
def test_outgoing_report_without_rt_id(accrual_postings, caplog):
 
    invoice = 'rt://ticket/515/attachments/5150'
 
    related = accruals_by_meta(
 
        accrual_postings, invoice, wrap_type=accrual.AccrualPostings,
 
    )
 
    output = run_outgoing(None, related)
 
    assert caplog.records
 
    log = caplog.records[0]
 
    assert log.message.startswith(
 
        f"can't generate outgoings report for 2010-05-15 MatchingProgram {invoice}"
 
        " because no RT ticket available:",
 
    )
 
    assert not output.getvalue()
 

	
 
def run_aging_report(postings, today):
 
    postings = (
 
        post for post in postings
 
        if post.account.is_under('Assets:Receivable', 'Liabilities:Payable')
 
    )
 
    groups = dict(accrual.AccrualPostings.make_consistent(postings))
 
    output = io.BytesIO()
 
    rt_wrapper = rtutil.RT(RTClient())
 
    report = accrual.AgingReport(rt_wrapper, output, today)
 
    report.run(groups)
 
    return output
 

	
 
@pytest.mark.parametrize('date', [
 
    datetime.date(2010, 3, 1),
 
    # Both these dates are chosen for their off-by-one potential:
 
    # the first is exactly 30 days after the 2010-06-10 payable;
 
    # the second is exactly 60 days after the 2010-05-15 receivable.
 
    datetime.date(2010, 7, 10),
 
    datetime.date(2010, 7, 14),
 
    # The remainder just shuffle the age buckets some.
 
    datetime.date(2010, 12, 1),
 
    datetime.date(2011, 6, 1),
 
    datetime.date(2011, 12, 1),
 
    datetime.date(2012, 3, 1),
 
])
 
def test_aging_report_date_cutoffs(accrual_postings, date):
 
    output = run_aging_report(accrual_postings, date)
 
    check_aging_ods(output, date)
 

	
 
def test_aging_report_entity_consistency(accrual_postings):
 
    date = datetime.date.today()
 
    output = run_aging_report((
 
        post for post in accrual_postings
 
        if post.meta.get('rt-id') == 'rt:480'
 
        and post.units.number < 0
 
    ), date)
 
    check_aging_ods(output, date, [], [
 
        AgingRow.make_simple('2010-04-15', 'MultiPartyA', 125, 'rt:480/4800'),
 
        AgingRow.make_simple('2010-04-15', 'MultiPartyB', 125, 'rt:480/4800'),
 
    ])
 

	
 
def run_main(arglist, config=None, out_type=io.StringIO):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/accruals.beancount'),
 
            rt_client=RTClient(),
 
        )
 
    if out_type is io.BytesIO:
 
        arglist.insert(0, '--output-file=-')
 
    output = out_type()
 
    errors = io.StringIO()
 
    retcode = accrual.main(arglist, output, errors, config)
 
    output.seek(0)
 
    errors.seek(0)
 
    return retcode, output, errors
 

	
 
def check_main_fails(arglist, config, error_flags):
 
    retcode, output, errors = run_main(arglist, config)
 
    assert retcode > 16
 
    assert (retcode - 16) & error_flags
 
    assert not output.getvalue()
 
    return errors
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['--report-type=balance', 'entity=EarlyBird'],
 
    ['--report-type=outgoing', 'entity=EarlyBird'],
 
])
 
def test_output_excludes_payments(arglist):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    output.seek(0)
 
    for line in output:
 
        assert not re.match(r'\brt:4\d\b', line)
 

	
 
@pytest.mark.parametrize('arglist,expect_invoice', [
 
    (['40'], 'rt:40/400'),
 
    (['44/440'], 'rt:44/440'),
 
])
 
def test_output_payments_when_only_match(arglist, expect_invoice):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_output(output, [
 
        rf'^EarlyBird {re.escape(expect_invoice)}:$',
 
        r' outstanding since ',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist,expect_amount', [
 
    (['310'], 420),
 
    (['310/3120'], 220),
 
    (['-t', 'out', 'entity=Vendor'], 420),
 
])
 
def test_main_outgoing_report(arglist, expect_amount):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    rt_url = RTClient.DEFAULT_URL[:-9]
 
    rt_id_url = re.escape(f'<{rt_url}Ticket/Display.html?id=310>')
 
    contract_url = re.escape(f'<{rt_url}Ticket/Attachment/3120/3120/VendorContract.pdf>')
 
    check_output(output, [
 
        r'^REQUESTOR: Mx\. 310 <mx310@example\.org>$',
 
        rf'^TOTAL TO PAY: \${expect_amount}\.00$',
 
        r'^\s*2010-04-30\s',
 
        r'^\s+Expenses:Travel\s+220 USD$',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['-t', 'balance'],
 
    ['515/5150'],
 
])
 
def test_main_balance_report(arglist):
 
    retcode, output, errors = run_main(arglist)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_output(output, [
 
        r'\brt://ticket/515/attachments/5150:$',
 
        r'^\s+1,500\.00 USD outstanding since 2010-05-15$',
 
    ])
 

	
 
def test_main_balance_report_because_no_rt_id():
 
    invoice = 'Invoices/2010StateRegistration.pdf'
 
    retcode, output, errors = run_main([invoice])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_output(output, [
 
        rf'\b{re.escape(invoice)}:$',
 
        r'^\s+-50\.00 USD outstanding since 2010-06-20$',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    [],
 
    ['entity=Lawyer'],
 
])
 
def test_main_aging_report(arglist):
 
    if arglist:
 
        recv_rows = [row for row in AGING_AR if 'Lawyer' in row.entity]
 
        pay_rows = [row for row in AGING_AP if 'Lawyer' in row.entity]
 
    else:
 
        recv_rows = AGING_AR
 
        pay_rows = AGING_AP
 
    retcode, output, errors = run_main(arglist, out_type=io.BytesIO)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_aging_ods(output, datetime.date.today(), recv_rows, pay_rows)
 

	
 
def test_main_no_books():
 
    errors = check_main_fails([], testutil.TestConfig(), 1 | 8)
 
    errors = check_main_fails([], testutil.TestConfig(), 1 | 2)
 
    testutil.check_lines_match(iter(errors), [
 
        r':[01]: +no books to load in configuration\b',
 
    ])
 

	
 
@pytest.mark.parametrize('arglist', [
 
    ['499'],
 
    ['505/99999'],
 
    ['-t', 'balance', 'entity=NonExistent'],
 
])
 
def test_main_no_matches(arglist, caplog):
 
    check_main_fails(arglist, None, 8)
 
    check_main_fails(arglist, None, 2)
 
    testutil.check_logs_match(caplog, [
 
        ('WARNING', 'no matching entries found to report'),
 
    ])
 

	
 
def test_main_no_rt(caplog):
 
    config = testutil.TestConfig(
 
        books_path=testutil.test_path('books/accruals.beancount'),
 
    )
 
    check_main_fails(['-t', 'out'], config, 4)
 
    check_main_fails(['-t', 'out'], config, 16)
 
    testutil.check_logs_match(caplog, [
 
        ('ERROR', 'unable to generate outgoing report: RT client is required'),
 
    ])
tests/test_reports_fund.py
Show inline comments
...
 
@@ -91,194 +91,194 @@ BALANCES_BY_YEAR = {
 

	
 
@pytest.fixture
 
def fund_entries():
 
    return copy.deepcopy(_ledger_load[0])
 

	
 
def split_text_lines(output):
 
    for line in output:
 
        account, amount = line.rsplit(None, 1)
 
        yield account.strip(), amount
 

	
 
def format_amount(amount, currency='USD'):
 
    return babel.numbers.format_currency(
 
        amount, currency, format_type='accounting',
 
    )
 

	
 
def check_text_balances(actual, expected, *expect_accounts):
 
    balance = Decimal()
 
    for expect_account in expect_accounts:
 
        expect_amount = expected[expect_account]
 
        if expect_amount:
 
            actual_account, actual_amount = next(actual)
 
            assert actual_account == expect_account
 
            assert actual_amount == format_amount(expect_amount)
 
            balance += expect_amount
 
    return balance
 

	
 
def check_text_report(output, project, start_date, stop_date):
 
    _, _, project = project.rpartition('=')
 
    balance_amount = Decimal(OPENING_BALANCES[project])
 
    expected = collections.defaultdict(Decimal)
 
    for year in range(2018, stop_date.year):
 
        try:
 
            amounts = BALANCES_BY_YEAR[(project, year)]
 
        except KeyError:
 
            pass
 
        else:
 
            for account, amount in amounts:
 
                if year < start_date.year and account.startswith(EQUITY_ROOT_ACCOUNTS):
 
                    balance_amount += amount
 
                else:
 
                    expected[account] += amount
 
    actual = split_text_lines(output)
 
    next(actual); next(actual)  # Discard headers
 
    open_acct, open_amt = next(actual)
 
    assert open_acct == "{} balance as of {}".format(
 
        project, start_date.isoformat(),
 
    )
 
    assert open_amt == format_amount(balance_amount)
 
    balance_amount += check_text_balances(
 
        actual, expected,
 
        'Equity:Realized:CurrencyConversion',
 
        'Income:Other',
 
        'Expenses:Other',
 
    )
 
    end_acct, end_amt = next(actual)
 
    assert end_acct == "{} balance as of {}".format(
 
        project, stop_date.isoformat(),
 
    )
 
    assert end_amt == format_amount(balance_amount)
 
    balance_amount += check_text_balances(
 
        actual, expected,
 
        'Assets:Receivable:Accounts',
 
        'Assets:Prepaid:Expenses',
 
        'Liabilities:Payable:Accounts',
 
        'Liabilities:UnearnedIncome',
 
    )
 
    assert next(actual, None) is None
 

	
 
def check_cell_balance(cell, balance):
 
    if balance:
 
        assert cell.value == balance
 
    else:
 
        assert not cell.value
 

	
 
def check_ods_sheet(sheet, account_balances, *, full):
 
    if full:
 
        account_bals = account_balances.copy()
 
        account_bals['Unrestricted'] = account_bals.pop('Conservancy')
 
    else:
 
        account_bals = {
 
            key: balances
 
            for key, balances in account_balances.items()
 
            if key != 'Conservancy' and any(v >= .5 for v in balances.values())
 
        }
 
        totals = {key: Decimal() for key in
 
                  ['opening', 'Income', 'Expenses', 'Equity:Realized']}
 
        for fund, balances in account_bals.items():
 
            for key in totals:
 
                totals[key] += balances[key]
 
        account_bals[''] = totals
 
    for row in itertools.islice(sheet.getElementsByType(odf.table.TableRow), 4, None):
 
        cells = iter(testutil.ODSCell.from_row(row))
 
        try:
 
            fund = next(cells).firstChild.text
 
        except (AttributeError, StopIteration):
 
            continue
 
        try:
 
            balances = account_bals.pop(fund)
 
        except KeyError:
 
            pytest.fail(f"report included unexpected fund {fund}")
 
        check_cell_balance(next(cells), balances['opening'])
 
        check_cell_balance(next(cells), balances['Income'])
 
        check_cell_balance(next(cells), -balances['Expenses'])
 
        if full:
 
            check_cell_balance(next(cells), balances['Equity:Realized'])
 
        check_cell_balance(next(cells), sum(balances[key] for key in [
 
            'opening', 'Income', 'Expenses', 'Equity:Realized',
 
        ]))
 
        if full:
 
            check_cell_balance(next(cells), balances['Assets:Receivable'])
 
            check_cell_balance(next(cells), balances['Assets:Prepaid'])
 
            check_cell_balance(next(cells), balances['Liabilities'])
 
            check_cell_balance(next(cells), balances['Liabilities:Payable'])
 
        assert next(cells, None) is None
 
    assert not account_bals, "did not see all funds in report"
 

	
 
def check_ods_report(ods, start_date, stop_date):
 
    account_bals = collections.OrderedDict((key, {
 
        'opening': Decimal(amount),
 
        'Income': Decimal(0),
 
        'Expenses': Decimal(0),
 
        'Equity:Realized': Decimal(0),
 
        'Assets:Receivable': Decimal(0),
 
        'Assets:Prepaid': Decimal(0),
 
        'Liabilities:Payable': Decimal(0),
 
        'Liabilities': Decimal(0),  # UnearnedIncome
 
    }) for key, amount in sorted(OPENING_BALANCES.items()))
 
    for fund, year in itertools.product(account_bals, range(2018, stop_date.year)):
 
        try:
 
            amounts = BALANCES_BY_YEAR[(fund, year)]
 
        except KeyError:
 
            pass
 
        else:
 
            for account, amount in amounts:
 
                if year < start_date.year and account.startswith(EQUITY_ROOT_ACCOUNTS):
 
                    acct_key = 'opening'
 
                else:
 
                    acct_key, _, _ = account.rpartition(':')
 
                account_bals[fund][acct_key] += amount
 
    sheets = iter(ods.getElementsByType(odf.table.Table))
 
    check_ods_sheet(next(sheets), account_bals, full=False)
 
    check_ods_sheet(next(sheets), account_bals, full=True)
 
    assert next(sheets, None) is None, "found unexpected sheet"
 

	
 
def run_main(out_type, arglist, config=None):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/fund.beancount'),
 
        )
 
    arglist.insert(0, '--output-file=-')
 
    output = out_type()
 
    errors = io.StringIO()
 
    retcode = fund.main(arglist, output, errors, config)
 
    output.seek(0)
 
    return retcode, output, errors
 

	
 
@pytest.mark.parametrize('project,start_date,stop_date', [
 
    ('Conservancy', START_DATE, STOP_DATE),
 
    ('project=Conservancy', MID_DATE, STOP_DATE),
 
    ('Conservancy', START_DATE, MID_DATE),
 
    ('Alpha', START_DATE, STOP_DATE),
 
    ('project=Alpha', MID_DATE, STOP_DATE),
 
    ('Alpha', START_DATE, MID_DATE),
 
    ('Bravo', START_DATE, STOP_DATE),
 
    ('project=Bravo', MID_DATE, STOP_DATE),
 
    ('Bravo', START_DATE, MID_DATE),
 
    ('project=Charlie', START_DATE, STOP_DATE),
 
])
 
def test_text_report(project, start_date, stop_date):
 
    retcode, output, errors = run_main(io.StringIO, [
 
        '-b', start_date.isoformat(), '-e', stop_date.isoformat(), project,
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    check_text_report(output, project, start_date, stop_date)
 

	
 
@pytest.mark.parametrize('start_date,stop_date', [
 
    (START_DATE, STOP_DATE),
 
    (MID_DATE, STOP_DATE),
 
    (START_DATE, MID_DATE),
 
])
 
def test_ods_report(start_date, stop_date):
 
    retcode, output, errors = run_main(io.BytesIO, [
 
        '--begin', start_date.isoformat(), '--end', stop_date.isoformat(),
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    check_ods_report(ods, start_date, stop_date)
 

	
 
def test_main_no_postings(caplog):
 
    retcode, output, errors = run_main(io.StringIO, ['NonexistentProject'])
 
    assert retcode == 24
 
    assert retcode == 18
 
    assert any(log.levelname == 'WARNING' for log in caplog.records)
tests/test_reports_ledger.py
Show inline comments
...
 
@@ -368,194 +368,194 @@ def test_date_range_report(ledger_entries, start_date, stop_date, report_kwargs)
 
@pytest.mark.parametrize('tot_accts', [
 
    (),
 
    ('Assets', 'Liabilities'),
 
    ('Income', 'Expenses'),
 
    ('Assets', 'Liabilities', 'Income', 'Expenses'),
 
])
 
def test_report_filter_totals(ledger_entries, tot_accts, report_kwargs):
 
    txn_filter = report_kwargs.get('txn_filter')
 
    postings, report = build_report(ledger_entries, START_DATE, STOP_DATE,
 
                                    totals_with_entries=tot_accts,
 
                                    totals_without_entries=tot_accts,
 
                                    **report_kwargs)
 
    expected = dict(ExpectedPostings.group_by_account(postings))
 
    for account in iter_accounts(ledger_entries):
 
        expect_totals = account.startswith(tot_accts)
 
        if account in expected and expected[account][-1].meta.date >= START_DATE:
 
            if txn_filter is None:
 
                expected[account].check_report(
 
                    report.document, START_DATE, STOP_DATE, expect_totals=expect_totals,
 
                )
 
            else:
 
                expected[account].check_txn_report(
 
                    report.document, txn_filter,
 
                    START_DATE, STOP_DATE, expect_totals=expect_totals,
 
                )
 
        elif expect_totals:
 
            ExpectedPostings.check_in_report(
 
                report.document, account, START_DATE, STOP_DATE, txn_filter,
 
            )
 
        else:
 
            ExpectedPostings.check_not_in_report(report.document, account)
 

	
 
@pytest.mark.parametrize('report_kwargs', iter(REPORT_KWARGS))
 
@pytest.mark.parametrize('accounts', [
 
    ('Income', 'Expenses'),
 
    ('Assets:Receivable', 'Liabilities:Payable'),
 
])
 
def test_account_names_report(ledger_entries, accounts, report_kwargs):
 
    txn_filter = report_kwargs.get('txn_filter')
 
    postings, report = build_report(ledger_entries, START_DATE, STOP_DATE,
 
                                    accounts, **report_kwargs)
 
    expected = dict(ExpectedPostings.group_by_account(postings))
 
    for account in iter_accounts(ledger_entries):
 
        if not account.startswith(accounts):
 
            ExpectedPostings.check_not_in_report(report.document, account)
 
        elif txn_filter is None:
 
            expected[account].check_report(report.document, START_DATE, STOP_DATE)
 
        else:
 
            expected[account].check_txn_report(
 
                report.document, txn_filter, START_DATE, STOP_DATE,
 
            )
 

	
 
def run_main(arglist, config=None):
 
    if config is None:
 
        config = testutil.TestConfig(
 
            books_path=testutil.test_path('books/ledger.beancount'),
 
            rt_client=testutil.RTClient(),
 
        )
 
    arglist.insert(0, '--output-file=-')
 
    output = io.BytesIO()
 
    errors = io.StringIO()
 
    with clean_account_meta():
 
        retcode = ledger.main(arglist, output, errors, config)
 
    output.seek(0)
 
    return retcode, output, errors
 

	
 
def test_main(ledger_entries):
 
    retcode, output, errors = run_main([
 
        '-b', START_DATE.isoformat(),
 
        '-e', STOP_DATE.isoformat(),
 
    ])
 
    output.seek(0)
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    assert get_sheet_names(ods) == DEFAULT_REPORT_SHEETS[:]
 
    postings = data.Posting.from_entries(iter(ledger_entries))
 
    expected = dict(ExpectedPostings.group_by_account(postings))
 
    for account in iter_accounts(ledger_entries):
 
        try:
 
            expected[account].check_report(ods, START_DATE, STOP_DATE)
 
        except KeyError:
 
            ExpectedPostings.check_in_report(ods, account)
 

	
 
@pytest.mark.parametrize('acct_arg', [
 
    'Liabilities',
 
    'Accounts payable',
 
])
 
def test_main_account_limit(ledger_entries, acct_arg):
 
    retcode, output, errors = run_main([
 
        '-a', acct_arg,
 
        '-b', START_DATE.isoformat(),
 
        '-e', STOP_DATE.isoformat(),
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    assert get_sheet_names(ods) == ['Balance', 'Liabilities']
 
    postings = data.Posting.from_entries(ledger_entries)
 
    for account, expected in ExpectedPostings.group_by_account(postings):
 
        if account == 'Liabilities:UnearnedIncome':
 
            should_find = acct_arg == 'Liabilities'
 
        else:
 
            should_find = account.startswith('Liabilities')
 
        try:
 
            expected.check_report(ods, START_DATE, STOP_DATE)
 
        except NotFound:
 
            assert not should_find
 
        else:
 
            assert should_find
 

	
 
def test_main_account_classification_splits_hierarchy(ledger_entries):
 
    retcode, output, errors = run_main([
 
        '-a', 'Cash',
 
        '-b', START_DATE.isoformat(),
 
        '-e', STOP_DATE.isoformat(),
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    assert get_sheet_names(ods) == ['Balance', 'Assets']
 
    postings = data.Posting.from_entries(ledger_entries)
 
    for account, expected in ExpectedPostings.group_by_account(postings):
 
        should_find = (account == 'Assets:Checking' or account == 'Assets:PayPal')
 
        try:
 
            expected.check_report(ods, START_DATE, STOP_DATE)
 
        except NotFound:
 
            assert not should_find, f"{account} not found in report"
 
        else:
 
            assert should_find, f"{account} in report but should be excluded"
 

	
 
@pytest.mark.parametrize('project,start_date,stop_date', [
 
    ('eighteen', START_DATE, MID_DATE.replace(day=30)),
 
    ('nineteen', MID_DATE, STOP_DATE),
 
])
 
def test_main_project_report(ledger_entries, project, start_date, stop_date):
 
    postings = data.Posting.from_entries(iter(ledger_entries))
 
    for key, related in ExpectedPostings.group_by_meta(postings, 'project'):
 
        if key == project:
 
            break
 
    assert key == project
 
    retcode, output, errors = run_main([
 
        f'--begin={start_date.isoformat()}',
 
        f'--end={stop_date.isoformat()}',
 
        project,
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    assert get_sheet_names(ods) == PROJECT_REPORT_SHEETS[:]
 
    expected = dict(ExpectedPostings.group_by_account(related))
 
    for account in iter_accounts(ledger_entries):
 
        try:
 
            expected[account].check_report(ods, start_date, stop_date)
 
        except KeyError:
 
            ExpectedPostings.check_not_in_report(ods, account)
 

	
 
@pytest.mark.parametrize('flag', [
 
    '--disbursements',
 
    '--receipts',
 
])
 
def test_main_cash_report(ledger_entries, flag):
 
    if flag == '--receipts':
 
        txn_filter = ledger.TransactionFilter.CREDIT
 
    else:
 
        txn_filter = ledger.TransactionFilter.DEBIT
 
    retcode, output, errors = run_main([
 
        flag,
 
        '-b', START_DATE.isoformat(),
 
        '-e', STOP_DATE.isoformat(),
 
    ])
 
    assert not errors.getvalue()
 
    assert retcode == 0
 
    ods = odf.opendocument.load(output)
 
    postings = data.Posting.from_entries(ledger_entries)
 
    for account, expected in ExpectedPostings.group_by_account(postings):
 
        if account == 'Assets:Checking' or account == 'Assets:PayPal':
 
            expected.check_txn_report(ods, txn_filter, START_DATE, STOP_DATE)
 
        else:
 
            expected.check_not_in_report(ods)
 

	
 
@pytest.mark.parametrize('arg', [
 
    'Assets:NoneSuchBank',
 
    'Funny money',
 
])
 
def test_main_invalid_account(caplog, arg):
 
    retcode, output, errors = run_main(['-a', arg])
 
    assert retcode == 2
 
    assert any(log.message.endswith(f': {arg!r}') for log in caplog.records)
 

	
 
def test_main_no_postings(caplog):
 
    retcode, output, errors = run_main(['NonexistentProject'])
 
    assert retcode == 24
 
    assert retcode == 18
 
    assert any(log.levelname == 'WARNING' for log in caplog.records)
0 comments (0 inline, 0 general)