Files @ aa488effb0f5
Branch filter:

Location: NPO-Accounting/conservancy_beancount/tests/testutil.py

Brett Smith
books.Loader: New loading strategy based on load_file. RT#11034.

Building a string and loading it means Beancount can never cache any
load. It only caches top-level file loads because options in the
top-level file can change the semantics of included entries.

Instead use load_file as much as possible, and filter entries as
needed.
"""Mock Beancount objects for testing"""
# Copyright © 2020  Brett Smith
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import datetime
import itertools
import re

import beancount.core.amount as bc_amount
import beancount.core.data as bc_data
import beancount.loader as bc_loader

from decimal import Decimal
from pathlib import Path

from conservancy_beancount import books, rtutil

# Shared date and path fixtures used by the helpers in this module.

# December 30 of the largest year `datetime` supports, i.e. one day before
# `datetime.date.max`.
EXTREME_FUTURE_DATE = datetime.date(datetime.MAXYEAR, 12, 30)
# 99 * 365 days after the day this module is imported (computed once, at
# import time).
FUTURE_DATE = datetime.date.today() + datetime.timedelta(days=365 * 99)
# NOTE(review): the names suggest the start and middle of a fiscal year
# beginning in March 2020 - confirm against the books configuration.
FY_START_DATE = datetime.date(2020, 3, 1)
# Also the default date for Cost(), Transaction() and date_seq() below.
FY_MID_DATE = datetime.date(2020, 9, 1)
PAST_DATE = datetime.date(2000, 1, 1)
# Directory containing this file; test_path() resolves relative paths
# against it.
TESTS_DIR = Path(__file__).parent

def check_lines_match(lines, expect_patterns, source='output'):
    """Assert that every pattern matches at least one of the lines.

    Each pattern in ``expect_patterns`` is searched for with ``re.search``
    against the lines, stopping at the first hit. ``lines`` is iterated
    anew for every pattern, so when it is a one-shot iterator each search
    resumes after the previous match; the patterns must then be found in
    order.

    ``source`` only appears in the assertion message, to name what was
    being searched.
    """
    for pattern in expect_patterns:
        found = False
        for line in lines:
            if re.search(pattern, line):
                found = True
                break
        assert found, f"{pattern!r} not found in {source}"

def check_post_meta(txn, *expected_meta, default=None):
    """Assert that each posting of ``txn`` carries the expected metadata.

    There must be exactly one ``expected_meta`` argument per posting, in
    posting order. A falsy expectation (``None``, ``{}``) asserts that the
    posting has no metadata at all. Otherwise only the keys named in the
    expectation are compared; keys absent from the posting's metadata are
    read as ``default``.
    """
    assert len(txn.postings) == len(expected_meta)
    for post, expected in zip(txn.postings, expected_meta):
        if not expected:
            assert not post.meta
            continue
        if post.meta is None:
            # A posting without metadata can never equal a non-empty dict.
            actual = None
        else:
            actual = {}
            for key in expected:
                actual[key] = post.meta.get(key, default)
        assert actual == expected

def combine_values(*value_seqs):
    """Zip the given sequences together, cycling the shorter ones.

    Returns an iterator of tuples whose length equals that of the longest
    argument supporting ``len()``. Arguments without a length (such as
    plain iterators) are cycled too but do not influence how many tuples
    are produced.
    """
    lengths = [0]
    for seq in value_seqs:
        try:
            lengths.append(len(seq))
        except TypeError:
            # Unsized iterables don't contribute to the result length.
            continue
    cycles = [itertools.cycle(seq) for seq in value_seqs]
    return itertools.islice(zip(*cycles), max(lengths))

def date_seq(date=FY_MID_DATE, step=1):
    """Yield ``date``, then keep advancing it by ``step`` days, forever."""
    current = date
    while True:
        yield current
        current = current + datetime.timedelta(days=step)

def parse_date(s, fmt='%Y-%m-%d'):
    """Parse the string ``s`` using ``fmt`` and return a ``datetime.date``."""
    parsed = datetime.datetime.strptime(s, fmt)
    return parsed.date()

def test_path(s):
    """Turn ``s`` into a ``Path``, anchoring relative paths at TESTS_DIR.

    ``None`` is passed through unchanged; absolute paths are returned as
    given (converted to ``Path``).
    """
    if s is None:
        return s
    path = Path(s)
    return path if path.is_absolute() else TESTS_DIR / path

def Amount(number, currency='USD'):
    """Build a Beancount Amount, passing ``number`` through ``Decimal``."""
    value = Decimal(number)
    return bc_amount.Amount(value, currency)

def Cost(number, currency='USD', date=FY_MID_DATE, label=None):
    """Build a Beancount Cost, passing ``number`` through ``Decimal``."""
    value = Decimal(number)
    return bc_data.Cost(value, currency, date, label)

def Posting(account, number,
            currency='USD', cost=None, price=None, flag=None,
            type_=bc_data.Posting, **meta):
    """Build a posting of ``type_`` from simple Python values.

    ``number`` and ``currency`` are combined with ``Amount()``. ``cost``,
    when given, is a sequence of positional arguments for ``Cost()``. Any
    extra keyword arguments become the posting's metadata dict; with none
    given the metadata is ``None`` rather than an empty dict.
    """
    cost_obj = None if cost is None else Cost(*cost)
    units = Amount(number, currency)
    return type_(account, units, cost_obj, price, flag, meta or None)

def Transaction(date=FY_MID_DATE, flag='*', payee=None,
                narration='', tags=None, links=None, postings=(),
                **meta):
    if isinstance(date, str):
        date = parse_date(date)
    meta.setdefault('filename', '<test>')
    meta.setdefault('lineno', 0)
    real_postings = []
    for post in postings:
        try:
            post.account
        except AttributeError:
            if isinstance(post[-1], dict):
                args = post[:-1]
                kwargs = post[-1]
            else:
                args = post
                kwargs = {}
            post = Posting(*args, **kwargs)
        real_postings.append(post)
    return bc_data.Transaction(
        meta,
        date,
        flag,
        payee,
        narration,
        set(tags or ''),
        set(links or ''),
        real_postings,
    )

# Sample metadata values shared across tests.

# Non-empty strings: a file path and two RT-style references.
# NOTE(review): presumably these are values the code under test should
# treat as links - confirm against the callers.
LINK_METADATA_STRINGS = {
    'Invoices/304321.pdf',
    'rt:123/456',
    'rt://ticket/234',
}

# Strings that are empty or consist only of spaces.
NON_LINK_METADATA_STRINGS = {
    '',
    ' ',
    '     ',
}

# Values that are not `str` instances: a Decimal, a date, and two Amounts
# (the second one built with currency None).
NON_STRING_METADATA_VALUES = [
    Decimal(5),
    FY_MID_DATE,
    Amount(50),
    Amount(500, None),
]

# Endless rotation of equity account names. OpeningBalance() takes the next
# one whenever it is called without an account, so this single module-level
# iterator advances across calls.
OPENING_EQUITY_ACCOUNTS = itertools.cycle([
    'Equity:Funds:Unrestricted',
    'Equity:Funds:Restricted',
    'Equity:OpeningBalance',
])

def balance_map(source=None, **kwargs):
    """Return a dict mapping currency names to Amount instances.

    ``source``, when given, is an iterable of ``(currency, number)`` pairs;
    keyword arguments supply further ``currency=number`` entries and take
    precedence over ``source`` for the same currency. Each number may be
    anything ``Decimal`` accepts (a decimal string, an int, etc.).
    """
    source_pairs = () if source is None else source
    all_pairs = itertools.chain(source_pairs, kwargs.items())
    return {
        currency: Amount(number, currency)
        for currency, number in all_pairs
    }

def OpeningBalance(acct=None, **txn_meta):
    """Build a five-posting transaction with fixed opening amounts.

    Four postings go to fixed receivable and payable accounts; the fifth
    posts -260 to ``acct``. When ``acct`` is omitted, the next name from
    OPENING_EQUITY_ACCOUNTS is used. Remaining keyword arguments are
    forwarded to ``Transaction()``.
    """
    equity_acct = next(OPENING_EQUITY_ACCOUNTS) if acct is None else acct
    postings = [
        ('Assets:Receivable:Accounts', 100),
        ('Assets:Receivable:Loans', 200),
        ('Liabilities:Payable:Accounts', -15),
        ('Liabilities:Payable:Vacation', -25),
        (equity_acct, -260),
    ]
    return Transaction(postings=postings, **txn_meta)

class TestBooksLoader(books.Loader):
    """Books loader that always reads one fixed Beancount source file.

    The parent initializer is not called; the only state kept is
    ``source``, the path handed to Beancount's ``load_file``.
    """

    def __init__(self, source):
        self.source = source

    def load_fy_range(self, from_fy, to_fy=None):
        """Load ``self.source``; the requested range is ignored."""
        loaded = bc_loader.load_file(self.source)
        return loaded


class TestConfig:
    """Configuration object for tests, with every value fixed up front.

    All constructor arguments are keyword-only. Each accessor method simply
    returns what was derived from them at construction time.
    NOTE(review): presumably these accessors mirror the project's real
    configuration class - confirm against it.
    """

    def __init__(self, *,
                 books_path=None,
                 payment_threshold=0,
                 repo_path=None,
                 rt_client=None,
    ):
        # A books loader only exists when a books path was supplied.
        self._books_loader = (
            None if books_path is None else TestBooksLoader(books_path)
        )
        self._payment_threshold = Decimal(payment_threshold)
        # Relative repository paths are resolved by test_path().
        self.repo_path = test_path(repo_path)
        self._rt_client = rt_client
        # Likewise, the RT wrapper is only built around a real client.
        self._rt_wrapper = None if rt_client is None else rtutil.RT(rt_client)

    def books_loader(self):
        """Return the TestBooksLoader, or None without a books path."""
        return self._books_loader

    def config_file_path(self):
        """Return the path of the sample user config file."""
        return test_path('userconfig/conservancy_beancount/config.ini')

    def payment_threshold(self):
        """Return the payment threshold as a Decimal."""
        return self._payment_threshold

    def repository_path(self):
        """Return the repository path, or None if none was given."""
        return self.repo_path

    def rt_client(self):
        """Return the RT client passed to the constructor."""
        return self._rt_client

    def rt_wrapper(self):
        """Return the rtutil.RT wrapper, or None without an RT client."""
        return self._rt_wrapper


class _TicketBuilder:
    MESSAGE_ATTACHMENTS = [
        ('(Unnamed)', 'multipart/alternative', '0b'),
        ('(Unnamed)', 'text/plain', '1.2k'),
        ('(Unnamed)', 'text/html', '1.4k'),
    ]
    MISC_ATTACHMENTS = [
        ('Forwarded Message.eml', 'message/rfc822', '3.1k'),
        ('photo.jpg', 'image/jpeg', '65.2k'),
        ('ConservancyInvoice-301.pdf', 'application/pdf', '326k'),
        ('Company_invoice-2020030405_as-sent.pdf', 'application/pdf', '50k'),
        ('statement.txt', 'text/plain', '652b'),
        ('screenshot.png', 'image/png', '1.9m'),
    ]

    def __init__(self):
        self.id_seq = itertools.count(1)
        self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS)

    def new_attch(self, attch):
        return (str(next(self.id_seq)), *attch)

    def new_msg_with_attachments(self, attachments_count=1):
        for attch in self.MESSAGE_ATTACHMENTS:
            yield self.new_attch(attch)
        for _ in range(attachments_count):
            yield self.new_attch(next(self.misc_attchs))

    def new_messages(self, messages_count, attachments_count=None):
        for n in range(messages_count):
            if attachments_count is None:
                att_count = messages_count - n
            else:
                att_count = attachments_count
            yield from self.new_msg_with_attachments(att_count)


class RTClient:
    """In-memory fake of an RT client, serving canned ticket data.

    TICKET_DATA maps ticket ID strings to lists of attachment tuples of the
    form ``(id, filename, content type, size)``, generated once when the
    class is defined. Lookups for unknown tickets return None.
    """

    _builder = _TicketBuilder()
    DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/'
    TICKET_DATA = {
        '1': list(_builder.new_messages(1, 3)),
        '2': list(_builder.new_messages(2, 1)),
        '3': list(_builder.new_messages(3, 0)),
    }
    # The builder is only needed to fill TICKET_DATA above.
    del _builder

    def __init__(self,
                 url=DEFAULT_URL,
                 default_login=None,
                 default_password=None,
                 proxy=None,
                 default_queue='General',
                 skip_login=False,
                 verify_cert=True,
                 http_auth=None,
    ):
        """Record the connection settings that tests can inspect.

        ``proxy``, ``default_queue`` and ``verify_cert`` are accepted but
        not stored. With ``http_auth`` the credentials come from that
        object and the client counts as already logged in; otherwise
        ``login_result`` is ``skip_login`` when truthy, else None.
        """
        self.url = url
        if http_auth is not None:
            self.user = http_auth.username
            self.password = http_auth.password
            self.auth_method = type(http_auth).__name__
            self.login_result = True
        else:
            self.user = default_login
            self.password = default_password
            self.auth_method = 'login'
            self.login_result = skip_login or None
        self.last_login = None

    def login(self, login=None, password=None):
        """Simulate a login and return whether it succeeded.

        When neither argument is given, the stored credentials are used.
        The login succeeds if both values are truthy and the password does
        not start with 'bad'. The attempt is recorded in ``last_login``.
        """
        if login is None and password is None:
            login, password = self.user, self.password
        succeeded = bool(
            login and password and not password.startswith('bad')
        )
        self.login_result = succeeded
        self.last_login = (login, password, succeeded)
        return succeeded

    def get_attachments(self, ticket_id):
        """Return a new list of the ticket's attachment tuples, or None."""
        key = str(ticket_id)
        if key not in self.TICKET_DATA:
            return None
        return list(self.TICKET_DATA[key])

    def get_attachment(self, ticket_id, attachment_id):
        """Return a dict describing one attachment, or None if not found.

        'Transaction' is the ID of the last multipart attachment listed
        before the requested one, or the attachment's own ID when there is
        none. 'Filename' is empty for attachments named '(Unnamed)'.
        """
        key = str(ticket_id)
        if key not in self.TICKET_DATA:
            return None
        att_id = str(attachment_id)
        multipart_id = None
        found = None
        for attch in self.TICKET_DATA[key]:
            if attch[0] == att_id:
                found = attch
                break
            if attch[2].startswith('multipart/'):
                multipart_id = attch[0]
        if found is None:
            return None
        name = found[1]
        return {
            'id': att_id,
            'ContentType': found[2],
            'Filename': '' if name == '(Unnamed)' else name,
            'Transaction': multipart_id or att_id,
        }

    def get_ticket(self, ticket_id):
        """Return a dict of fields derived from the ticket ID, or None."""
        ticket_id_s = str(ticket_id)
        if ticket_id_s in self.TICKET_DATA:
            return {
                'id': 'ticket/{}'.format(ticket_id_s),
                'numerical_id': ticket_id_s,
                'CF.{payment-method}': f'payment method {ticket_id_s}',
                'Requestors': [
                    f'mx{ticket_id_s}@example.org',
                    'requestor2@example.org',
                ],
            }
        return None

    def get_user(self, user_id):
        """Return a user dict derived from a numeric ID or email address.

        If ``user_id`` contains digits directly followed by '@', it is used
        as the email address and those digits as the numeric ID. Otherwise
        it must convert to an int, and the email address is built from it.
        """
        user_id_s = str(user_id)
        match = re.search(r'(\d+)@', user_id_s)
        if match is not None:
            email = user_id_s
            user_id_num = int(match.group(1))
        else:
            email = f'mx{user_id_s}@example.org'
            user_id_num = int(user_id_s)
        return {
            'id': f'user/{user_id_num}',
            'EmailAddress': email,
            'Name': email,
            'RealName': f'Mx. {user_id_num}',
        }