Changeset - 944c19da8d69
[Not reviewed]
0 2 0
Brett Smith - 4 years ago 2020-06-10 19:59:56
brettcsmith@brettcsmith.org
books: Add date-fetching methods to FiscalYear.
2 files changed with 63 insertions and 0 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/books.py
Show inline comments
 
"""books - Tools for loading the books"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import datetime
 

	
 
from pathlib import Path
 

	
 
from beancount import loader as bc_loader
 

	
 
from typing import (
 
    Any,
 
    Iterable,
 
    Iterator,
 
    Mapping,
 
    NamedTuple,
 
    Optional,
 
    Union,
 
)
 
from .beancount_types import (
 
    Error,
 
    Errors,
 
    LoadResult,
 
)
 

	
 
PathLike = Union[str, Path]
 
Year = Union[int, datetime.date]
 

	
 
class FiscalYear(NamedTuple):
 
    month: int = 3
 
    day: int = 1
 

	
 
    def for_date(self, date: Optional[datetime.date]=None) -> int:
 
        if date is None:
 
            date = datetime.date.today()
 
        if (date.month, date.day) < self:
 
            return date.year - 1
 
        else:
 
            return date.year
 

	
 
    def first_date(self, year: Year) -> datetime.date:
 
        if isinstance(year, datetime.date):
 
            year = self.for_date(year)
 
        return datetime.date(year, self.month, self.day)
 

	
 
    def last_date(self, year: Year) -> datetime.date:
 
        return self.next_fy_date(year) - datetime.timedelta(days=1)
 

	
 
    def next_fy_date(self, year: Year) -> datetime.date:
 
        if isinstance(year, datetime.date):
 
            year = self.for_date(year)
 
        return datetime.date(year + 1, self.month, self.day)
 

	
 
    def range(self, from_fy: Year, to_fy: Optional[Year]=None) -> Iterable[int]:
 
        """Return a range of fiscal years
 

	
 
        Both arguments can be either a year (represented as an integer) or a
 
        date. Dates will be converted into a year by calling for_date() on
 
        them.
 

	
 
        If the first argument is negative or below 1000, it will be treated as
 
        an offset. You'll get a range of fiscal years between the second
 
        argument offset by this amount.
 

	
 
        If the second argument is omitted, it defaults to the current fiscal
 
        year.
 

	
 
        Note that unlike normal Python ranges, these ranges include the final
 
        fiscal year.
 

	
 
        Examples:
 

	
 
          range(2015)  # Iterate all fiscal years from 2015 to today, inclusive
 

	
 
          range(-1)  # Iterate the previous fiscal year and current fiscal year
 
        """
 
        if not isinstance(from_fy, int):
 
            from_fy = self.for_date(from_fy)
 
        if to_fy is None:
 
            to_fy = self.for_date()
 
        elif not isinstance(to_fy, int):
 
            to_fy = self.for_date(to_fy - datetime.timedelta(days=1))
 
        if from_fy < 1:
 
            from_fy += to_fy
 
        elif from_fy < 1000:
 
            from_fy, to_fy = to_fy, from_fy + to_fy
 
        return range(from_fy, to_fy + 1)
 

	
 

	
 
class Loader:
 
    """Load Beancount books organized by fiscal year"""
 

	
 
    def __init__(self,
 
                 books_root: Path,
 
                 fiscal_year: FiscalYear,
 
    ) -> None:
 
        """Set up a books loader
 

	
 
        Arguments:
 
        * books_root: A Path to a Beancount books checkout.
 
        * fiscal_year: A FiscalYear object, used to determine what books to
 
          load for a given date range.
 
        """
 
        self.books_root = books_root
 
        self.fiscal_year = fiscal_year
 

	
 
    def _iter_fy_books(self, fy_range: Iterable[int]) -> Iterator[Path]:
 
        for year in fy_range:
 
            path = Path(self.books_root, 'books', f'{year}.beancount')
 
            if path.exists():
 
                yield path
 

	
 
    def _load_paths(self, paths: Iterator[Path]) -> LoadResult:
 
        try:
 
            entries, errors, options_map = bc_loader.load_file(next(paths))
 
        except StopIteration:
 
            entries, errors, options_map = [], [], {}
 
        for load_path in paths:
 
            new_entries, new_errors, new_options = bc_loader.load_file(load_path)
 
            # We only want transactions from the new fiscal year.
 
            # We don't want the opening balance, duplicate definitions, etc.
 
            fy_filename = str(load_path.parent.parent / load_path.name)
 
            entries.extend(
 
                entry for entry in new_entries
 
                if entry.meta.get('filename') == fy_filename
 
            )
 
            errors.extend(new_errors)
 
        return entries, errors, options_map
 

	
 
    def _path_year(self, path: Path) -> int:
 
        return int(path.stem)
 

	
 
    def load_all(self, from_year: Optional[Year]=None) -> LoadResult:
 
        """Load all of the books from a starting FY
 

	
 
        This method loads all of the books, starting from the fiscal year you
 
        specify.
 

	
 
        * Pass in a date to start from the FY for that date.
 
        * Pass in an integer >= 1000 to start from that year.
 
        * Pass in a smaller integer to start from an FY relative to today
 
          (e.g., -2 starts two FYs before today).
 
        * Pass is no argument to load all books from the first available FY.
 

	
 
        This method finds books by globbing the filesystem. It still loads
 
        each fiscal year in sequence to provide the best cache utilization.
 
        """
 
        path = Path(self.books_root, 'books')
 
        fy_paths = list(path.glob('[1-9][0-9][0-9][0-9].beancount'))
 
        fy_paths.sort(key=self._path_year)
 
        if from_year is not None:
 
            if not isinstance(from_year, int):
 
                from_year = self.fiscal_year.for_date(from_year)
 
            elif from_year < 1000:
 
                from_year = self.fiscal_year.for_date() + from_year
 
            for index, path in enumerate(fy_paths):
 
                if self._path_year(path) >= from_year:
 
                    fy_paths = fy_paths[index:]
 
                    break
 
            else:
 
                fy_paths = []
 
        return self._load_paths(iter(fy_paths))
 

	
 
    def load_fy_range(self,
 
                      from_fy: Year,
 
                      to_fy: Optional[Year]=None,
 
    ) -> LoadResult:
 
        """Load books for a range of fiscal years
 

	
 
        This method generates a range of fiscal years by calling
 
        FiscalYear.range() with its arguments. It loads all the books within
 
        that range.
 
        """
 
        fy_range = self.fiscal_year.range(from_fy, to_fy)
 
        fy_paths = self._iter_fy_books(fy_range)
 
        return self._load_paths(fy_paths)
 

	
 
    @classmethod
 
    def load_none(cls, config_path: Optional[PathLike]=None, lineno: int=0) -> LoadResult:
 
        """Load no books and generate an error about it
 

	
 
        This is a convenience method for reporting tools that already handle
 
        general Beancount errors. If a configuration problem prevents them from
 
        loading the books, they can call this method in place of a regular
 
        loading method, and then continue on their normal code path.
 

	
 
        The path and line number given in the arguments will be named as the
 
        source of the error.
 
        """
 
        source = {
 
            'filename': str(config_path or 'conservancy_beancount.ini'),
 
            'lineno': lineno,
 
        }
 
        errors: Errors = [Error(source, "no books to load in configuration", None)]
 
        return [], errors, {}
tests/test_books_fiscal_year.py
Show inline comments
 
"""test_books_fiscal_year - Unit tests for books.FiscalYear"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import datetime
 
import itertools
 

	
 
import pytest
 

	
 
from conservancy_beancount import books
 

	
 
@pytest.fixture(scope='module')
 
def conservancy_fy():
 
    return books.FiscalYear(3, 1)
 

	
 
@pytest.fixture(scope='module')
 
def cy_fy():
 
    return books.FiscalYear(1, 1)
 

	
 
# Not bothering because it's too much trouble to override __new__
 
# @pytest.mark.parametrize('month,day', [
 
#     (0, 0),
 
#     (2, 29),
 
#     (4, 31),
 
#     (13, 15),
 
#     (-1, 1),
 
#     (1, -5),
 
# ])
 
# def test_values_checked(month, day):
 
#     with pytest.raises(ValueError):
 
#         books.FiscalYear(month, day)
 

	
 
def test_attribute_access():
 
    fy = books.FiscalYear(2, 15)
 
    assert fy.month == 2
 
    assert fy.day == 15
 
    with pytest.raises(AttributeError):
 
        fy.year
 

	
 
@pytest.mark.parametrize('month,day,expected', [
 
    (1, 1, 2019),
 
    (2, 15, 2019),
 
    (2, 28, 2019),
 
    (2, 29, 2019),
 
    (3, 1, 2020),
 
    (3, 9, 2020),
 
    (6, 1, 2020),
 
    (12, 31, 2020),
 
])
 
def test_for_date(conservancy_fy, month, day, expected):
 
    date = datetime.date(2020, month, day)
 
    assert conservancy_fy.for_date(date) == expected
 

	
 
def test_for_date_default_today(cy_fy):
 
    assert cy_fy.for_date() == datetime.date.today().year
 

	
 
@pytest.mark.parametrize('begin_date,end_date,expected', [
 
    ((2020, 3, 1), (2021, 2, 28), [2020]),
 
    ((2020, 1, 1), (2020, 3, 1), [2019]),
 
    ((2020, 1, 1), (2020, 3, 5), [2019, 2020]),
 
    ((2019, 2, 1), (2020, 6, 1), [2018, 2019, 2020]),
 
])
 
def test_range_two_dates(conservancy_fy, begin_date, end_date, expected):
 
    actual = list(conservancy_fy.range(datetime.date(*begin_date), datetime.date(*end_date)))
 
    assert actual == expected
 

	
 
@pytest.mark.parametrize('year_offset', range(3))
 
def test_range_one_date(cy_fy, year_offset):
 
    this_year = datetime.date.today().year
 
    actual = list(cy_fy.range(datetime.date(this_year - year_offset, 1, 1)))
 
    assert actual == list(range(this_year - year_offset, this_year + 1))
 

	
 
@pytest.mark.parametrize('begin_year,end_year', [
 
    (2006, 2020),
 
    (2019, 2020),
 
    (2020, 2020),
 
])
 
def test_range_two_years(conservancy_fy, begin_year, end_year):
 
    actual = list(conservancy_fy.range(begin_year, end_year))
 
    assert actual == list(range(begin_year, end_year + 1))
 

	
 
@pytest.mark.parametrize('year_offset', range(3))
 
def test_range_one_year(cy_fy, year_offset):
 
    this_year = datetime.date.today().year
 
    actual = list(cy_fy.range(this_year - year_offset))
 
    assert actual == list(range(this_year - year_offset, this_year + 1))
 

	
 
@pytest.mark.parametrize('year_offset,month_offset', itertools.product(
 
    range(-3, 3),
 
    [-1, 1]
 
))
 
def test_range_offset_and_date(conservancy_fy, year_offset, month_offset):
 
    end_date = datetime.date(2020, conservancy_fy.month + month_offset, 10)
 
    base_year = end_date.year
 
    if month_offset < 0:
 
        base_year -= 1
 
    if year_offset < 0:
 
        expected = range(base_year + year_offset, base_year + 1)
 
    else:
 
        expected = range(base_year, base_year + year_offset + 1)
 
    actual = list(conservancy_fy.range(year_offset, end_date))
 
    assert actual == list(expected)
 

	
 
@pytest.mark.parametrize('year_offset,year', itertools.product(
 
    range(-3, 3),
 
    [2010, 2015],
 
))
 
def test_range_offset_and_year(conservancy_fy, year_offset, year):
 
    if year_offset < 0:
 
        expected = range(year + year_offset, year + 1)
 
    else:
 
        expected = range(year, year + year_offset + 1)
 
    actual = list(conservancy_fy.range(year_offset, year))
 
    assert actual == list(expected)
 

	
 
@pytest.mark.parametrize('year_offset', range(-3, 3))
 
def test_range_offset_only(cy_fy, year_offset):
 
    year = datetime.date.today().year
 
    if year_offset < 0:
 
        expected = range(year + year_offset, year + 1)
 
    else:
 
        expected = range(year, year + year_offset + 1)
 
    actual = list(cy_fy.range(year_offset))
 
    assert actual == list(expected)
 

	
 
@pytest.mark.parametrize('year', range(2016, 2022))
 
def test_first_date_year_arg(conservancy_fy, year):
 
    assert conservancy_fy.first_date(year) == datetime.date(year, 3, 1)
 

	
 
@pytest.mark.parametrize('date', [
 
    datetime.date(2019, 1, 1),
 
    datetime.date(2019, 10, 10),
 
    datetime.date(2020, 2, 2),
 
    datetime.date(2020, 12, 12),
 
])
 
def test_first_date_date_arg(conservancy_fy, date):
 
    year = date.year
 
    if date.month < 3:
 
        year -= 1
 
    assert conservancy_fy.first_date(date) == datetime.date(year, 3, 1)
 

	
 
@pytest.mark.parametrize('year', range(2016, 2022))
 
def test_last_date_year_arg(conservancy_fy, year):
 
    day = 28 if year % 4 else 29
 
    assert conservancy_fy.last_date(year - 1) == datetime.date(year, 2, day)
 

	
 
@pytest.mark.parametrize('date', [
 
    datetime.date(2019, 1, 1),
 
    datetime.date(2019, 10, 10),
 
    datetime.date(2020, 2, 2),
 
    datetime.date(2020, 12, 12),
 
])
 
def test_last_date_date_arg(conservancy_fy, date):
 
    year = date.year
 
    if date.month >= 3:
 
        year += 1
 
    day = 28 if year % 4 else 29
 
    assert conservancy_fy.last_date(date) == datetime.date(year, 2, day)
 

	
 
@pytest.mark.parametrize('year', range(2016, 2022))
 
def test_next_fy_date_year_arg(conservancy_fy, year):
 
    assert conservancy_fy.next_fy_date(year) == datetime.date(year + 1, 3, 1)
 

	
 
@pytest.mark.parametrize('date', [
 
    datetime.date(2019, 1, 1),
 
    datetime.date(2019, 10, 10),
 
    datetime.date(2020, 2, 29),
 
    datetime.date(2020, 12, 12),
 
])
 
def test_next_fy_date_date_arg(conservancy_fy, date):
 
    year = date.year
 
    if date.month >= 3:
 
        year += 1
 
    assert conservancy_fy.next_fy_date(date) == datetime.date(year, 3, 1)
0 comments (0 inline, 0 general)