Changeset - 2bd3e8b462d2
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-06-04 13:03:10
brettcsmith@brettcsmith.org
books: Loader.from_all() accepts a start FY argument.
3 files changed with 61 insertions and 26 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/books.py
Show inline comments
...
 
@@ -42,122 +42,143 @@ Year = Union[int, datetime.date]
 
def workdir(path: PathLike) -> Iterator[Path]:
 
    old_dir = os.getcwd()
 
    os.chdir(path)
 
    try:
 
        yield Path(old_dir)
 
    finally:
 
        os.chdir(old_dir)
 

	
 
class FiscalYear(NamedTuple):
 
    month: int = 3
 
    day: int = 1
 

	
 
    def for_date(self, date: Optional[datetime.date]=None) -> int:
 
        if date is None:
 
            date = datetime.date.today()
 
        if (date.month, date.day) < self:
 
            return date.year - 1
 
        else:
 
            return date.year
 

	
 
    def range(self, from_fy: Year, to_fy: Optional[Year]=None) -> Iterable[int]:
 
        """Return a range of fiscal years
 

	
 
        Both arguments can be either a year (represented as an integer) or a
 
        date. Dates will be converted into a year by calling for_date() on
 
        them.
 

	
 
        If the first argument is negative or below 1000, it will be treated as
 
        an offset. You'll get a range of fiscal years between the second
 
        argument offset by this amount.
 

	
 
        If the second argument is omitted, it defaults to the current fiscal
 
        year.
 

	
 
        Note that unlike normal Python ranges, these ranges include the final
 
        fiscal year.
 

	
 
        Examples:
 

	
 
          range(2015)  # Iterate all fiscal years from 2015 to today, inclusive
 

	
 
          range(-1)  # Iterate the previous fiscal year and current fiscal year
 
        """
 
        if not isinstance(from_fy, int):
 
            from_fy = self.for_date(from_fy)
 
        if to_fy is None:
 
            to_fy = self.for_date()
 
        elif not isinstance(to_fy, int):
 
            to_fy = self.for_date(to_fy - datetime.timedelta(days=1))
 
        if from_fy < 1:
 
            from_fy += to_fy
 
        elif from_fy < 1000:
 
            from_fy, to_fy = to_fy, from_fy + to_fy
 
        return range(from_fy, to_fy + 1)
 

	
 

	
 
class Loader:
 
    """Load Beancount books organized by fiscal year"""
 

	
 
    def __init__(self,
 
                 books_root: Path,
 
                 fiscal_year: FiscalYear,
 
    ) -> None:
 
        """Set up a books loader
 

	
 
        Arguments:
 
        * books_root: A Path to a Beancount books checkout.
 
        * fiscal_year: A FiscalYear object, used to determine what books to
 
          load for a given date range.
 
        """
 
        self.books_root = books_root
 
        self.fiscal_year = fiscal_year
 

	
 
    def _iter_fy_books(self, fy_range: Iterable[int]) -> Iterator[Path]:
 
        for year in fy_range:
 
            path = Path(self.books_root, 'books', f'{year}.beancount')
 
            if path.exists():
 
                yield path
 

	
 
    def _load_paths(self, paths: Iterator[Path]) -> LoadResult:
 
        try:
 
            entries, errors, options_map = bc_loader.load_file(next(paths))
 
        except StopIteration:
 
            entries, errors, options_map = [], [], {}
 
        for load_path in paths:
 
            new_entries, new_errors, new_options = bc_loader.load_file(load_path)
 
            # We only want transactions from the new fiscal year.
 
            # We don't want the opening balance, duplicate definitions, etc.
 
            fy_filename = str(load_path.parent.parent / load_path.name)
 
            entries.extend(
 
                entry for entry in new_entries
 
                if entry.meta.get('filename') == fy_filename
 
            )
 
            errors.extend(new_errors)
 
        return entries, errors, options_map
 

	
 
    def load_all(self) -> LoadResult:
 
        """Load all of the books
 
    def _path_year(self, path: Path) -> int:
 
        return int(path.stem)
 

	
 
        This method loads all of the books. It finds the books by simply
 
        globbing the filesystem. It still loads each fiscal year in sequence to
 
        provide the best cache utilization.
 
    def load_all(self, from_year: Optional[Year]=None) -> LoadResult:
 
        """Load all of the books from a starting FY
 

	
 
        This method loads all of the books, starting from the fiscal year you
 
        specify.
 

	
 
        * Pass in a date to start from the FY for that date.
 
        * Pass in an integer >= 1000 to start from that year.
 
        * Pass in a smaller integer to start from an FY relative to today
 
          (e.g., -2 starts two FYs before today).
 
        * Pass is no argument to load all books from the first available FY.
 

	
 
        This method finds books by globbing the filesystem. It still loads
 
        each fiscal year in sequence to provide the best cache utilization.
 
        """
 
        path = Path(self.books_root, 'books')
 
        fy_paths = list(path.glob('[1-9][0-9][0-9][0-9].beancount'))
 
        fy_paths.sort(key=lambda path: int(path.stem))
 
        fy_paths.sort(key=self._path_year)
 
        if from_year is not None:
 
            if not isinstance(from_year, int):
 
                from_year = self.fiscal_year.for_date(from_year)
 
            elif from_year < 1000:
 
                from_year = self.fiscal_year.for_date() + from_year
 
            for index, path in enumerate(fy_paths):
 
                if self._path_year(path) >= from_year:
 
                    fy_paths = fy_paths[index:]
 
                    break
 
            else:
 
                fy_paths = []
 
        return self._load_paths(iter(fy_paths))
 

	
 
    def load_fy_range(self,
 
                      from_fy: Year,
 
                      to_fy: Optional[Year]=None,
 
    ) -> LoadResult:
 
        """Load books for a range of fiscal years
 

	
 
        This method generates a range of fiscal years by calling
 
        FiscalYear.range() with its first two arguments. It returns a string of
 
        Beancount directives to load the books from the first available fiscal
 
        year through the end of the range.
 
        FiscalYear.range() with its arguments. It loads all the books within
 
        that range.
 
        """
 
        fy_range = self.fiscal_year.range(from_fy, to_fy)
 
        fy_paths = self._iter_fy_books(fy_range)
 
        return self._load_paths(fy_paths)
tests/test_books_loader.py
Show inline comments
 
"""test_books_loader - Unit tests for books Loader class"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import collections
 
import re
 

	
 
from datetime import date
 
from pathlib import Path
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from beancount.core import data as bc_data
 
from conservancy_beancount import books
 

	
 
FY_START_MONTH = 3
 

	
 
books_path = testutil.test_path('books')
 

	
 
@pytest.fixture(scope='module')
 
def conservancy_loader():
 
    return books.Loader(books_path, books.FiscalYear(3))
 
    return books.Loader(books_path, books.FiscalYear(FY_START_MONTH))
 

	
 
def check_openings(entries):
 
    openings = collections.defaultdict(int)
 
    for entry in entries:
 
        if isinstance(entry, bc_data.Open):
 
            openings[entry.account] += 1
 
    for account, count in openings.items():
 
        assert count == 1, f"found {count} open directives for {account}"
 

	
 
def check_narrations(entries, expected):
 
    expected = iter(expected)
 
    expected_next = next(expected)
 
def txn_dates(entries):
 
    for entry in entries:
 
        if (isinstance(entry, bc_data.Transaction)
 
            and entry.narration == expected_next):
 
            try:
 
                expected_next = next(expected)
 
            except StopIteration:
 
                break
 
    else:
 
        assert None, f"{expected_next} not found in entry narrations"
 
        if isinstance(entry, bc_data.Transaction):
 
            yield entry.date
 

	
 
def txn_years(entries):
 
    return frozenset(date.year for date in txn_dates(entries))
 

	
 
@pytest.mark.parametrize('from_fy,to_fy,expect_years', [
 
    (2019, 2019, range(2019, 2020)),
 
    (0, 2019, range(2019, 2020)),
 
    (2018, 2019, range(2018, 2020)),
 
    (1, 2018, range(2018, 2020)),
 
    (-1, 2019, range(2018, 2020)),
 
    (2019, 2020, range(2019, 2021)),
 
    (1, 2019, range(2019, 2021)),
 
    (-1, 2020, range(2019, 2021)),
 
    (2010, 2030, range(2018, 2021)),
 
    (20, 2010, range(2018, 2021)),
 
    (-20, 2030, range(2018, 2021)),
 
])
 
def test_load_fy_range(conservancy_loader, from_fy, to_fy, expect_years):
 
    entries, errors, options_map = conservancy_loader.load_fy_range(from_fy, to_fy)
 
    assert not errors
 
    check_narrations(entries, [f'{year} donation' for year in expect_years])
 
    assert txn_years(entries).issuperset(expect_years)
 

	
 
def test_load_fy_range_does_not_duplicate_openings(conservancy_loader):
 
    entries, errors, options_map = conservancy_loader.load_fy_range(2010, 2030)
 
    check_openings(entries)
 

	
 
def test_load_fy_range_empty(conservancy_loader):
 
    entries, errors, options_map = conservancy_loader.load_fy_range(2020, 2019)
 
    assert not errors
 
    assert not entries
 
    assert not options_map
 

	
 
def test_load_all(conservancy_loader):
 
    entries, errors, options_map = conservancy_loader.load_all()
 
@pytest.mark.parametrize('from_year', [None, *range(2018, 2021)])
 
def test_load_all(conservancy_loader, from_year):
 
    entries, errors, options_map = conservancy_loader.load_all(from_year)
 
    from_year = from_year or 2018
 
    assert not errors
 
    check_openings(entries)
 
    assert txn_years(entries).issuperset(range(from_year or 2018, 2021))
 

	
 
@pytest.mark.parametrize('from_date', [
 
    date(2019, 2, 1),
 
    date(2019, 9, 15),
 
    date(2020, 1, 20),
 
    date(2020, 5, 31),
 
])
 
def test_load_all_from_date(conservancy_loader, from_date):
 
    from_year = from_date.year
 
    if from_date.month < FY_START_MONTH:
 
        from_year -= 1
 
    entries, errors, options_map = conservancy_loader.load_all(from_date)
 
    assert not errors
 
    check_narrations(entries, [f'{year} donation' for year in range(2018, 2021)])
 
    check_openings(entries)
 
    assert txn_years(entries).issuperset(range(from_year, 2021))
tests/testutil.py
Show inline comments
...
 
@@ -124,193 +124,193 @@ def Posting(account, number,
 
    return type_(
 
        account,
 
        Amount(number, currency),
 
        cost,
 
        price,
 
        flag,
 
        meta,
 
    )
 

	
 
def Transaction(date=FY_MID_DATE, flag='*', payee=None,
 
                narration='', tags=None, links=None, postings=(),
 
                **meta):
 
    if isinstance(date, str):
 
        date = parse_date(date)
 
    meta.setdefault('filename', '<test>')
 
    meta.setdefault('lineno', 0)
 
    real_postings = []
 
    for post in postings:
 
        try:
 
            post.account
 
        except AttributeError:
 
            if isinstance(post[-1], dict):
 
                args = post[:-1]
 
                kwargs = post[-1]
 
            else:
 
                args = post
 
                kwargs = {}
 
            post = Posting(*args, **kwargs)
 
        real_postings.append(post)
 
    return bc_data.Transaction(
 
        meta,
 
        date,
 
        flag,
 
        payee,
 
        narration,
 
        set(tags or ''),
 
        set(links or ''),
 
        real_postings,
 
    )
 

	
 
LINK_METADATA_STRINGS = {
 
    'Invoices/304321.pdf',
 
    'rt:123/456',
 
    'rt://ticket/234',
 
}
 

	
 
NON_LINK_METADATA_STRINGS = {
 
    '',
 
    ' ',
 
    '     ',
 
}
 

	
 
NON_STRING_METADATA_VALUES = [
 
    Decimal(5),
 
    FY_MID_DATE,
 
    Amount(50),
 
    Amount(500, None),
 
]
 

	
 
OPENING_EQUITY_ACCOUNTS = itertools.cycle([
 
    'Equity:Funds:Unrestricted',
 
    'Equity:Funds:Restricted',
 
    'Equity:OpeningBalance',
 
])
 

	
 
class ODSCell:
 
    @classmethod
 
    def from_row(cls, row):
 
        return row.getElementsByType(odf.table.TableCell)
 

	
 
    @classmethod
 
    def from_sheet(cls, spreadsheet):
 
        for row in spreadsheet.getElementsByType(odf.table.TableRow):
 
            yield list(cls.from_row(row))
 

	
 
    @classmethod
 
    def from_ods_file(cls, path):
 
        ods = odf.opendocument.load(path)
 
        return cls.from_sheet(ods.spreadsheet)
 

	
 

	
 
def OpeningBalance(acct=None, **txn_meta):
 
    if acct is None:
 
        acct = next(OPENING_EQUITY_ACCOUNTS)
 
    return Transaction(**txn_meta, postings=[
 
        ('Assets:Receivable:Accounts', 100),
 
        ('Assets:Receivable:Loans', 200),
 
        ('Liabilities:Payable:Accounts', -15),
 
        ('Liabilities:Payable:Vacation', -25),
 
        (acct, -260),
 
    ])
 

	
 
class TestBooksLoader(books.Loader):
 
    def __init__(self, source):
 
        self.source = source
 

	
 
    def load_all(self):
 
    def load_all(self, from_year=None):
 
        return bc_loader.load_file(self.source)
 

	
 
    def load_fy_range(self, from_fy, to_fy=None):
 
        return self.load_all()
 

	
 

	
 
class TestConfig:
 
    def __init__(self, *,
 
                 books_path=None,
 
                 payment_threshold=0,
 
                 repo_path=None,
 
                 rt_client=None,
 
    ):
 
        if books_path is None:
 
            self._books_loader = None
 
        else:
 
            self._books_loader = TestBooksLoader(books_path)
 
        self._payment_threshold = Decimal(payment_threshold)
 
        self.repo_path = test_path(repo_path)
 
        self._rt_client = rt_client
 
        if rt_client is None:
 
            self._rt_wrapper = None
 
        else:
 
            self._rt_wrapper = rtutil.RT(rt_client)
 

	
 
    def books_loader(self):
 
        return self._books_loader
 

	
 
    def config_file_path(self):
 
        return test_path('userconfig/conservancy_beancount/config.ini')
 

	
 
    def payment_threshold(self):
 
        return self._payment_threshold
 

	
 
    def repository_path(self):
 
        return self.repo_path
 

	
 
    def rt_client(self):
 
        return self._rt_client
 

	
 
    def rt_wrapper(self):
 
        return self._rt_wrapper
 

	
 

	
 
class _TicketBuilder:
 
    MESSAGE_ATTACHMENTS = [
 
        ('(Unnamed)', 'multipart/alternative', '0b'),
 
        ('(Unnamed)', 'text/plain', '1.2k'),
 
        ('(Unnamed)', 'text/html', '1.4k'),
 
    ]
 
    MISC_ATTACHMENTS = [
 
        ('Forwarded Message.eml', 'message/rfc822', '3.1k'),
 
        ('photo.jpg', 'image/jpeg', '65.2k'),
 
        ('ConservancyInvoice-301.pdf', 'application/pdf', '326k'),
 
        ('Company_invoice-2020030405_as-sent.pdf', 'application/pdf', '50k'),
 
        ('statement.txt', 'text/plain', '652b'),
 
        ('screenshot.png', 'image/png', '1.9m'),
 
    ]
 

	
 
    def __init__(self):
 
        self.id_seq = itertools.count(1)
 
        self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS)
 

	
 
    def new_attch(self, attch):
 
        return (str(next(self.id_seq)), *attch)
 

	
 
    def new_msg_with_attachments(self, attachments_count=1):
 
        for attch in self.MESSAGE_ATTACHMENTS:
 
            yield self.new_attch(attch)
 
        for _ in range(attachments_count):
 
            yield self.new_attch(next(self.misc_attchs))
 

	
 
    def new_messages(self, messages_count, attachments_count=None):
 
        for n in range(messages_count):
 
            if attachments_count is None:
 
                att_count = messages_count - n
 
            else:
 
                att_count = attachments_count
 
            yield from self.new_msg_with_attachments(att_count)
 

	
 

	
 
class RTClient:
 
    _builder = _TicketBuilder()
 
    DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/'
 
    TICKET_DATA = {
 
        '1': list(_builder.new_messages(1, 3)),
 
        '2': list(_builder.new_messages(2, 1)),
 
        '3': list(_builder.new_messages(3, 0)),
 
    }
 
    del _builder
 

	
 
    def __init__(self,
 
                 url=DEFAULT_URL,
 
                 default_login=None,
 
                 default_password=None,
 
                 proxy=None,
0 comments (0 inline, 0 general)