Files @ 6deaacb11bdd
Branch filter:

Location: NPO-Accounting/conservancy_beancount/conservancy_beancount/books.py

bkuhn
Add US:TN:Unemployment as a valid `payroll-type` metadata for taxes
"""books - Tools for loading the books

This module provides common functionality for loading books split by fiscal
year and doing common operations on the results.
"""
# Copyright © 2020  Brett Smith
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
#
# Full copyright and licensing details can be found at toplevel file
# LICENSE.txt in the repository.

import datetime

from pathlib import Path

import beancount.loader as bc_loader
import beancount.parser.options as bc_options
import beancount.parser.printer as bc_printer

from . import cliutil
from . import data
from .reports import rewrite

from typing import (
    Any,
    Iterable,
    Iterator,
    Mapping,
    NamedTuple,
    Optional,
    Set,
    TextIO,
    Union,
)
from .beancount_types import (
    Entries,
    Error,
    Errors,
    OptionsMap,
)

# A filesystem path given either as a plain string or as a pathlib.Path.
PathLike = Union[str, Path]
# A fiscal year given either directly as an int, or as a calendar date that
# FiscalYear methods convert to the fiscal year containing it.
Year = Union[int, datetime.date]

class FiscalYear(NamedTuple):
    """Translate between calendar dates and fiscal years

    A FiscalYear is the (month, day) pair on which each fiscal year begins.
    A fiscal year is numbered after the calendar year it starts in. The
    methods here find the fiscal year a calendar date falls in, report the
    boundary dates of a fiscal year, and build ranges of fiscal years.

    Wherever a method takes a ``Year``, you can pass either an int naming
    the fiscal year directly, or a date. A date is first converted to its
    fiscal year with for_date().
    """
    month: int = 3
    day: int = 1

    def for_date(self, date: Optional[datetime.date]=None) -> int:
        """Return the fiscal year that contains a calendar date

        When no date is given, today's date is used.
        """
        when = datetime.date.today() if date is None else date
        # Dates earlier in the calendar year than the fiscal year's start
        # still belong to the fiscal year that began the previous year.
        before_start = (when.month, when.day) < (self.month, self.day)
        return when.year - 1 if before_start else when.year

    def first_date(self, year: Year) -> datetime.date:
        """Return the calendar date on which a fiscal year begins"""
        fy = self.for_date(year) if isinstance(year, datetime.date) else year
        return datetime.date(fy, self.month, self.day)

    def last_date(self, year: Year) -> datetime.date:
        """Return the final calendar date of a fiscal year

        This is the day before the next fiscal year begins.
        """
        one_day = datetime.timedelta(days=1)
        return self.next_fy_date(year) - one_day

    def next_fy_date(self, year: Year) -> datetime.date:
        """Return the first calendar date of the following fiscal year"""
        fy = self.for_date(year) if isinstance(year, datetime.date) else year
        return datetime.date(fy + 1, self.month, self.day)

    def range(self, from_fy: Year, to_fy: Optional[Year]=None) -> Iterable[int]:
        """Return an inclusive range of fiscal years

        Each argument may be a fiscal year (an int) or a date. A ``from_fy``
        date is converted with for_date(). A ``to_fy`` date is treated as an
        exclusive end: the fiscal year of the day before it is used.

        When ``to_fy`` is omitted, it defaults to the current fiscal year.

        An integer ``from_fy`` below 1000 is an offset relative to ``to_fy``
        rather than a year:

        * zero or negative: the range runs from ``to_fy + from_fy``
          through ``to_fy``.
        * positive: the range runs from ``to_fy`` through
          ``to_fy + from_fy``.

        Unlike normal Python ranges, the final fiscal year is included.

        Examples:

          range(2015)  # Every fiscal year from 2015 through the current one

          range(-1)  # The previous fiscal year and the current fiscal year
        """
        start = from_fy if isinstance(from_fy, int) else self.for_date(from_fy)
        if to_fy is None:
            stop = self.for_date()
        elif isinstance(to_fy, int):
            stop = to_fy
        else:
            # An end date is exclusive, so look up the day before it.
            stop = self.for_date(to_fy - datetime.timedelta(days=1))
        if start < 1:
            # Backward offset: count back from the end year.
            start += stop
        elif start < 1000:
            # Forward offset: begin at the given year and extend past it.
            start, stop = stop, start + stop
        return range(start, stop + 1)


class LoadResult(NamedTuple):
    """Common functionality for loaded books

    This class is type-compatible with the return value of the loader
    functions in ``beancount.loader``. This provides named access to the
    results, as well as common functionality methods.
    """
    # The three fields appear in the same order as the tuple returned by
    # ``beancount.loader.load_file``, so ``LoadResult._make()`` can wrap that
    # return value directly.
    entries: Entries
    errors: Errors
    options_map: OptionsMap

    @classmethod
    def empty(cls, error: Optional[Error]=None) -> 'LoadResult':
        """Create a return result that represents nothing loaded

        If an error is provided, it will be the sole error reported.

        This method is useful to create a LoadResult when one can't be
        created normally; e.g., because a books path is not properly configured.

        The result has no entries, and its options map is a copy of
        Beancount's default options.
        """
        errors: Errors = [] if error is None else [error]
        return cls([], errors, bc_options.OPTIONS_DEFAULTS.copy())

    def iter_postings(
            self,
            rewrites: Iterable[Union[Path, rewrite.RewriteRuleset]]=(),
            search_terms: Iterable[cliutil.SearchTerm]=(),
    ) -> Iterator[data.Posting]:
        """Iterate all the postings in this LoadResult

        If ``rewrites`` are provided, postings will be passed through them
        all, in the order given. Any item that is a Path is first loaded as a
        ruleset with ``RewriteRuleset.from_yaml()``. See the
        ``reports.rewrite`` pydoc for details.

        If ``search_terms`` are provided, postings will be filtered through
        them all, after every rewrite has been applied. See the
        ``cliutil.SearchTerm`` pydoc for details.
        """
        postings = data.Posting.from_entries(self.entries)
        for ruleset in rewrites:
            if isinstance(ruleset, Path):
                ruleset = rewrite.RewriteRuleset.from_yaml(ruleset)
            postings = ruleset.rewrite(postings)
        for search_term in search_terms:
            postings = search_term.filter_postings(postings)
        return postings

    def load_account_metadata(self) -> None:
        """Load account metadata from this LoadResult

        This hands the loaded entries and options map to
        ``data.Account.load_from_books()``.
        """
        return data.Account.load_from_books(self.entries, self.options_map)

    def print_errors(self, out_file: TextIO) -> bool:
        """Report errors from this LoadResult to ``out_file``

        Each error is written with Beancount's own error printer.

        Returns True if errors were reported, False otherwise.
        """
        # Track whether the loop body ever ran with an explicit flag, rather
        # than probing afterwards for an unbound loop variable.
        reported = False
        for error in self.errors:
            bc_printer.print_error(error, file=out_file)
            reported = True
        return reported

    def returncode(self) -> int:
        """Return an appropriate Unix exit code for this LoadResult

        If this LoadResult has errors, or no entries, return an exit code that
        best represents that. Otherwise, return the standard OK exit code 0.

        * Errors and entries: ``ExitCode.BeancountErrors``
        * Errors and no entries: ``ExitCode.NoConfiguration``
        * No errors and no entries: ``ExitCode.NoDataLoaded``
        * No errors and some entries: ``ExitCode.OK``
        """
        if self.errors:
            if self.entries:
                return cliutil.ExitCode.BeancountErrors
            else:
                return cliutil.ExitCode.NoConfiguration
        elif not self.entries:
            return cliutil.ExitCode.NoDataLoaded
        else:
            return cliutil.ExitCode.OK


class Loader:
    """Load Beancount books that are split into one file per fiscal year"""

    def __init__(self,
                 books_root: Path,
                 fiscal_year: FiscalYear,
    ) -> None:
        """Set up a books loader

        Arguments:
        * books_root: A Path to a Beancount books checkout. Fiscal year
          books are looked up in its ``books`` subdirectory.
        * fiscal_year: A FiscalYear object, used to determine what books to
          load for a given date range.
        """
        self.books_root = books_root
        self.fiscal_year = fiscal_year

    def _iter_fy_books(self, fy_range: Iterable[int]) -> Iterator[Path]:
        """Yield the path of each fiscal year's books file that exists

        Years in ``fy_range`` with no ``books/<year>.beancount`` file under
        the books root are silently skipped.
        """
        yield from filter(Path.exists, (
            Path(self.books_root, 'books', f'{fy}.beancount')
            for fy in fy_range
        ))

    def _load_paths(self, paths: Iterator[Path]) -> LoadResult:
        """Load a sequence of books files into a single LoadResult

        The first path is loaded in full. If there are no paths at all, the
        result is an empty LoadResult. From each later path, only entries
        from files that have not been seen before are merged in. All of its
        errors are kept.

        The ``include`` option of the result lists every file seen.
        """
        try:
            combined = LoadResult._make(bc_loader.load_file(next(paths)))
        except StopIteration:
            combined = LoadResult.empty()
        known_files: Set[str] = set(combined.options_map['include'])
        for book_path in paths:
            entries, errors, options = bc_loader.load_file(book_path)
            # Mark the top-level file as seen *before* filtering: its own
            # entries (opening balances, duplicate definitions, etc.) are not
            # wanted, only the transactions from files new to this year.
            known_files.add(options['filename'])
            unseen_entries = (
                entry for entry in entries
                if entry.meta.get('filename') not in known_files
            )
            combined.entries.extend(unseen_entries)
            combined.errors.extend(errors)
            known_files.update(options['include'])
        combined.options_map['include'] = list(known_files)
        return combined

    def _path_year(self, path: Path) -> int:
        """Return the year named by a books file, e.g. 2020 for 2020.beancount"""
        year_text = path.stem
        return int(year_text)

    def load_all(self, from_year: Optional[Year]=None) -> LoadResult:
        """Load every available fiscal year of books from a starting point

        The starting fiscal year depends on the argument:

        * Pass in a date to start from the FY for that date.
        * Pass in an integer >= 1000 to start from that year.
        * Pass in a smaller integer to start from an FY relative to today
          (e.g., -2 starts two FYs before today).
        * Pass in no argument to load all books from the first available FY.

        Books are discovered by globbing the filesystem for files named
        with a four-digit year. They are still loaded one fiscal year at a
        time, in order, to provide the best cache utilization.
        """
        books_dir = Path(self.books_root, 'books')
        fy_paths = sorted(
            books_dir.glob('[1-9][0-9][0-9][0-9].beancount'),
            key=self._path_year,
        )
        if from_year is None:
            return self._load_paths(iter(fy_paths))
        if not isinstance(from_year, int):
            from_year = self.fiscal_year.for_date(from_year)
        elif from_year < 1000:
            from_year = self.fiscal_year.for_date() + from_year
        # The paths are sorted by year, so keeping those at or after the
        # starting year is the same as slicing from the first match.
        selected = [
            book_path for book_path in fy_paths
            if self._path_year(book_path) >= from_year
        ]
        return self._load_paths(iter(selected))

    def load_fy_range(self,
                      from_fy: Year,
                      to_fy: Optional[Year]=None,
    ) -> LoadResult:
        """Load books for a range of fiscal years

        The arguments are passed to FiscalYear.range() to build the range of
        fiscal years. Every books file that exists for a year in that range
        is loaded.
        """
        wanted_years = self.fiscal_year.range(from_fy, to_fy)
        return self._load_paths(self._iter_fy_books(wanted_years))

    @classmethod
    def load_none(cls, config_path: Optional[PathLike]=None, lineno: int=0) -> LoadResult:
        """Load no books and generate an error about it

        This is a convenience method for reporting tools that already handle
        general Beancount errors. If a configuration problem prevents them from
        loading the books, they can call this method in place of a regular
        loading method, and then continue on their normal code path.

        The path and line number given in the arguments will be named as the
        source of the error. Without a path, the error names
        ``conservancy_beancount.ini``.
        """
        filename = str(config_path or 'conservancy_beancount.ini')
        source = {'filename': filename, 'lineno': lineno}
        error = Error(source, "no books to load in configuration", None)
        return LoadResult.empty(error)

    @classmethod
    def dispatch(cls,
                loader: Optional['Loader'],
                from_fy: Optional[Year]=None,
                to_fy: Optional[Year]=None,
    ) -> LoadResult:
        """High-level, "do-what-I-mean"-ish books loader

        Most tools can call this with a books loader from configuration, plus
        one or two fiscal year arguments, to get the LoadResult they want.

        * With no loader, return the error result from load_none().
        * With no ``to_fy``, load all books starting from ``from_fy``.
        * Otherwise, load the range of fiscal years from ``from_fy`` (or 0
          when it is not given) through ``to_fy``.
        """
        if loader is None:
            return cls.load_none()
        if to_fy is None:
            return loader.load_all(from_fy)
        return loader.load_fy_range(from_fy or 0, to_fy)