Changeset - 1cbc9d3dc933
[Not reviewed]
0 2 0
Brett Smith - 4 years ago 2020-06-11 17:07:14
brettcsmith@brettcsmith.org
tests: Add _meta_type kwarg to testutil.Posting.
2 files changed with 20 insertions and 37 deletions:
0 comments (0 inline, 0 general)
tests/test_reports_related_postings.py
Show inline comments
 
"""test_reports_related_postings - Unit tests for RelatedPostings"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import collections
 
import datetime
 
import itertools
 

	
 
from decimal import Decimal
 

	
 
import pytest
 

	
 
from . import testutil
 

	
 
from conservancy_beancount import data
 
from conservancy_beancount.reports import core
 

	
 
def accruals_and_payments(acct, src_acct, dst_acct, start_date, *amounts):
 
    dates = testutil.date_seq(start_date)
 
    for amt, currency in amounts:
 
        post_meta = {'metanumber': amt, 'metacurrency': currency}
 
        yield testutil.Transaction(date=next(dates), postings=[
 
            (acct, amt, currency, post_meta),
 
            (dst_acct if amt < 0 else src_acct, -amt, currency, post_meta),
 
        ])
 

	
 
@pytest.fixture
 
def credit_card_cycle():
 
    return list(accruals_and_payments(
 
        'Liabilities:CreditCard',
 
        'Assets:Checking',
 
        'Expenses:Other',
 
        datetime.date(2020, 4, 1),
 
        (-110, 'USD'),
 
        (110, 'USD'),
 
        (-120, 'USD'),
 
        (120, 'USD'),
 
    ))
 

	
 
@pytest.fixture
 
def two_accruals_three_payments():
 
    return list(accruals_and_payments(
 
        'Assets:Receivable:Accounts',
 
        'Income:Donations',
 
        'Assets:Checking',
 
        datetime.date(2020, 4, 10),
 
        (440, 'USD'),
 
        (-230, 'USD'),
 
        (550, 'EUR'),
 
        (-210, 'USD'),
 
        (-550, 'EUR'),
 
    ))
 

	
 
def test_initialize_with_list(credit_card_cycle):
 
    related = core.RelatedPostings(credit_card_cycle[0].postings)
 
    assert len(related) == 2
 

	
 
def test_initialize_with_iterable(two_accruals_three_payments):
 
    related = core.RelatedPostings(
 
        post for txn in two_accruals_three_payments
 
        for post in txn.postings
 
        if post.account == 'Assets:Receivable:Accounts'
 
    )
 
    assert len(related) == 5
 

	
 
def test_balance_empty():
 
    balance = core.RelatedPostings().balance()
 
    assert not balance
 
    assert balance.is_zero()
 

	
 
@pytest.mark.parametrize('index,expected', enumerate([
 
    -110,
 
    0,
 
    -120,
 
    0,
 
]))
 
def test_balance_credit_card(credit_card_cycle, index, expected):
 
    related = core.RelatedPostings(
 
        txn.postings[0] for txn in credit_card_cycle[:index + 1]
 
    )
 
    assert related.balance() == {'USD': testutil.Amount(expected, 'USD')}
 

	
 
def check_iter_with_balance(entries):
 
    expect_posts = [txn.postings[0] for txn in entries]
 
    expect_balances = []
 
    balance_tally = collections.defaultdict(Decimal)
 
    for post in expect_posts:
 
        number, currency = post.units
 
        balance_tally[currency] += number
 
        expect_balances.append({code: testutil.Amount(number, code)
 
                                for code, number in balance_tally.items()})
 
    related = core.RelatedPostings(expect_posts)
 
    for (post, balance), exp_post, exp_balance in zip(
 
            related.iter_with_balance(),
 
            expect_posts,
 
            expect_balances,
 
    ):
 
        assert post is exp_post
 
        assert balance == exp_balance
 
    assert post is expect_posts[-1]
 
    assert related.balance() == expect_balances[-1]
 

	
 
def test_iter_with_balance_empty():
 
    assert not list(core.RelatedPostings().iter_with_balance())
 

	
 
def test_iter_with_balance_credit_card(credit_card_cycle):
 
    check_iter_with_balance(credit_card_cycle)
 

	
 
def test_iter_with_balance_two_acccruals(two_accruals_three_payments):
 
    check_iter_with_balance(two_accruals_three_payments)
 

	
 
def test_balance_at_cost_mixed():
 
    txn = testutil.Transaction(postings=[
 
        ('Expenses:Other', '22'),
 
        ('Expenses:Other', '30', 'EUR', ('1.1',)),
 
        ('Expenses:Other', '40', 'EUR'),
 
        ('Expenses:Other', '50', 'USD', ('1.1', 'EUR')),
 
    ])
 
    related = core.RelatedPostings(data.Posting.from_txn(txn))
 
    balance = related.balance_at_cost()
 
    amounts = set(balance.values())
 
    assert amounts == {testutil.Amount(55, 'USD'), testutil.Amount(95, 'EUR')}
 

	
 
def test_balance_at_single_currency_cost():
 
    txn = testutil.Transaction(postings=[
 
        ('Expenses:Other', '22'),
 
        ('Expenses:Other', '30', 'EUR', ('1.1',)),
 
        ('Expenses:Other', '40', 'GBP', ('1.1',)),
 
    ])
 
    related = core.RelatedPostings(data.Posting.from_txn(txn))
 
    balance = related.balance_at_cost()
 
    amounts = set(balance.values())
 
    assert amounts == {testutil.Amount(99)}
 

	
 
def test_balance_at_cost_zeroed_out():
 
    txn = testutil.Transaction(postings=[
 
        ('Income:Other', '-22'),
 
        ('Assets:Receivable:Accounts', '20', 'EUR', ('1.1',)),
 
    ])
 
    related = core.RelatedPostings(data.Posting.from_txn(txn))
 
    balance = related.balance_at_cost()
 
    assert balance.is_zero()
 

	
 
def test_balance_at_cost_singleton():
 
    txn = testutil.Transaction(postings=[
 
        ('Assets:Receivable:Accounts', '20', 'EUR', ('1.1',)),
 
    ])
 
    related = core.RelatedPostings(data.Posting.from_txn(txn))
 
    balance = related.balance_at_cost()
 
    amounts = set(balance.values())
 
    assert amounts == {testutil.Amount(22)}
 

	
 
def test_balance_at_cost_singleton_without_cost():
 
    txn = testutil.Transaction(postings=[
 
        ('Assets:Receivable:Accounts', '20'),
 
    ])
 
    related = core.RelatedPostings(data.Posting.from_txn(txn))
 
    balance = related.balance_at_cost()
 
    amounts = set(balance.values())
 
    assert amounts == {testutil.Amount(20)}
 

	
 
def test_balance_at_cost_empty():
 
    related = core.RelatedPostings()
 
    balance = related.balance_at_cost()
 
    assert balance.is_zero()
 

	
 
def test_meta_values_empty():
 
    related = core.RelatedPostings()
 
    assert related.meta_values('key') == set()
 

	
 
def test_meta_values_no_match():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, metakey='metavalue'),
 
    ])
 
    assert related.meta_values('key') == {None}
 

	
 
def test_meta_values_no_match_default_given():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, metakey='metavalue'),
 
    ])
 
    assert related.meta_values('key', '') == {''}
 

	
 
def test_meta_values_one_match():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, key='metavalue'),
 
    ])
 
    assert related.meta_values('key') == {'metavalue'}
 

	
 
def test_meta_values_some_match():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, key='1'),
 
        testutil.Posting('Income:Donations', -2, metakey='2'),
 
    ])
 
    assert related.meta_values('key') == {'1', None}
 

	
 
def test_meta_values_some_match_default_given():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, key='1'),
 
        testutil.Posting('Income:Donations', -2, metakey='2'),
 
    ])
 
    assert related.meta_values('key', '') == {'1', ''}
 

	
 
def test_meta_values_all_match():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, key='1'),
 
        testutil.Posting('Income:Donations', -2, key='2'),
 
    ])
 
    assert related.meta_values('key') == {'1', '2'}
 

	
 
def test_meta_values_all_match_one_value():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, key='1'),
 
        testutil.Posting('Income:Donations', -2, key='1'),
 
    ])
 
    assert related.meta_values('key') == {'1'}
 

	
 
def test_meta_values_all_match_default_given():
 
    related = core.RelatedPostings([
 
        testutil.Posting('Income:Donations', -1, key='1'),
 
        testutil.Posting('Income:Donations', -2, key='2'),
 
    ])
 
    assert related.meta_values('key', '') == {'1', '2'}
 

	
 
def test_meta_values_many_types():
 
    expected = {
 
        datetime.date(2020, 4, 1),
 
        Decimal(42),
 
        testutil.Amount(5),
 
        'rt:42',
 
    }
 
    related = core.RelatedPostings(
 
        testutil.Posting('Income:Donations', -index, key=value)
 
        for index, value in enumerate(expected)
 
    )
 
    assert related.meta_values('key') == expected
 

	
 
@pytest.mark.parametrize('count', range(3))
 
def test_all_meta_links_zero(count):
 
    postings = (
 
        testutil.Posting('Income:Donations', -n, testkey=str(n))
 
        for n in range(count)
 
    )
 
    related = core.RelatedPostings(
 
        post._replace(meta=data.Metadata(post.meta))
 
        for post in postings
 
    )
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -n, testkey=str(n), _meta_type=data.Metadata,
 
    ) for n in range(count))
 
    assert next(related.all_meta_links('approval'), None) is None
 

	
 
def test_all_meta_links_singletons():
 
    postings = (
 
        testutil.Posting('Income:Donations', -10, statement=value)
 
        for value in itertools.chain(
 
            testutil.NON_LINK_METADATA_STRINGS,
 
            testutil.LINK_METADATA_STRINGS,
 
            testutil.NON_STRING_METADATA_VALUES,
 
        ))
 
    related = core.RelatedPostings(
 
        post._replace(meta=data.Metadata(post.meta))
 
        for post in postings
 
    )
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, statement=value, _meta_type=data.Metadata,
 
    ) for value in itertools.chain(
 
        testutil.NON_LINK_METADATA_STRINGS,
 
        testutil.LINK_METADATA_STRINGS,
 
        testutil.NON_STRING_METADATA_VALUES,
 
    ))
 
    assert set(related.all_meta_links('statement')) == testutil.LINK_METADATA_STRINGS
 

	
 
def test_all_meta_links_multiples():
 
    postings = (
 
        testutil.Posting('Income:Donations', -10, approval=' '.join(value))
 
        for value in itertools.permutations(testutil.LINK_METADATA_STRINGS, 2)
 
    )
 
    related = core.RelatedPostings(
 
        post._replace(meta=data.Metadata(post.meta))
 
        for post in postings
 
    )
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, approval=' '.join(value), _meta_type=data.Metadata,
 
    ) for value in itertools.permutations(testutil.LINK_METADATA_STRINGS, 2))
 
    assert set(related.all_meta_links('approval')) == testutil.LINK_METADATA_STRINGS
 

	
 
def test_all_meta_links_preserves_order():
 
    postings = (
 
        testutil.Posting('Income:Donations', -10, approval=c)
 
        for c in '121323'
 
    )
 
    related = core.RelatedPostings(
 
        post._replace(meta=data.Metadata(post.meta))
 
        for post in postings
 
    )
 
    related = core.RelatedPostings(testutil.Posting(
 
        'Income:Donations', -10, approval=c, _meta_type=data.Metadata,
 
    ) for c in '121323')
 
    assert list(related.all_meta_links('approval')) == list('123')
 

	
 
def test_group_by_meta_zero():
 
    assert not list(core.RelatedPostings.group_by_meta([], 'metacurrency'))
 

	
 
def test_group_by_meta_one(credit_card_cycle):
 
    posting = next(post for post in data.Posting.from_entries(credit_card_cycle)
 
                   if post.account.is_credit_card())
 
    actual = core.RelatedPostings.group_by_meta([posting], 'metacurrency')
 
    assert set(key for key, _ in actual) == {'USD'}
 

	
 
def test_group_by_meta_many(two_accruals_three_payments):
 
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
 
                if post.account == 'Assets:Receivable:Accounts']
 
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metacurrency'))
 
    assert set(actual) == {'USD', 'EUR'}
 
    for key, group in actual.items():
 
        assert 2 <= len(group) <= 3
 
        assert group.balance().is_zero()
 

	
 
def test_group_by_meta_many_single_posts(two_accruals_three_payments):
 
    postings = [post for post in data.Posting.from_entries(two_accruals_three_payments)
 
                if post.account == 'Assets:Receivable:Accounts']
 
    actual = dict(core.RelatedPostings.group_by_meta(postings, 'metanumber'))
 
    assert set(actual) == {post.units.number for post in postings}
 
    assert len(actual) == len(postings)
tests/testutil.py
Show inline comments
 
"""Mock Beancount objects for testing"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import datetime
 
import itertools
 
import re
 

	
 
import beancount.core.amount as bc_amount
 
import beancount.core.data as bc_data
 
import beancount.loader as bc_loader
 

	
 
import odf.element
 
import odf.opendocument
 
import odf.table
 

	
 
from decimal import Decimal
 
from pathlib import Path
 
from typing import Any, Optional, NamedTuple
 

	
 
from conservancy_beancount import books, rtutil
 

	
 
EXTREME_FUTURE_DATE = datetime.date(datetime.MAXYEAR, 12, 30)
 
FUTURE_DATE = datetime.date.today() + datetime.timedelta(days=365 * 99)
 
FY_START_DATE = datetime.date(2020, 3, 1)
 
FY_MID_DATE = datetime.date(2020, 9, 1)
 
PAST_DATE = datetime.date(2000, 1, 1)
 
TESTS_DIR = Path(__file__).parent
 

	
 
def _ods_cell_value_type(cell):
 
    assert cell.tagName == 'table:table-cell'
 
    return cell.getAttribute('valuetype')
 

	
 
def _ods_cell_value(cell):
 
    value_type = cell.getAttribute('valuetype')
 
    if value_type == 'currency' or value_type == 'float':
 
        return Decimal(cell.getAttribute('value'))
 
    elif value_type == 'date':
 
        return datetime.datetime.strptime(
 
            cell.getAttribute('datevalue'), '%Y-%m-%d',
 
        ).date()
 
    else:
 
        return cell.getAttribute('value')
 

	
 
def _ods_elem_text(elem):
 
    if isinstance(elem, odf.element.Text):
 
        return elem.data
 
    else:
 
        return '\0'.join(_ods_elem_text(child) for child in elem.childNodes)
 

	
 
odf.element.Element.value_type = property(_ods_cell_value_type)
 
odf.element.Element.value = property(_ods_cell_value)
 
odf.element.Element.text = property(_ods_elem_text)
 

	
 
def check_lines_match(lines, expect_patterns, source='output'):
 
    for pattern in expect_patterns:
 
        assert any(re.search(pattern, line) for line in lines), \
 
            f"{pattern!r} not found in {source}"
 

	
 
def check_post_meta(txn, *expected_meta, default=None):
 
    assert len(txn.postings) == len(expected_meta)
 
    for post, expected in zip(txn.postings, expected_meta):
 
        if not expected:
 
            assert not post.meta
 
        else:
 
            actual = None if post.meta is None else {
 
                key: post.meta.get(key, default) for key in expected
 
            }
 
            assert actual == expected
 

	
 
def combine_values(*value_seqs):
 
    stop = 0
 
    for seq in value_seqs:
 
        try:
 
            stop = max(stop, len(seq))
 
        except TypeError:
 
            pass
 
    return itertools.islice(
 
        zip(*(itertools.cycle(seq) for seq in value_seqs)),
 
        stop,
 
    )
 

	
 
def date_seq(date=FY_MID_DATE, step=1):
 
    while True:
 
        yield date
 
        date += datetime.timedelta(days=step)
 

	
 
def parse_date(s, fmt='%Y-%m-%d'):
 
    return datetime.datetime.strptime(s, fmt).date()
 

	
 
def test_path(s):
 
    if s is None:
 
        return s
 
    s = Path(s)
 
    if not s.is_absolute():
 
        s = TESTS_DIR / s
 
    return s
 

	
 
def Amount(number, currency='USD'):
 
    return bc_amount.Amount(Decimal(number), currency)
 

	
 
def Cost(number, currency='USD', date=FY_MID_DATE, label=None):
 
    return bc_data.Cost(Decimal(number), currency, date, label)
 

	
 
def Posting(account, number,
 
            currency='USD', cost=None, price=None, flag=None,
 
            type_=bc_data.Posting, **meta):
 
            _post_type=bc_data.Posting, _meta_type=None, **meta):
 
    if cost is not None:
 
        cost = Cost(*cost)
 
    if not meta:
 
        meta = None
 
    return type_(
 
    elif _meta_type:
 
        meta = _meta_type(meta)
 
    return _post_type(
 
        account,
 
        Amount(number, currency),
 
        cost,
 
        price,
 
        flag,
 
        meta,
 
    )
 

	
 
def Transaction(date=FY_MID_DATE, flag='*', payee=None,
 
                narration='', tags=None, links=None, postings=(),
 
                **meta):
 
    if isinstance(date, str):
 
        date = parse_date(date)
 
    meta.setdefault('filename', '<test>')
 
    meta.setdefault('lineno', 0)
 
    real_postings = []
 
    for post in postings:
 
        try:
 
            post.account
 
        except AttributeError:
 
            if isinstance(post[-1], dict):
 
                args = post[:-1]
 
                kwargs = post[-1]
 
            else:
 
                args = post
 
                kwargs = {}
 
            post = Posting(*args, **kwargs)
 
        real_postings.append(post)
 
    return bc_data.Transaction(
 
        meta,
 
        date,
 
        flag,
 
        payee,
 
        narration,
 
        set(tags or ''),
 
        set(links or ''),
 
        real_postings,
 
    )
 

	
 
LINK_METADATA_STRINGS = {
 
    'Invoices/304321.pdf',
 
    'rt:123/456',
 
    'rt://ticket/234',
 
}
 

	
 
NON_LINK_METADATA_STRINGS = {
 
    '',
 
    ' ',
 
    '     ',
 
}
 

	
 
NON_STRING_METADATA_VALUES = [
 
    Decimal(5),
 
    FY_MID_DATE,
 
    Amount(50),
 
    Amount(500, None),
 
]
 

	
 
OPENING_EQUITY_ACCOUNTS = itertools.cycle([
 
    'Equity:Funds:Unrestricted',
 
    'Equity:Funds:Restricted',
 
    'Equity:OpeningBalance',
 
])
 

	
 
class ODSCell:
 
    @classmethod
 
    def from_row(cls, row):
 
        return row.getElementsByType(odf.table.TableCell)
 

	
 
    @classmethod
 
    def from_sheet(cls, spreadsheet):
 
        for row in spreadsheet.getElementsByType(odf.table.TableRow):
 
            yield list(cls.from_row(row))
 

	
 
    @classmethod
 
    def from_ods_file(cls, path):
 
        ods = odf.opendocument.load(path)
 
        return cls.from_sheet(ods.spreadsheet)
 

	
 

	
 
def OpeningBalance(acct=None, **txn_meta):
 
    if acct is None:
 
        acct = next(OPENING_EQUITY_ACCOUNTS)
 
    return Transaction(**txn_meta, postings=[
 
        ('Assets:Receivable:Accounts', 100),
 
        ('Assets:Receivable:Loans', 200),
 
        ('Liabilities:Payable:Accounts', -15),
 
        ('Liabilities:Payable:Vacation', -25),
 
        (acct, -260),
 
    ])
 

	
 
class TestBooksLoader(books.Loader):
 
    def __init__(self, source):
 
        self.source = source
 

	
 
    def load_all(self, from_year=None):
 
        return bc_loader.load_file(self.source)
 

	
 
    def load_fy_range(self, from_fy, to_fy=None):
 
        return self.load_all()
 

	
 

	
 
class TestConfig:
 
    def __init__(self, *,
 
                 books_path=None,
 
                 payment_threshold=0,
 
                 repo_path=None,
 
                 rt_client=None,
 
    ):
 
        if books_path is None:
 
            self._books_loader = None
 
        else:
 
            self._books_loader = TestBooksLoader(books_path)
 
        self._payment_threshold = Decimal(payment_threshold)
 
        self.repo_path = test_path(repo_path)
 
        self._rt_client = rt_client
 
        if rt_client is None:
 
            self._rt_wrapper = None
 
        else:
 
            self._rt_wrapper = rtutil.RT(rt_client)
 

	
 
    def books_loader(self):
 
        return self._books_loader
 

	
 
    def config_file_path(self):
 
        return test_path('userconfig/conservancy_beancount/config.ini')
 

	
 
    def payment_threshold(self):
 
        return self._payment_threshold
 

	
 
    def repository_path(self):
 
        return self.repo_path
 

	
 
    def rt_client(self):
 
        return self._rt_client
 

	
 
    def rt_wrapper(self):
 
        return self._rt_wrapper
 

	
 

	
 
class _TicketBuilder:
 
    MESSAGE_ATTACHMENTS = [
 
        ('(Unnamed)', 'multipart/alternative', '0b'),
 
        ('(Unnamed)', 'text/plain', '1.2k'),
 
        ('(Unnamed)', 'text/html', '1.4k'),
 
    ]
 
    MISC_ATTACHMENTS = [
 
        ('Forwarded Message.eml', 'message/rfc822', '3.1k'),
 
        ('photo.jpg', 'image/jpeg', '65.2k'),
 
        ('ConservancyInvoice-301.pdf', 'application/pdf', '326k'),
 
        ('Company_invoice-2020030405_as-sent.pdf', 'application/pdf', '50k'),
 
        ('statement.txt', 'text/plain', '652b'),
 
        ('screenshot.png', 'image/png', '1.9m'),
 
    ]
 

	
 
    def __init__(self):
 
        self.id_seq = itertools.count(1)
 
        self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS)
 

	
 
    def new_attch(self, attch):
 
        return (str(next(self.id_seq)), *attch)
 

	
 
    def new_msg_with_attachments(self, attachments_count=1):
 
        for attch in self.MESSAGE_ATTACHMENTS:
 
            yield self.new_attch(attch)
 
        for _ in range(attachments_count):
 
            yield self.new_attch(next(self.misc_attchs))
 

	
 
    def new_messages(self, messages_count, attachments_count=None):
 
        for n in range(messages_count):
 
            if attachments_count is None:
 
                att_count = messages_count - n
 
            else:
 
                att_count = attachments_count
 
            yield from self.new_msg_with_attachments(att_count)
 

	
 

	
 
class RTClient:
 
    _builder = _TicketBuilder()
 
    DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/'
 
    TICKET_DATA = {
 
        '1': list(_builder.new_messages(1, 3)),
 
        '2': list(_builder.new_messages(2, 1)),
 
        '3': list(_builder.new_messages(3, 0)),
 
    }
 
    del _builder
 

	
 
    def __init__(self,
 
                 url=DEFAULT_URL,
 
                 default_login=None,
 
                 default_password=None,
 
                 proxy=None,
 
                 default_queue='General',
 
                 skip_login=False,
 
                 verify_cert=True,
 
                 http_auth=None,
 
                 want_cfs=True,
 
    ):
 
        self.url = url
 
        if http_auth is None:
 
            self.user = default_login
 
            self.password = default_password
 
            self.auth_method = 'login'
 
            self.login_result = skip_login or None
 
        else:
 
            self.user = http_auth.username
 
            self.password = http_auth.password
 
            self.auth_method = type(http_auth).__name__
 
            self.login_result = True
 
        self.last_login = None
 
        self.want_cfs = want_cfs
 

	
 
    def login(self, login=None, password=None):
 
        if login is None and password is None:
 
            login = self.user
 
            password = self.password
 
        self.login_result = bool(login and password and not password.startswith('bad'))
 
        self.last_login = (login, password, self.login_result)
 
        return self.login_result
 

	
 
    def get_attachments(self, ticket_id):
 
        try:
 
            return list(self.TICKET_DATA[str(ticket_id)])
 
        except KeyError:
 
            return None
 

	
 
    def get_attachment(self, ticket_id, attachment_id):
 
        try:
 
            att_seq = iter(self.TICKET_DATA[str(ticket_id)])
 
        except KeyError:
 
            return None
 
        att_id = str(attachment_id)
 
        multipart_id = None
 
        for attch in att_seq:
 
            if attch[0] == att_id:
 
                break
 
            elif attch[2].startswith('multipart/'):
 
                multipart_id = attch[0]
 
        else:
 
            return None
 
        tx_id = multipart_id or att_id
 
        if attch[1] == '(Unnamed)':
 
            filename = ''
 
        else:
 
            filename = attch[1]
 
        return {
 
            'id': att_id,
 
            'ContentType': attch[2],
 
            'Filename': filename,
 
            'Transaction': tx_id,
 
        }
 

	
 
    def get_ticket(self, ticket_id):
 
        ticket_id_s = str(ticket_id)
 
        if ticket_id_s not in self.TICKET_DATA:
 
            return None
 
        retval = {
 
            'id': 'ticket/{}'.format(ticket_id_s),
 
            'numerical_id': ticket_id_s,
 
            'Requestors': [
 
                f'mx{ticket_id_s}@example.org',
 
                'requestor2@example.org',
 
            ],
 
        }
 
        if self.want_cfs:
 
            retval['CF.{payment-method}'] = f'payment method {ticket_id_s}'
 
            retval['CF.{payment-to}'] = f'Hon. Mx. {ticket_id_s}'
 
        return retval
 

	
 
    def get_user(self, user_id):
 
        user_id_s = str(user_id)
 
        match = re.search(r'(\d+)@', user_id_s)
 
        if match is None:
 
            email = f'mx{user_id_s}@example.org'
 
            user_id_num = int(user_id_s)
 
        else:
 
            email = user_id_s
 
            user_id_num = int(match.group(1))
 
        retval = {
 
            'id': f'user/{user_id_num}',
 
            'EmailAddress': email,
 
            'Name': email,
 
        }
 
        if self.want_cfs:
 
            retval['RealName'] = f'Mx. {user_id_num}'
 
        return retval
0 comments (0 inline, 0 general)