Changeset - 35804db617a1
[Not reviewed]
0 7 0
Brett Smith - 4 years ago 2020-08-31 18:19:00
brettcsmith@brettcsmith.org
reports: All reports support rewrite rules.

I realized that if ledger-report supported rewrite rules, then it would
include all the information necessary to reproduce the numbers on the
statement of functional expenses.

With that, it was easy enough to add support to the rest of the reports for
consistency's sake.
7 files changed with 51 insertions and 20 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/cliutil.py
Show inline comments
...
 
@@ -74,96 +74,97 @@ class ExceptHook:
 
                 exc_tb: types.TracebackType,
 
    ) -> NoReturn:
 
        error_type = type(exc_value).__name__
 
        msg = ": ".join(str(arg) for arg in exc_value.args)
 
        if isinstance(exc_value, KeyboardInterrupt):
 
            signal.signal(signal.SIGINT, signal.SIG_DFL)
 
            os.kill(0, signal.SIGINT)
 
            signal.pause()
 
        elif isinstance(exc_value, (
 
                rt_error.AuthorizationError,
 
                rt_error.NotAllowed,
 
        )):
 
            exitcode = os.EX_NOPERM
 
            error_type = "RT access denied"
 
        elif isinstance(exc_value, rt_error.ConnectionError):
 
            exitcode = os.EX_TEMPFAIL
 
            error_type = "RT connection error"
 
        elif isinstance(exc_value, rt_error.RtError):
 
            exitcode = os.EX_UNAVAILABLE
 
            error_type = f"RT {error_type}"
 
        elif isinstance(exc_value, OSError):
 
            if exc_value.filename is None:
 
                exitcode = os.EX_OSERR
 
                error_type = "OS error"
 
                msg = exc_value.strerror
 
            else:
 
                # There are more specific exit codes for input problems vs.
 
                # output problems, but without knowing how the file was
 
                # intended to be used, we can't use them.
 
                exitcode = os.EX_IOERR
 
                error_type = "I/O error"
 
                msg = f"{exc_value.filename}: {exc_value.strerror}"
 
        else:
 
            exitcode = os.EX_SOFTWARE
 
            error_type = f"internal {error_type}"
 
        self.logger.critical("%s%s%s", error_type, ": " if msg else "", msg)
 
        self.logger.debug(
 
            ''.join(traceback.format_exception(exc_type, exc_value, exc_tb)),
 
        )
 
        raise SystemExit(exitcode)
 

	
 

	
 
class ExitCode(enum.IntEnum):
 
    # BSD exit codes commonly used
 
    NoConfiguration = os.EX_CONFIG
 
    NoConfig = NoConfiguration
 
    NoDataFiltered = os.EX_DATAERR
 
    NoDataLoaded = os.EX_NOINPUT
 
    RewriteRulesError = os.EX_DATAERR
 

	
 
    # Our own exit codes, working down from that range
 
    BeancountErrors = 63
 

	
 

	
 
class InfoAction(argparse.Action):
 
    def __call__(self,
 
                 parser: argparse.ArgumentParser,
 
                 namespace: argparse.Namespace,
 
                 values: Union[Sequence[Any], str, None]=None,
 
                 option_string: Optional[str]=None,
 
    ) -> NoReturn:
 
        if isinstance(self.const, str):
 
            info = self.const
 
            exitcode = 0
 
        else:
 
            info, exitcode = self.const
 
        print(info)
 
        raise SystemExit(exitcode)
 

	
 

	
 
class LogLevel(enum.IntEnum):
 
    DEBUG = logging.DEBUG
 
    INFO = logging.INFO
 
    WARNING = logging.WARNING
 
    ERROR = logging.ERROR
 
    CRITICAL = logging.CRITICAL
 
    WARN = WARNING
 
    ERR = ERROR
 
    CRIT = CRITICAL
 

	
 
    @classmethod
 
    def from_arg(cls, arg: str) -> int:
 
        try:
 
            return cls[arg.upper()].value
 
        except KeyError:
 
            raise ValueError(f"unknown loglevel {arg!r}") from None
 

	
 
    @classmethod
 
    def choices(cls) -> Iterable[str]:
 
        for level in sorted(cls, key=operator.attrgetter('value')):
 
            yield level.name.lower()
 

	
 

	
 
class SearchTerm(NamedTuple):
 
    """NamedTuple representing a user's metadata filter
 

	
 
    SearchTerm knows how to parse and store posting metadata filters provided
...
 
@@ -208,96 +209,107 @@ class SearchTerm(NamedTuple):
 
        def parse_search_term(arg: str) -> 'SearchTerm':
 
            key: Optional[str] = None
 
            if re.match(r'^[a-z][-\w]*=', arg):
 
                key, _, raw_link = arg.partition('=')
 
            else:
 
                raw_link = arg
 
            rt_ids = rtutil.RT.parse(raw_link)
 
            if rt_ids is None:
 
                rt_ids = rtutil.RT.parse('rt:' + raw_link)
 
            if rt_ids is None:
 
                if key is None:
 
                    key = default_key
 
                pattern = r'(?:^|\s){}(?:\s|$)'.format(re.escape(raw_link))
 
            else:
 
                ticket_id, attachment_id = rt_ids
 
                if key is None:
 
                    if attachment_id is None:
 
                        key = ticket_default_key
 
                    else:
 
                        key = default_key
 
                pattern = rtutil.RT.metadata_regexp(
 
                    ticket_id,
 
                    attachment_id,
 
                    first_link_only=key == 'rt-id' and attachment_id is None,
 
                )
 
            if key is None:
 
                raise ValueError(f"invalid search term {arg!r}: no metadata key")
 
            return cls(key, pattern)
 
        return parse_search_term
 

	
 
    def filter_postings(self, postings: Iterable[data.Posting]) -> Iterable[data.Posting]:
 
        return filters.filter_meta_match(
 
            postings, self.meta_key, re.compile(self.pattern),
 
        )
 

	
 

	
 
def add_loglevel_argument(parser: argparse.ArgumentParser,
 
                          default: LogLevel=LogLevel.INFO) -> argparse.Action:
 
    return parser.add_argument(
 
        '--loglevel',
 
        metavar='LEVEL',
 
        default=default.value,
 
        type=LogLevel.from_arg,
 
        help="Show logs at this level and above."
 
        f" Specify one of {', '.join(LogLevel.choices())}."
 
        f" Default {default.name.lower()}.",
 
    )
 

	
 
def add_rewrite_rules_argument(parser: argparse.ArgumentParser) -> argparse.Action:
 
    return parser.add_argument(
 
        '--rewrite-rules', '--rewrites', '-r',
 
        action='append',
 
        default=[],
 
        metavar='PATH',
 
        type=Path,
 
        help="""Use rewrite rules from the given YAML file. You can specify
 
this option multiple times to load multiple sets of rewrite rules in order.
 
""")
 

	
 
def add_version_argument(parser: argparse.ArgumentParser) -> argparse.Action:
 
    progname = parser.prog or sys.argv[0]
 
    return parser.add_argument(
 
        '--version', '--copyright', '--license',
 
        action=InfoAction,
 
        nargs=0,
 
        const=f"{progname} version {VERSION}\n{LICENSE}",
 
        help="Show program version and license information",
 
    )
 

	
 
def date_arg(arg: str) -> datetime.date:
 
    return datetime.datetime.strptime(arg, '%Y-%m-%d').date()
 

	
 
def diff_year(date: datetime.date, diff: int) -> datetime.date:
 
    new_year = date.year + diff
 
    try:
 
        return date.replace(year=new_year)
 
    except ValueError:
 
        # The original date is Feb 29, which doesn't exist in the new year.
 
        if diff < 0:
 
            return datetime.date(new_year, 2, 28)
 
        else:
 
            return datetime.date(new_year, 3, 1)
 

	
 
def year_or_date_arg(arg: str) -> Union[int, datetime.date]:
 
    """Get either a date or a year (int) from an argument string
 

	
 
    This is a useful argument type for arguments that will be passed into
 
    Books loader methods which can accept either a fiscal year or a full date.
 
    """
 
    try:
 
        year = int(arg, 10)
 
    except ValueError:
 
        ok = False
 
    else:
 
        ok = datetime.MINYEAR <= year <= datetime.MAXYEAR
 
    if ok:
 
        return year
 
    else:
 
        return date_arg(arg)
 

	
 
def make_entry_point(mod_name: str, prog_name: str=sys.argv[0]) -> Callable[[], int]:
 
    """Create an entry_point function for a tool
 

	
 
    The returned function is suitable for use as an entry_point in setup.py.
 
    It sets up the root logger and excepthook, then calls the module's main
 
    function.
 
    """
conservancy_beancount/reports/accrual.py
Show inline comments
...
 
@@ -70,96 +70,97 @@ import argparse
 
import collections
 
import datetime
 
import enum
 
import logging
 
import re
 
import sys
 

	
 
from pathlib import Path
 

	
 
from typing import (
 
    cast,
 
    Any,
 
    BinaryIO,
 
    Callable,
 
    Deque,
 
    Dict,
 
    Hashable,
 
    Iterable,
 
    Iterator,
 
    List,
 
    Mapping,
 
    Match,
 
    NamedTuple,
 
    Optional,
 
    Sequence,
 
    Set,
 
    TextIO,
 
    Tuple,
 
    TypeVar,
 
    Union,
 
)
 
from ..beancount_types import (
 
    Entries,
 
    Error,
 
    Errors,
 
    MetaKey,
 
    MetaValue,
 
    Transaction,
 
)
 

	
 
import odf.element  # type:ignore[import]
 
import odf.style  # type:ignore[import]
 
import odf.table  # type:ignore[import]
 
import rt
 

	
 
from beancount.parser import printer as bc_printer
 

	
 
from . import core
 
from . import rewrite
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 
from .. import filters
 
from .. import rtutil
 

	
 
PROGNAME = 'accrual-report'
 

	
 
PostGroups = Mapping[Optional[Hashable], 'AccrualPostings']
 
T = TypeVar('T')
 

	
 
logger = logging.getLogger('conservancy_beancount.reports.accrual')
 

	
 
class Account(NamedTuple):
 
    name: str
 
    aging_thresholds: Sequence[int]
 

	
 

	
 
class AccrualAccount(enum.Enum):
 
    # Note the aging report uses the same order accounts are defined here.
 
    # See AgingODS.start_spreadsheet().
 
    RECEIVABLE = Account('Assets:Receivable', [365, 120, 90, 60])
 
    PAYABLE = Account('Liabilities:Payable', [365, 90, 60, 30])
 

	
 
    @classmethod
 
    def account_names(cls) -> Iterator[str]:
 
        return (acct.value.name for acct in cls)
 

	
 
    @classmethod
 
    def by_account(cls, name: data.Account) -> 'AccrualAccount':
 
        for account in cls:
 
            if name.is_under(account.value.name):
 
                return account
 
        raise ValueError(f"unrecognized account {name!r}")
 

	
 
    @classmethod
 
    def classify(cls, related: core.RelatedPostings) -> 'AccrualAccount':
 
        for account in cls:
 
            account_name = account.value.name
 
            if all(post.account.is_under(account_name) for post in related):
 
                return account
 
        raise ValueError("unrecognized account set in related postings")
 

	
 
    @property
 
    def normalize_amount(self) -> Callable[[T], T]:
 
        return core.normalize_amount_func(self.value.name)
 

	
...
 
@@ -602,173 +603,181 @@ class OutgoingReport(BaseReport):
 
                               or ticket.get('CF.{payment-method}')),
 
            'payment-to': payment_to,
 
        }
 

	
 
        cf_updates = {
 
            f'CF_{key}': value
 
            for key, value in cf_targets.items()
 
            if ticket.get(f'CF.{{{key}}}') != value
 
        }
 
        if cf_updates:
 
            try:
 
                ok = self.rt_client.edit_ticket(ticket_id, **cf_updates)
 
            except rt.RtError:
 
                self.logger.debug("RT exception on edit_ticket", exc_info=True)
 
                ok = False
 
            if not ok:
 
                self.logger.warning("failed to set custom fields for rt:%s", ticket_id)
 

	
 

	
 
class ReportType(enum.Enum):
 
    AGING = AgingReport
 
    BALANCE = BalanceReport
 
    OUTGOING = OutgoingReport
 
    AGE = AGING
 
    BAL = BALANCE
 
    OUT = OUTGOING
 
    OUTGOINGS = OUTGOING
 

	
 
    @classmethod
 
    def by_name(cls, name: str) -> 'ReportType':
 
        try:
 
            return cls[name.upper()]
 
        except KeyError:
 
            raise ValueError(f"unknown report type {name!r}") from None
 

	
 

	
 
def filter_search(postings: Iterable[data.Posting],
 
                  search_terms: Iterable[cliutil.SearchTerm],
 
) -> Iterable[data.Posting]:
 
    accounts = tuple(AccrualAccount.account_names())
 
    postings = (post for post in postings if post.account.is_under(*accounts))
 
    for query in search_terms:
 
        postings = query.filter_postings(postings)
 
    return postings
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    cliutil.add_rewrite_rules_argument(parser)
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='NAME',
 
        type=ReportType.by_name,
 
        help="""The type of report to generate, one of `aging`, `balance`, or
 
`outgoing`. If not specified, the default is `aging` when no search terms are
 
given, `outgoing` for search terms that return a single outstanding payable,
 
and `balance` any other time.
 
""")
 
    parser.add_argument(
 
        '--since',
 
        metavar='YEAR',
 
        type=int,
 
        default=0,
 
        help="""How far back to search the books for related transactions.
 
You can either specify a fiscal year, or a negative offset from the current
 
fiscal year, to start loading entries from. The default is to load the current,
 
unaudited books.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is stdout for the balance and outgoing reports, and a generated
 
filename for other reports.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('invoice', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on accruals that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other link, including an RT attachment link in
 
`TIK/ATT` format, is a shortcut for `invoice=LINK`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.report_type is None and not any(
 
            term.meta_key == 'invoice' or term.meta_key == 'rt-id'
 
            for term in args.search_terms
 
    ):
 
        args.report_type = ReportType.AGING
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
        returncode = cliutil.ExitCode.NoConfiguration
 
    else:
 
        load_since = None if args.report_type == ReportType.AGING else args.since
 
        entries, load_errors, _ = books_loader.load_all(load_since)
 
        if load_errors:
 
            returncode = cliutil.ExitCode.BeancountErrors
 
        elif not entries:
 
            returncode = cliutil.ExitCode.NoDataLoaded
 
    filters.remove_opening_balance_txn(entries)
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 

	
 
    postings = list(filter_search(
 
        data.Posting.from_entries(entries), args.search_terms,
 
    ))
 
    postings_src = data.Posting.from_entries(entries)
 
    for rewrite_path in args.rewrite_rules:
 
        try:
 
            ruleset = rewrite.RewriteRuleset.from_yaml(rewrite_path)
 
        except ValueError as error:
 
            logger.critical("failed loading rewrite rules from %s: %s",
 
                            rewrite_path, error.args[0])
 
            return cliutil.ExitCode.RewriteRulesError
 
        postings_src = ruleset.rewrite(postings_src)
 
    postings = list(filter_search(postings_src, args.search_terms))
 
    if not postings:
 
        logger.warning("no matching entries found to report")
 
        returncode = returncode or cliutil.ExitCode.NoDataFiltered
 
    # groups is a mapping of metadata value strings to AccrualPostings.
 
    # The keys are basically arbitrary, the report classes don't rely on them,
 
    # but they do help symbolize what's being grouped.
 
    # For the outgoing approval report, groups maps rt-id link strings to
 
    # associated accruals.
 
    # For all other reports, groups comes from AccrualReport.make_consistent().
 
    groups: PostGroups
 
    if args.report_type is None or args.report_type is ReportType.OUTGOING:
 
        groups = dict(AccrualPostings.group_by_first_meta_link(postings, 'rt-id'))
 
        if args.report_type is None and len(groups) == 1:
 
            key = next(iter(groups))
 
            group = groups[key]
 
            account = group[0].account
 
            if (AccrualAccount.by_account(account) is AccrualAccount.PAYABLE
 
                and all(post.account == account for post in group)
 
                and not group.balance().ge_zero()
 
                and key):  # Make sure we have a usable rt-id
 
                args.report_type = ReportType.OUTGOING
 
    if args.report_type is not ReportType.OUTGOING:
 
        groups = dict(AccrualPostings.make_consistent(postings))
 
    if args.report_type is not ReportType.AGING:
 
        groups = {
 
            key: posts for key, posts in groups.items() if not posts.is_paid()
 
        } or groups
 
    del postings
 

	
 
    report: Optional[BaseReport] = None
 
    output_path: Optional[Path] = None
 
    if args.report_type is ReportType.AGING:
 
        rt_wrapper = config.rt_wrapper()
 
        if rt_wrapper is None:
 
            logger.error("unable to generate aging report: RT client is required")
 
        else:
 
            if args.output_file is None:
 
                now = datetime.datetime.now()
 
                out_dir_path = config.repository_path() or Path()
 
                args.output_file = out_dir_path / now.strftime('AgingReport_%Y-%m-%d_%H:%M.ods')
 
                logger.info("Writing report to %s", args.output_file)
 
            out_bin = cliutil.bytes_output(args.output_file, stdout)
 
            report = AgingReport(rt_wrapper, out_bin)
 
            report.ods.set_common_properties(config.books_repo())
 
    elif args.report_type is ReportType.OUTGOING:
 
        rt_wrapper = config.rt_wrapper()
 
        if rt_wrapper is None:
 
            logger.error("unable to generate outgoing report: RT client is required")
conservancy_beancount/reports/balance_sheet.py
Show inline comments
 
"""balance_sheet.py - Balance sheet report"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import collections
 
import datetime
 
import enum
 
import logging
 
import operator
 
import os
 
import sys
 

	
 
from decimal import Decimal
 
from pathlib import Path
 

	
 
from typing import (
 
    Any,
 
    Callable,
 
    Collection,
 
    Dict,
 
    Hashable,
 
    Iterable,
 
    Iterator,
 
    List,
 
    Mapping,
 
    NamedTuple,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Tuple,
 
    Union,
 
)
 

	
 
import odf.style  # type:ignore[import]
 
import odf.table  # type:ignore[import]
 

	
 
from beancount.parser import printer as bc_printer
 

	
 
from . import core
 
from . import rewrite
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 
from .. import ranges
 

	
 
EQUITY_ACCOUNTS = frozenset(['Equity', 'Income', 'Expenses'])
 
PROGNAME = 'balance-sheet-report'
 
logger = logging.getLogger('conservancy_beancount.reports.balance_sheet')
 

	
 
KWArgs = Mapping[str, Any]
 

	
 
class Fund(enum.IntFlag):
 
    RESTRICTED = enum.auto()
 
    UNRESTRICTED = enum.auto()
 
    ANY = RESTRICTED | UNRESTRICTED
 

	
 

	
...
 
@@ -588,147 +587,139 @@ class Report(core.BaseODS[Sequence[None], None]):
 
        header_row.insertBefore(self.multiline_cell(
 
            ["Balance Ending", self.period_name],
 
            stylename=header_row.lastChild.getAttribute('stylename'),
 
        ), header_row.lastChild)
 

	
 
        for acct_root in ['Assets', 'Liabilities', 'Income', 'Expenses', 'Equity']:
 
            norm_func = core.normalize_amount_func(f'{acct_root}:Dummy')
 
            want_balance = acct_root not in EQUITY_ACCOUNTS
 
            self.add_row()
 
            for account in self.balances.iter_accounts(acct_root):
 
                period_bal = self.balances.total(account=account, period=Period.PERIOD)
 
                prior_bal = self.balances.total(account=account, period=Period.PRIOR)
 
                if want_balance:
 
                    close_bal = self.balances.total(account=account)
 
                    close_cell = self.balance_cell(norm_func(close_bal))
 
                    open_cell = self.balance_cell(norm_func(close_bal - period_bal))
 
                else:
 
                    close_cell = odf.table.TableCell()
 
                    open_cell = odf.table.TableCell()
 
                self.add_row(
 
                    self.string_cell(account),
 
                    self.string_cell(account.meta.get('classification', '')),
 
                    open_cell,
 
                    self.balance_cell(norm_func(period_bal)),
 
                    close_cell,
 
                    self.balance_cell(norm_func(prior_bal)),
 
                )
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
 
The default is one year ago.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
 
The default is a year after the start date.
 
""")
 
    parser.add_argument(
 
        '--rewrite-rules', '--rewrites', '-r',
 
        action='append',
 
        default=[],
 
        metavar='PATH',
 
        type=Path,
 
        help="""Use rewrite rules from the given YAML file. You can specify
 
this option multiple times to load multiple sets of rewrite rules in order.
 
""")
 
    cliutil.add_rewrite_rules_argument(parser)
 
    parser.add_argument(
 
        '--fund-metadata-key', '-m',
 
        metavar='KEY',
 
        default='project',
 
        help="""Name of the fund metadata key. Default %(default)s.
 
""")
 
    parser.add_argument(
 
        '--unrestricted-fund', '-u',
 
        metavar='PROJECT',
 
        default='Conservancy',
 
        help="""Name of the unrestricted fund. Default %(default)s.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    return parser.parse_args(arglist)
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    if args.stop_date is None:
 
        if args.start_date is None:
 
            args.stop_date = datetime.date.today()
 
        else:
 
            args.stop_date = cliutil.diff_year(args.start_date, 1)
 
    if args.start_date is None:
 
        args.start_date = cliutil.diff_year(args.stop_date, -1)
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, options_map = books.Loader.load_none(config.config_file_path())
 
        returncode = cliutil.ExitCode.NoConfiguration
 
    else:
 
        start_fy = config.fiscal_year_begin().for_date(args.start_date) - 1
 
        entries, load_errors, options_map = books_loader.load_fy_range(start_fy, args.stop_date)
 
        if load_errors:
 
            returncode = cliutil.ExitCode.BeancountErrors
 
        elif not entries:
 
            returncode = cliutil.ExitCode.NoDataLoaded
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 

	
 
    data.Account.load_from_books(entries, options_map)
 
    postings = data.Posting.from_entries(entries)
 
    for rewrite_path in args.rewrite_rules:
 
        try:
 
            ruleset = rewrite.RewriteRuleset.from_yaml(rewrite_path)
 
        except ValueError as error:
 
            logger.critical("failed loading rewrite rules from %s: %s",
 
                            rewrite_path, error.args[0])
 
            return os.EX_DATAERR
 
            return cliutil.ExitCode.RewriteRulesError
 
        postings = ruleset.rewrite(postings)
 

	
 
    balances = Balances(
 
        postings,
 
        args.start_date,
 
        args.stop_date,
 
        args.fund_metadata_key,
 
        args.unrestricted_fund,
 
    )
 
    report = Report(balances)
 
    report.set_common_properties(config.books_repo())
 
    report.write_all()
 
    if args.output_file is None:
 
        out_dir_path = config.repository_path() or Path()
 
        args.output_file = out_dir_path / 'BalanceSheet_{}_{}.ods'.format(
 
            args.start_date.isoformat(), args.stop_date.isoformat(),
 
        )
 
        logger.info("Writing report to %s", args.output_file)
 
    ods_file = cliutil.bytes_output(args.output_file, stdout)
 
    report.save_file(ods_file)
 
    return returncode
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
conservancy_beancount/reports/fund.py
Show inline comments
...
 
@@ -29,96 +29,97 @@ Query a specific restricted fund and get a quick balance on the terminal::
 
      fund-report <PROJECT>
 
"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import collections
 
import datetime
 
import enum
 
import locale
 
import logging
 
import sys
 

	
 
from typing import (
 
    Dict,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    Optional,
 
    Sequence,
 
    TextIO,
 
    Tuple,
 
    Union,
 
)
 
from ..beancount_types import (
 
    MetaValue,
 
)
 

	
 
from decimal import Decimal
 
from pathlib import Path
 

	
 
import odf.table  # type:ignore[import]
 

	
 
from beancount.parser import printer as bc_printer
 

	
 
from . import core
 
from . import rewrite
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 

	
 
AccountsMap = Mapping[data.Account, core.PeriodPostings]
 
FundPosts = Tuple[MetaValue, AccountsMap]
 

	
 
EQUITY_ACCOUNTS = ['Income', 'Expenses', 'Equity']
 
INFO_ACCOUNTS = [
 
    'Assets:Receivable',
 
    'Assets:Prepaid',
 
    'Liabilities:UnearnedIncome',
 
    'Liabilities:Payable',
 
]
 
PROGNAME = 'fund-report'
 
UNRESTRICTED_FUND = 'Conservancy'
 
logger = logging.getLogger('conservancy_beancount.reports.fund')
 

	
 
class ODSReport(core.BaseODS[FundPosts, None]):
 
    def __init__(self, start_date: datetime.date, stop_date: datetime.date) -> None:
 
        super().__init__()
 
        self.start_date = start_date
 
        self.stop_date = stop_date
 

	
 
    def section_key(self, row: FundPosts) -> None:
 
        return None
 

	
 
    def start_spreadsheet(self, *, expanded: bool=True) -> None:
 
        headers = [["Fund"], ["Balance as of", self.start_date.isoformat()]]
 
        if expanded:
 
            sheet_name = "With Breakdowns"
 
            headers.extend([acct] for acct in EQUITY_ACCOUNTS)
 
        else:
 
            sheet_name = "Fund Report"
 
            headers += [["Additions"], ["Releases from", "Restrictions"]]
 
        headers.append(["Balance as of", self.stop_date.isoformat()])
 
        if expanded:
 
            headers += [
 
                ["Of which", "Receivable"],
 
                ["Of which", "Prepaid Expenses"],
 
                ["Of which", "Payable"],
 
                ["Of which", "Unearned Income"],
 
            ]
 

	
 
        self.use_sheet(sheet_name)
 
        for header in headers:
 
            first_line = header[0]
...
 
@@ -263,160 +264,169 @@ class TextReport:
 
        fund_end = f' balance as of {self.stop_date.isoformat()}'
 
        for acct_s, bal_seq in output:
 
            is_end_bal = acct_s.endswith(fund_end)
 
            if is_end_bal or acct_s.endswith(fund_start):
 
                print(line_fmt.format('─' * acct_width, '─' * bal_width),
 
                      file=self.out_file)
 
            bal_iter = iter(bal_seq)
 
            print(line_fmt.format(acct_s, next(bal_iter)), file=self.out_file)
 
            for bal_s in bal_iter:
 
                print(line_fmt.format('', bal_s), file=self.out_file)
 
            if is_end_bal:
 
                print(line_fmt.format('═' * acct_width, '═' * bal_width),
 
                      file=self.out_file)
 

	
 

	
 
class ReportType(enum.Enum):
 
    TEXT = TextReport
 
    ODS = ODSReport
 
    TXT = TEXT
 
    SPREADSHEET = ODS
 

	
 
    @classmethod
 
    def from_arg(cls, s: str) -> 'ReportType':
 
        try:
 
            return cls[s.upper()]
 
        except KeyError:
 
            raise ValueError(f"no report type matches {s!r}") from None
 

	
 

	
 
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
 
    parser = argparse.ArgumentParser(prog=PROGNAME)
 
    cliutil.add_version_argument(parser)
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
 
The default is one year ago.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
 
The default is a year after the start date.
 
""")
 
    cliutil.add_rewrite_rules_argument(parser)
 
    parser.add_argument(
 
        '--report-type', '-t',
 
        metavar='TYPE',
 
        type=ReportType.from_arg,
 
        help="""Type of report to generate. `text` gives a plain two-column text
 
report listing accounts and balances over the period, and is the default when
 
you search for a specific project/fund. `ods` produces a higher-level
 
spreadsheet, meant to provide an overview of all funds, and is the default when
 
you don't specify a project/fund.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is stdout for text reports, and a generated filename for ODS
 
reports.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('project', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on postings that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other word is a shortcut for `project=TERM`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.report_type is None:
 
        if any(term.meta_key == 'project' for term in args.search_terms):
 
            args.report_type = ReportType.TEXT
 
        else:
 
            args.report_type = ReportType.ODS
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    if args.stop_date is None:
 
        if args.start_date is None:
 
            args.stop_date = datetime.date.today()
 
        else:
 
            args.stop_date = cliutil.diff_year(args.start_date, 1)
 
    if args.start_date is None:
 
        args.start_date = cliutil.diff_year(args.stop_date, -1)
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, _ = books.Loader.load_none(config.config_file_path())
 
        returncode = cliutil.ExitCode.NoConfiguration
 
    else:
 
        entries, load_errors, _ = books_loader.load_fy_range(args.start_date, args.stop_date)
 
        if load_errors:
 
            returncode = cliutil.ExitCode.BeancountErrors
 
        elif not entries:
 
            returncode = cliutil.ExitCode.NoDataLoaded
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 

	
 
    postings = (
 
    postings = iter(
 
        post
 
        for post in data.Posting.from_entries(entries)
 
        if post.meta.date < args.stop_date
 
    )
 
    for rewrite_path in args.rewrite_rules:
 
        try:
 
            ruleset = rewrite.RewriteRuleset.from_yaml(rewrite_path)
 
        except ValueError as error:
 
            logger.critical("failed loading rewrite rules from %s: %s",
 
                            rewrite_path, error.args[0])
 
            return cliutil.ExitCode.RewriteRulesError
 
        postings = ruleset.rewrite(postings)
 
    for search_term in args.search_terms:
 
        postings = search_term.filter_postings(postings)
 
    fund_postings = {
 
        key: related
 
        for key, related in core.RelatedPostings.group_by_meta(postings, 'project')
 
        if isinstance(key, str)
 
    }
 
    period_cls = core.PeriodPostings.with_start_date(args.start_date)
 
    fund_map = collections.OrderedDict(
 
        (fund, dict(period_cls.group_by_account(fund_postings[fund])))
 
        for fund in sorted(fund_postings, key=lambda s: locale.strxfrm(s.casefold()))
 
    )
 
    if not fund_map:
 
        logger.warning("no matching postings found to report")
 
        returncode = returncode or cliutil.ExitCode.NoDataFiltered
 
    elif args.report_type is ReportType.TEXT:
 
        out_file = cliutil.text_output(args.output_file, stdout)
 
        report = TextReport(args.start_date, args.stop_date, out_file)
 
        report.write(fund_map.items())
 
    else:
 
        ods_report = ODSReport(args.start_date, args.stop_date)
 
        ods_report.set_common_properties(config.books_repo())
 
        ods_report.write(fund_map.items())
 
        if args.output_file is None:
 
            out_dir_path = config.repository_path() or Path()
 
            args.output_file = out_dir_path / 'FundReport_{}_{}.ods'.format(
 
                args.start_date.isoformat(), args.stop_date.isoformat(),
 
            )
 
            logger.info("Writing report to %s", args.output_file)
 
        ods_file = cliutil.bytes_output(args.output_file, stdout)
 
        ods_report.save_file(ods_file)
 
    return returncode
 

	
 
entry_point = cliutil.make_entry_point(__name__, PROGNAME)
 

	
 
if __name__ == '__main__':
 
    exit(entry_point())
conservancy_beancount/reports/ledger.py
Show inline comments
...
 
@@ -33,96 +33,97 @@ Get all Assets postings for a given month to help with reconciliation::
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import argparse
 
import collections
 
import datetime
 
import enum
 
import itertools
 
import operator
 
import logging
 
import sys
 

	
 
from typing import (
 
    Any,
 
    Callable,
 
    Dict,
 
    Iterable,
 
    Iterator,
 
    List,
 
    Mapping,
 
    Optional,
 
    Sequence,
 
    Set,
 
    TextIO,
 
    Tuple,
 
    Union,
 
)
 
from ..beancount_types import (
 
    Transaction,
 
)
 

	
 
from pathlib import Path
 

	
 
import odf.table  # type:ignore[import]
 

	
 
from beancount.core import data as bc_data
 
from beancount.parser import printer as bc_printer
 

	
 
from . import core
 
from . import rewrite
 
from .. import books
 
from .. import cliutil
 
from .. import config as configmod
 
from .. import data
 
from .. import ranges
 
from .. import rtutil
 

	
 
PostTally = List[Tuple[int, data.Account]]
 

	
 
PROGNAME = 'ledger-report'
 
logger = logging.getLogger('conservancy_beancount.reports.ledger')
 

	
 
class LedgerODS(core.BaseODS[data.Posting, None]):
 
    CORE_COLUMNS: Sequence[str] = [
 
        'Date',
 
        data.Metadata.human_name('entity'),
 
        'Description',
 
        'Original Amount',
 
        'Booked Amount',
 
    ]
 
    ACCOUNT_COLUMNS: Dict[str, Sequence[str]] = collections.OrderedDict([
 
        ('Income', ['project', 'rt-id', 'receipt', 'income-type', 'memo']),
 
        ('Expenses', ['project', 'rt-id', 'receipt', 'approval', 'expense-type']),
 
        ('Equity', ['project', 'rt-id']),
 
        ('Assets:Receivable', ['project', 'rt-id', 'invoice', 'approval', 'contract', 'purchase-order']),
 
        ('Liabilities:Payable', ['project', 'rt-id', 'invoice', 'approval', 'contract', 'purchase-order']),
 
        ('Assets:PayPal', ['rt-id', 'paypal-id', 'receipt', 'approval']),
 
        ('Assets', ['rt-id', 'receipt', 'approval', 'bank-statement']),
 
        ('Liabilities', ['rt-id', 'receipt', 'approval', 'bank-statement']),
 
    ])
 
    # Excel 2003 was limited to 65,536 rows per worksheet.
 
    # While we can probably count on all our users supporting more modern
 
    # formats (Excel 2007 supports over 1 million rows per worksheet),
 
    # keeping the default limit conservative seems good to avoid running into
 
    # other limits (like the number of hyperlinks per worksheet), plus just
 
    # better for human organization and readability.
 
    SHEET_SIZE = 65500
 

	
 
    def __init__(self,
 
                 start_date: datetime.date,
 
                 stop_date: datetime.date,
 
                 accounts: Optional[Sequence[str]]=None,
 
                 rt_wrapper: Optional[rtutil.RT]=None,
 
                 sheet_size: Optional[int]=None,
 
                 totals_with_entries: Optional[Sequence[str]]=None,
 
                 totals_without_entries: Optional[Sequence[str]]=None,
 
    ) -> None:
 
        if sheet_size is None:
...
 
@@ -656,192 +657,201 @@ def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace
 
        nargs=0,
 
        help="""Shortcut to set all the necessary options to generate a cash
 
disbursements report.
 
""")
 
    parser.add_argument(
 
        '--receipts',
 
        action=CashReportAction,
 
        const=TransactionFilter.CREDIT,
 
        nargs=0,
 
        help="""Shortcut to set all the necessary options to generate a cash
 
receipts report.
 
""")
 
    parser.add_argument(
 
        '--begin', '--start', '-b',
 
        dest='start_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to start reporting entries, inclusive, in YYYY-MM-DD format.
 
The default is one year ago.
 
""")
 
    parser.add_argument(
 
        '--end', '--stop', '-e',
 
        dest='stop_date',
 
        metavar='DATE',
 
        type=cliutil.date_arg,
 
        help="""Date to stop reporting entries, exclusive, in YYYY-MM-DD format.
 
The default is a year after the start date, or 30 days from today if the start
 
date was also not specified.
 
""")
 
    parser.add_argument(
 
        '--transactions', '-t',
 
        dest='txn_filter',
 
        metavar='TYPE',
 
        type=TransactionFilter.from_arg,
 
        help="""Report whole transactions rather than individual postings.
 
The type argument selects which type of transactions to report. Choices are
 
credit, debit, or all.
 
""")
 
    parser.add_argument(
 
        '--account', '-a',
 
        dest='accounts',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""Show this account in the report. You can specify this option
 
multiple times. You can specify a part of the account hierarchy, or an account
 
classification from metadata. If not specified, the default set adapts to your
 
search criteria.
 
""")
 
    cliutil.add_rewrite_rules_argument(parser)
 
    parser.add_argument(
 
        '--show-totals', '-S',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""When entries for this account appear in the report, include
 
account balance(s) as well. You can specify this option multiple times. Pass in
 
a part of the account hierarchy. The default is all accounts.
 
""")
 
    parser.add_argument(
 
        '--add-totals', '-T',
 
        metavar='ACCOUNT',
 
        action='append',
 
        help="""When an account could be included in the report but does not
 
have any entries in the date range, include a header and account balance(s) for
 
it. You can specify this option multiple times. Pass in a part of the account
 
hierarchy. The default set adapts to your search criteria.
 
""")
 
    parser.add_argument(
 
        '--sheet-size', '--size',
 
        metavar='SIZE',
 
        type=int,
 
        default=LedgerODS.SHEET_SIZE,
 
        help="""Try to limit sheets to this many rows. The report will
 
automatically create new sheets to make this happen. When that's not possible,
 
it will issue a warning.
 
""")
 
    parser.add_argument(
 
        '--output-file', '-O',
 
        metavar='PATH',
 
        type=Path,
 
        help="""Write the report to this file, or stdout when PATH is `-`.
 
The default is `LedgerReport_<StartDate>_<StopDate>.ods`.
 
""")
 
    cliutil.add_loglevel_argument(parser)
 
    parser.add_argument(
 
        'search_terms',
 
        metavar='FILTER',
 
        type=cliutil.SearchTerm.arg_parser('project', 'rt-id'),
 
        nargs=argparse.ZERO_OR_MORE,
 
        help="""Report on postings that match this criteria. The format is
 
NAME=TERM. TERM is a link or word that must exist in a posting's NAME
 
metadata to match. A single ticket number is a shortcut for
 
`rt-id=rt:NUMBER`. Any other word is a shortcut for `project=TERM`.
 
""")
 
    args = parser.parse_args(arglist)
 
    if args.add_totals is None and args.search_terms:
 
        args.add_totals = []
 
    if args.accounts is None:
 
        if any(term.meta_key == 'project' for term in args.search_terms):
 
            args.accounts = [
 
                'Income',
 
                'Expenses',
 
                'Assets:Receivable',
 
                'Assets:Prepaid',
 
                'Liabilities:UnearnedIncome',
 
                'Liabilities:Payable',
 
            ]
 
        else:
 
            args.accounts = list(LedgerODS.ACCOUNT_COLUMNS)
 
    return args
 

	
 
def main(arglist: Optional[Sequence[str]]=None,
 
         stdout: TextIO=sys.stdout,
 
         stderr: TextIO=sys.stderr,
 
         config: Optional[configmod.Config]=None,
 
) -> int:
 
    args = parse_arguments(arglist)
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if config is None:
 
        config = configmod.Config()
 
        config.load_file()
 

	
 
    today = datetime.date.today()
 
    if args.start_date is None:
 
        args.start_date = cliutil.diff_year(today, -1)
 
        if args.stop_date is None:
 
            args.stop_date = today + datetime.timedelta(days=30)
 
    elif args.stop_date is None:
 
        args.stop_date = cliutil.diff_year(args.start_date, 1)
 

	
 
    returncode = 0
 
    books_loader = config.books_loader()
 
    if books_loader is None:
 
        entries, load_errors, options = books.Loader.load_none(config.config_file_path())
 
        returncode = cliutil.ExitCode.NoConfiguration
 
    else:
 
        entries, load_errors, options = books_loader.load_fy_range(args.start_date, args.stop_date)
 
        if load_errors:
 
            returncode = cliutil.ExitCode.BeancountErrors
 
        elif not entries:
 
            returncode = cliutil.ExitCode.NoDataLoaded
 
    for error in load_errors:
 
        bc_printer.print_error(error, file=stderr)
 

	
 
    data.Account.load_from_books(entries, options)
 
    postings = data.Posting.from_entries(entries)
 
    for rewrite_path in args.rewrite_rules:
 
        try:
 
            ruleset = rewrite.RewriteRuleset.from_yaml(rewrite_path)
 
        except ValueError as error:
 
            logger.critical("failed loading rewrite rules from %s: %s",
 
                            rewrite_path, error.args[0])
 
            return cliutil.ExitCode.RewriteRulesError
 
        postings = ruleset.rewrite(postings)
 
    for search_term in args.search_terms:
 
        postings = search_term.filter_postings(postings)
 

	
 
    rt_wrapper = config.rt_wrapper()
 
    if rt_wrapper is None:
 
        logger.warning("could not initialize RT client; spreadsheet links will be broken")
 
    try:
 
        if args.txn_filter is None:
 
            report = LedgerODS(
 
                args.start_date,
 
                args.stop_date,
 
                args.accounts,
 
                rt_wrapper,
 
                args.sheet_size,
 
                args.show_totals,
 
                args.add_totals,
 
            )
 
        else:
 
            report = TransactionODS(
 
                args.start_date,
 
                args.stop_date,
 
                args.accounts,
 
                rt_wrapper,
 
                args.sheet_size,
 
                args.show_totals,
 
                args.add_totals,
 
                args.txn_filter,
 
            )
 
    except ValueError as error:
 
        logger.error("%s: %r", *error.args)
 
        return 2
 
    report.set_common_properties(config.books_repo())
 
    report.write(postings)
 
    if not any(report.account_groups.values()):
 
        logger.warning("no matching postings found to report")
 
        returncode = returncode or cliutil.ExitCode.NoDataFiltered
 

	
 
    if args.output_file is None:
 
        out_dir_path = config.repository_path() or Path()
 
        args.output_file = out_dir_path / '{}Report_{}_{}.ods'.format(
 
            report.report_name,
 
            args.start_date.isoformat(),
 
            args.stop_date.isoformat(),
 
        )
 
        logger.info("Writing report to %s", args.output_file)
 
    ods_file = cliutil.bytes_output(args.output_file, stdout)
 
    report.save_file(ods_file)
 
    return returncode
conservancy_beancount/tools/audit_report.py
Show inline comments
...
 
@@ -151,112 +151,111 @@ def main(arglist: Optional[Sequence[str]]=None,
 
    cliutil.set_loglevel(logger, args.loglevel)
 
    if args.verbose:
 
        logger.setLevel(logging.DEBUG)
 
        reports_logger = logging.getLogger('conservancy_beancount.reports')
 
        reports_logger.setLevel(logging.DEBUG)
 
        reports_logger.getChild('rewrite').setLevel(args.loglevel)
 

	
 
    fy = config.fiscal_year_begin()
 
    today = datetime.date.today()
 
    if args.audit_year is None:
 
        args.audit_year = fy.for_date(today) - 1
 
    audit_begin = fy.first_date(args.audit_year)
 
    audit_end = fy.next_fy_date(args.audit_year)
 
    if args.end_date is None:
 
        days_diff = (today - audit_end).days
 
        if days_diff < (28 * 2):
 
            args.end_date = today
 
        elif days_diff >= 365:
 
            args.end_date = fy.next_fy_date(args.audit_year + 1)
 
        else:
 
            end_date = today - datetime.timedelta(days=today.day + 1)
 
            args.end_date = end_date.replace(day=1)
 
    if args.end_date < audit_end:
 
        args.arg_error("end date is within audited fiscal year")
 
    next_year = fy.for_date(args.end_date)
 
    repo_path = config.repository_path()
 

	
 
    if args.output_directory is None:
 
        args.output_directory = Path(tempfile.mkdtemp(
 
            prefix=f'FY{args.audit_year}AuditReports.', dir=repo_path,
 
        ))
 
        logger.info("writing reports to %s", args.output_directory)
 
    else:
 
        args.output_directory.mkdir(exist_ok=True)
 
    output_reports: List[Path] = []
 
    def common_args(out_name: str, year: Optional[int]=None, *arglist: str) -> Iterator[str]:
 
        if year is not None:
 
            out_name = f'FY{year}{out_name}.ods'
 
        if year == args.audit_year:
 
            yield f'--begin={audit_begin.isoformat()}'
 
            yield f'--end={audit_end.isoformat()}'
 
        elif year == next_year:
 
            yield f'--begin={audit_end.isoformat()}'
 
            yield f'--end={args.end_date.isoformat()}'
 
        elif year is not None:
 
            raise ValueError(f"unknown year {year!r}")
 
        out_path = args.output_directory / out_name
 
        output_reports.append(out_path)
 
        for path in args.rewrite_rules:
 
            yield f'--rewrite-rules={path}'
 
        yield f'--output-file={out_path}'
 
        yield from arglist
 
    reports: List[Tuple[ReportFunc, ArgList]] = [
 
        # Reports are sorted roughly in descending order of how long each takes
 
        # to generate.
 
        (ledger.main, list(common_args('GeneralLedger', args.audit_year))),
 
        (ledger.main, list(common_args('GeneralLedger', next_year))),
 
        (ledger.main, list(common_args('Disbursements', args.audit_year, '--disbursements'))),
 
        (ledger.main, list(common_args('Receipts', args.audit_year, '--receipts'))),
 
        (ledger.main, list(common_args('Disbursements', next_year, '--disbursements'))),
 
        (ledger.main, list(common_args('Receipts', next_year, '--receipts'))),
 
        (accrual.main, list(common_args('AgingReport.ods'))),
 
        (balance_sheet.main, list(common_args(
 
            'Summary', args.audit_year,
 
            *(f'--rewrite-rules={path}' for path in args.rewrite_rules),
 
        ))),
 
        (balance_sheet.main, list(common_args('Summary', args.audit_year))),
 
        (fund.main, list(common_args('FundReport', args.audit_year))),
 
        (fund.main, list(common_args('FundReport', next_year))),
 
    ]
 

	
 
    returncode = 0
 
    books = config.books_loader()
 
    if books is None:
 
        logger.critical("no books available to load")
 
        return os.EX_NOINPUT
 

	
 
    with multiprocessing.Pool(args.jobs, maxtasksperchild=1) as pool:
 
        logger.debug("%s: process pool ready with %s workers", now_s(), args.jobs)
 
        fy_paths = books._iter_fy_books(fy.range(args.audit_year - 1, args.end_date))
 
        check_results = pool.imap_unordered(bean_check, fy_paths)
 
        if all(exitcode == 0 for exitcode in check_results):
 
            logger.debug("%s: bean-check passed", now_s())
 
        else:
 
            logger.log(
 
                logging.WARNING if args.force else logging.ERROR,
 
                "%s: bean-check failed",
 
                now_s(),
 
            )
 
            if not args.force:
 
                return os.EX_DATAERR
 

	
 
        report_results = [
 
            pool.apply_async(report_func, (arglist,), {'config': config})
 
            for report_func, arglist in reports
 
        ]
 
        report_errors = [res.get() for res in report_results if res.get() != 0]
 
        if not report_errors:
 
            logger.debug("%s: all reports generated", now_s())
 
        else:
 
            logger.error("%s: %s reports generated errors", now_s(), len(report_errors))
 
            if not args.force:
 
                return max(report_errors)
 

	
 
    missing_reports = frozenset(
 
        path for path in output_reports if not path.exists()
 
    )
 
    if missing_reports:
 
        logger.error("missing expected reports: %s",
 
                     ', '.join(path.name for path in missing_reports))
 
        if not args.force or len(missing_reports) == len(output_reports):
 
            return os.EX_UNAVAILABLE
 

	
 
    arglist = [f'--delimiter={args.delimiter}']
 
    if repo_path is not None:
setup.py
Show inline comments
 
#!/usr/bin/env python3
 

	
 
from setuptools import setup
 

	
 
setup(
 
    name='conservancy_beancount',
 
    description="Plugin, library, and reports for reading Conservancy's books",
 
    version='1.9.0',
 
    version='1.9.1',
 
    author='Software Freedom Conservancy',
 
    author_email='info@sfconservancy.org',
 
    license='GNU AGPLv3+',
 

	
 
    install_requires=[
 
        'babel>=2.6',  # Debian:python3-babel
 
        'beancount>=2.2',  # Debian:beancount
 
        'GitPython>=2.0',  # Debian:python3-git
 
        # 1.4.1 crashes when trying to save some documents.
 
        'odfpy>=1.4.0,!=1.4.1',  # Debian:python3-odf
 
        'PyYAML>=3.0',  # Debian:python3-yaml
 
        'regex',  # Debian:python3-regex
 
        'rt>=2.0',
 
    ],
 
    setup_requires=[
 
        'pytest-mypy',
 
        'pytest-runner',  # Debian:python3-pytest-runner
 
    ],
 
    tests_require=[
 
        'mypy>=0.770',  # Debian:python3-mypy
 
        'pytest',  # Debian:python3-pytest
 
    ],
 

	
 
    packages=[
 
        'conservancy_beancount',
 
        'conservancy_beancount.plugin',
 
        'conservancy_beancount.reports',
 
    ],
 
    entry_points={
 
        'console_scripts': [
 
            'accrual-report = conservancy_beancount.reports.accrual:entry_point',
 
            'assemble-audit-reports = conservancy_beancount.tools.audit_report:entry_point',
 
            'balance-sheet-report = conservancy_beancount.reports.balance_sheet:entry_point',
 
            'extract-odf-links = conservancy_beancount.tools.extract_odf_links:entry_point',
 
            'fund-report = conservancy_beancount.reports.fund:entry_point',
 
            'ledger-report = conservancy_beancount.reports.ledger:entry_point',
 
            'opening-balances = conservancy_beancount.tools.opening_balances:entry_point',
 
        ],
 
    },
 
)
0 comments (0 inline, 0 general)