Changeset - 837fcec8f0db
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-07-29 21:22:09
brettcsmith@brettcsmith.org
reports: Add BaseODS.set_common_properties() method.
3 files changed with 47 insertions and 0 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/core.py
Show inline comments
 
"""core.py - Common data classes for reporting functionality"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import abc
 
import collections
 
import copy
 
import datetime
 
import enum
 
import itertools
 
import operator
 
import re
 
import shlex
 
import sys
 
import urllib.parse as urlparse
 

	
 
import babel.core  # type:ignore[import]
 
import babel.numbers  # type:ignore[import]
 

	
 
import git  # type:ignore[import]
 

	
 
import odf.config  # type:ignore[import]
 
import odf.element  # type:ignore[import]
 
import odf.meta  # type:ignore[import]
 
import odf.number  # type:ignore[import]
 
import odf.opendocument  # type:ignore[import]
 
import odf.style  # type:ignore[import]
 
import odf.table  # type:ignore[import]
 
import odf.text  # type:ignore[import]
 

	
 
from decimal import Decimal
 
from pathlib import Path
 

	
 
from beancount.core import amount as bc_amount
 
from odf.namespaces import TOOLSVERSION  # type:ignore[import]
 

	
 
from ..cliutil import VERSION
 
from .. import data
 
from .. import filters
 
from .. import rtutil
 

	
 
from typing import (
 
    cast,
 
    overload,
 
    Any,
 
    BinaryIO,
 
    Callable,
 
    Dict,
 
    Generic,
 
    Iterable,
 
    Iterator,
 
    List,
 
    Mapping,
 
    MutableMapping,
 
    Optional,
 
    Sequence,
 
    Set,
 
    Tuple,
 
    Type,
 
    TypeVar,
 
    Union,
 
)
 
from ..beancount_types import (
 
    MetaKey,
 
    MetaValue,
 
)
 

	
 
OPENING_BALANCE_NAME = "OPENING BALANCE"
 
ENDING_BALANCE_NAME = "ENDING BALANCE"
 

	
 
DecimalCompat = data.DecimalCompat
 
BalanceType = TypeVar('BalanceType', bound='Balance')
 
ElementType = Callable[..., odf.element.Element]
 
LinkType = Union[str, Tuple[str, Optional[str]]]
 
RelatedType = TypeVar('RelatedType', bound='RelatedPostings')
 
RT = TypeVar('RT', bound=Sequence)
 
ST = TypeVar('ST')
 
T = TypeVar('T')
 

	
 
class Balance(Mapping[str, data.Amount]):
 
    """A collection of amounts mapped by currency
 

	
 
    Each key is a Beancount currency string, and each value represents the
 
    balance in that currency.
 
    """
 
    __slots__ = ('_currency_map', 'tolerance')
 
    TOLERANCE = Decimal('0.01')
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Amount]=(),
 
                 tolerance: Optional[Decimal]=None,
 
    ) -> None:
 
        if tolerance is None:
 
            tolerance = self.TOLERANCE
 
        self.tolerance = tolerance
 
        self._currency_map: Dict[str, data.Amount] = {}
 
        for amount in source:
 
            self._add_amount(self._currency_map, amount)
 

	
 
    def _add_amount(self,
 
                    currency_map: MutableMapping[str, data.Amount],
 
                    amount: data.Amount,
 
    ) -> None:
 
        code = amount.currency
 
        try:
 
            current_number = currency_map[code].number
 
        except KeyError:
 
            current_number = Decimal(0)
 
        currency_map[code] = data.Amount(current_number + amount.number, code)
 

	
 
    def _add_other(self,
 
                   currency_map: MutableMapping[str, data.Amount],
 
                   other: Union[data.Amount, 'Balance'],
 
    ) -> None:
 
        if isinstance(other, Balance):
 
            for amount in other.values():
 
                self._add_amount(currency_map, amount)
...
 
@@ -959,192 +963,208 @@ class BaseODS(BaseSpreadsheet[RT, ST], metaclass=abc.ABCMeta):
 
    def set_open_sheet(self, sheet: Union[str, odf.table.Table, None]=None) -> None:
 
        """Set which sheet is open in the document
 

	
 
        When the user first opens the spreadsheet, their view will be on this
 
        sheet. You can provide a sheet name string or sheet object. With no
 
        argument, defaults to ``self.sheet``.
 
        """
 
        if sheet is None:
 
            sheet = self.sheet
 
        if not isinstance(sheet, str):
 
            sheet = sheet.getAttribute('name')
 
            if not isinstance(sheet, str):
 
                raise ValueError("sheet argument has no name for setting")
 
        self.set_config(self.view, 'ActiveTable', sheet, 'string')
 

	
 
    def use_sheet(self, name: str) -> odf.table.Table:
 
        """Switch the active sheet ``self.sheet`` to the one with the given name
 

	
 
        If there is no sheet with the given name, create it and append it to
 
        the spreadsheet first.
 

	
 
        If the current active sheet is empty when this method is called, it
 
        will be removed from the spreadsheet.
 
        """
 
        try:
 
            empty_sheet = not self.sheet.hasChildNodes()
 
        except AttributeError:
 
            empty_sheet = False
 
        if empty_sheet:
 
            self.document.spreadsheet.removeChild(self.sheet)
 
        self.sheet = self.ensure_child(
 
            self.document.spreadsheet, odf.table.Table, name=name,
 
        )
 
        return self.sheet
 

	
 
    ### Initialization hooks
 

	
 
    def init_settings(self) -> None:
 
        """Hook called to initialize settings
 

	
 
        This method is called by __init__ to populate
 
        ``self.document.settings``. This implementation creates the barest
 
        skeleton structure necessary to support other methods, in particular
 
        ``lock_first_row``.
 
        """
 
        view_settings = self.ensure_child(
 
            self.document.settings, odf.config.ConfigItemSet, name='ooo:view-settings',
 
        )
 
        views = self.ensure_child(
 
            view_settings, odf.config.ConfigItemMapIndexed, name='Views',
 
        )
 
        self.view = self.ensure_child(views, odf.config.ConfigItemMapEntry)
 
        self.set_config(self.view, 'ViewId', 'view1')
 

	
 
    def init_styles(self) -> None:
 
        """Hook called to initialize settings
 

	
 
        This method is called by __init__ to populate
 
        ``self.document.styles``. This implementation creates basic building
 
        block cell styles often used in financial reports.
 
        """
 
        styles = self.document.styles
 
        self.style_bold = self.ensure_child(
 
            styles, odf.style.Style, name='Bold', family='table-cell',
 
        )
 
        self.ensure_child(
 
            self.style_bold, odf.style.TextProperties, fontweight='bold',
 
        )
 

	
 
        date_style = self.replace_child(styles, odf.number.DateStyle, name='ISODate')
 
        date_style.addElement(odf.number.Year(style='long'))
 
        date_style.addElement(odf.number.Text(text='-'))
 
        date_style.addElement(odf.number.Month(style='long'))
 
        date_style.addElement(odf.number.Text(text='-'))
 
        date_style.addElement(odf.number.Day(style='long'))
 
        self.style_date = self.ensure_child(
 
            styles,
 
            odf.style.Style,
 
            name=f'{date_style.getAttribute("name")}Cell',
 
            family='table-cell',
 
            datastylename=date_style,
 
        )
 

	
 
        self.style_starttext: odf.style.Style
 
        self.style_centertext: odf.style.Style
 
        self.style_endtext: odf.style.Style
 
        for textalign in ['start', 'center', 'end']:
 
            aligned_style = self.replace_child(
 
                styles, odf.style.Style, name=f'{textalign.title()}Text',
 
            )
 
            aligned_style.setAttribute('family', 'table-cell')
 
            aligned_style.addElement(odf.style.ParagraphProperties(textalign=textalign))
 
            setattr(self, f'style_{textalign}text', aligned_style)
 

	
 
    ### Properties
 

	
 
    def set_common_properties(self,
 
                              repo: Optional[git.Repo]=None,
 
                              command: Optional[Sequence[str]]=sys.argv,
 
    ) -> None:
 
        if repo is None:
 
            git_shahex = '<none>'
 
            git_dirty = True
 
        else:
 
            git_shahex = repo.head.commit.hexsha
 
            git_dirty = repo.is_dirty()
 
        self.set_custom_property('GitSHA', git_shahex)
 
        self.set_custom_property('GitDirty', git_dirty, 'boolean')
 
        if command is not None:
 
            command_s = ' '.join(shlex.quote(s) for s in command)
 
            self.set_custom_property('ReportCommand', command_s)
 

	
 
    def set_custom_property(self,
 
                            name: str,
 
                            value: Any,
 
                            valuetype: Optional[str]=None,
 
    ) -> odf.meta.UserDefined:
 
        if valuetype is None:
 
            if isinstance(value, bool):
 
                valuetype = 'boolean'
 
            elif isinstance(value, (datetime.date, datetime.datetime)):
 
                valuetype = 'date'
 
            elif isinstance(value, (int, float, Decimal)):
 
                valuetype = 'float'
 
        if not isinstance(value, str):
 
            if valuetype == 'boolean':
 
                value = 'true' if value else 'false'
 
            elif valuetype == 'date':
 
                value = value.isoformat()
 
            else:
 
                value = str(value)
 
        retval = self.ensure_child(self.document.meta, odf.meta.UserDefined, name=name)
 
        if valuetype is None:
 
            try:
 
                retval.removeAttribute('valuetype')
 
            except KeyError:
 
                pass
 
        else:
 
            retval.setAttribute('valuetype', valuetype)
 
        retval.childNodes.clear()
 
        retval.addText(value)
 
        return retval
 

	
 
    def set_properties(self, *,
 
                       created: Optional[datetime.datetime]=None,
 
                       generator: str='conservancy_beancount',
 
    ) -> None:
 
        if created is None:
 
            created = datetime.datetime.now()
 
        created_elem = self.ensure_child(self.document.meta, odf.meta.CreationDate)
 
        created_elem.childNodes.clear()
 
        created_elem.addText(created.isoformat())
 
        generator_elem = self.ensure_child(self.document.meta, odf.meta.Generator)
 
        generator_elem.childNodes.clear()
 
        generator_elem.addText(f'{generator}/{VERSION} {TOOLSVERSION}')
 

	
 
    ### Rows and cells
 

	
 
    def add_row(self, *cells: odf.table.TableCell, **attrs: Any) -> odf.table.TableRow:
 
        row = odf.table.TableRow(**attrs)
 
        for cell in cells:
 
            row.addElement(cell)
 
        self.sheet.addElement(row)
 
        return row
 

	
 
    def balance_cell(self, balance: Balance, **attrs: Any) -> odf.table.TableCell:
 
        balance = balance.clean_copy() or balance
 
        balance_currency_count = len(balance)
 
        if balance_currency_count == 0:
 
            return self.float_cell(0, **attrs)
 
        elif balance_currency_count == 1:
 
            amount = next(iter(balance.values()))
 
            attrs['stylename'] = self.merge_styles(
 
                attrs.get('stylename'), self.currency_style(amount.currency),
 
            )
 
            return self.currency_cell(amount, **attrs)
 
        else:
 
            lines = [babel.numbers.format_currency(
 
                number, currency, locale=self.locale, format_type=self.currency_fmt_key,
 
            ) for number, currency in balance.values()]
 
            attrs['stylename'] = self.merge_styles(
 
                attrs.get('stylename'), self.style_endtext,
 
            )
 
            return self.multiline_cell(lines, **attrs)
 

	
 
    def currency_cell(self, amount: data.Amount, **attrs: Any) -> odf.table.TableCell:
 
        if 'stylename' not in attrs:
 
            attrs['stylename'] = self.currency_style(amount.currency)
 
        number, currency = amount
 
        cell = odf.table.TableCell(valuetype='currency', value=number, **attrs)
 
        cell.addElement(odf.text.P(text=babel.numbers.format_currency(
 
            number, currency, locale=self.locale, format_type=self.currency_fmt_key,
 
        )))
 
        return cell
 

	
 
    def date_cell(self, date: datetime.date, **attrs: Any) -> odf.table.TableCell:
 
        attrs.setdefault('stylename', self.style_date)
 
        cell = odf.table.TableCell(valuetype='date', datevalue=date, **attrs)
 
        cell.addElement(odf.text.P(text=date.isoformat()))
 
        return cell
 

	
 
    def float_cell(self, value: Union[int, float, Decimal], **attrs: Any) -> odf.table.TableCell:
 
        cell = odf.table.TableCell(valuetype='float', value=value, **attrs)
 
        cell.addElement(odf.text.P(text=str(value)))
 
        return cell
 

	
 
    def _meta_link_pairs(self, links: Iterable[Optional[str]]) -> Iterator[Tuple[str, str]]:
 
        for href in links:
tests/test_reports_spreadsheet.py
Show inline comments
...
 
@@ -655,96 +655,114 @@ def test_ods_writer_multiline_cell(ods_writer):
 
))
 
def test_ods_writer_multilink_singleton(ods_writer, cell_source, style_name):
 
    cell = ods_writer.multilink_cell([cell_source], stylename=style_name)
 
    assert cell.getAttribute('valuetype') == 'string'
 
    assert cell.getAttribute('stylename') == style_name
 
    try:
 
        href, text = cell_source
 
    except ValueError:
 
        href = cell_source
 
        text = None
 
    anchor = get_child(cell, odf.text.A, type='simple', href=href)
 
    assert get_text(anchor) == (text or href)
 

	
 
def test_ods_writer_multilink_cell(ods_writer):
 
    cell = ods_writer.multilink_cell(iter(LINK_CELL_DATA))
 
    assert cell.getAttribute('valuetype') == 'string'
 
    children = get_children(cell, odf.text.A)
 
    for source, child in itertools.zip_longest(LINK_CELL_DATA, children):
 
        try:
 
            href, text = source
 
        except ValueError:
 
            href = source
 
            text = None
 
        assert child.getAttribute('type') == 'simple'
 
        assert child.getAttribute('href') == href
 
        assert get_text(child) == (text or href)
 

	
 
@pytest.mark.parametrize('cell_source,style_name', testutil.combine_values(
 
    STRING_CELL_DATA,
 
    XML_NAMES,
 
))
 
def test_ods_writer_string_cell(ods_writer, cell_source, style_name):
 
    cell = ods_writer.string_cell(cell_source, stylename=style_name)
 
    assert cell.getAttribute('valuetype') == 'string'
 
    assert cell.getAttribute('stylename') == style_name
 
    assert get_text(cell) == str(cell_source)
 

	
 
def test_ods_writer_copy_element(ods_writer):
 
    child1 = odf.text.P()
 
    child1.addElement(odf.text.A(href='linkhref', text='linktext'))
 
    child2 = odf.text.P(text='para2')
 
    cell = odf.table.TableCell(stylename='cellsty')
 
    cell.addElement(child1)
 
    cell.addElement(child2)
 
    actual = ods_writer.copy_element(cell)
 
    assert actual is not cell
 
    assert actual.getAttribute('stylename') == 'cellsty'
 
    actual1, actual2 = actual.childNodes
 
    assert actual1 is not child1
 
    assert actual2 is not child2
 
    actual_a, = actual1.childNodes
 
    assert actual_a.getAttribute('href') == 'linkhref'
 
    assert actual_a.text == 'linktext'
 
    assert actual2.text == 'para2'
 

	
 
def test_ods_writer_default_properties(ods_writer):
 
    meta = ods_writer.document.meta
 
    yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
 
    creation_date_elem = get_child(meta, odf.meta.CreationDate)
 
    creation_date = datetime.datetime.strptime(
 
        creation_date_elem.text, '%Y-%m-%dT%H:%M:%S.%f',
 
    )
 
    assert creation_date > yesterday
 
    generator = get_child(meta, odf.meta.Generator)
 
    assert re.match(r'conservancy_beancount/\d+\.\d+', generator.text)
 

	
 
def test_ods_writer_set_properties(ods_writer):
 
    ctime = datetime.datetime(2009, 9, 19, 9, 49, 59)
 
    ods_writer.set_properties(created=ctime, generator='testgen')
 
    meta = ods_writer.document.meta
 
    creation_date_elem = get_child(meta, odf.meta.CreationDate)
 
    assert creation_date_elem.text == ctime.isoformat()
 
    generator = get_child(meta, odf.meta.Generator)
 
    assert re.match(r'testgen/\d+\.\d+', generator.text)
 

	
 
@pytest.mark.parametrize('value,exptype', [
 
    (1, 'float'),
 
    (12.34, 'float'),
 
    (Decimal('5.99'), 'float'),
 
    (datetime.date(2009, 8, 17), 'date'),
 
    (datetime.datetime(2009, 8, 17, 18, 38, 58), 'date'),
 
    (True, 'boolean'),
 
    (False, 'boolean'),
 
    ('foo', None)
 
])
 
def test_ods_writer_set_custom_property(ods_writer, value, exptype):
 
    cprop = ods_writer.set_custom_property('cprop', value)
 
    assert cprop.getAttribute('name') == 'cprop'
 
    assert cprop.getAttribute('valuetype') == exptype
 
    if exptype == 'boolean':
 
        expected = str(value).lower()
 
    elif exptype == 'date':
 
        expected = value.isoformat()
 
    else:
 
        expected = str(value)
 
    assert cprop.text == expected
 

	
 
def test_ods_writer_set_common_properties(ods_writer):
 
    ods_writer.set_common_properties()
 
    get_child(ods_writer.document.meta, odf.meta.UserDefined, name='ReportCommand')
 

	
 
def test_ods_writer_common_repo_properties(ods_writer):
 
    repo = testutil.TestRepo('abcd1234', False)
 
    ods_writer.set_common_properties(repo)
 
    meta = ods_writer.document.meta
 
    sha_prop = get_child(meta, odf.meta.UserDefined, name='GitSHA')
 
    assert sha_prop.text == 'abcd1234'
 
    dirty_prop = get_child(meta, odf.meta.UserDefined, name='GitDirty')
 
    assert dirty_prop.text == 'false'
 

	
 
def test_ods_writer_common_command(ods_writer):
 
    ods_writer.set_common_properties(command=['testcmd', 'testarg*'])
 
    cmd_prop = get_child(ods_writer.document.meta, odf.meta.UserDefined, name='ReportCommand')
 
    assert cmd_prop.text == 'testcmd \'testarg*\''
tests/testutil.py
Show inline comments
 
"""Mock Beancount objects for testing"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import datetime
 
import itertools
 
import re
 
import unittest.mock
 

	
 
import beancount.core.amount as bc_amount
 
import beancount.core.data as bc_data
 
import beancount.loader as bc_loader
 
import beancount.parser.options as bc_options
 

	
 
import git
 
import odf.element
 
import odf.opendocument
 
import odf.table
 

	
 
from decimal import Decimal
 
from pathlib import Path
 
from typing import Any, Optional, NamedTuple
 

	
 
from conservancy_beancount import books, data, rtutil
 

	
 
EXTREME_FUTURE_DATE = datetime.date(datetime.MAXYEAR, 12, 30)
 
FUTURE_DATE = datetime.date.today() + datetime.timedelta(days=365 * 99)
 
FY_START_DATE = datetime.date(2020, 3, 1)
 
FY_MID_DATE = datetime.date(2020, 9, 1)
 
PAST_DATE = datetime.date(2000, 1, 1)
 
TESTS_DIR = Path(__file__).parent
 

	
 
# This function is a teardown fixture, but different test files use
 
# it with different scopes. Typical usage looks like:
 
#   clean_account_meta = pytest.fixture([options])(testutil.clean_account_meta)
 
def clean_account_meta():
 
    try:
 
        yield
 
    finally:
 
        data.Account.load_options_map(bc_options.OPTIONS_DEFAULTS)
 
        data.Account._meta_map.clear()
 

	
 
def _ods_cell_value_type(cell):
 
    assert cell.tagName == 'table:table-cell'
 
    return cell.getAttribute('valuetype')
 

	
 
def _ods_cell_value(cell):
 
    value_type = cell.getAttribute('valuetype')
 
    if value_type == 'currency' or value_type == 'float':
 
        return Decimal(cell.getAttribute('value'))
 
    elif value_type == 'date':
 
        return datetime.datetime.strptime(
 
            cell.getAttribute('datevalue'), '%Y-%m-%d',
 
        ).date()
 
    else:
 
        return cell.getAttribute('value')
 

	
 
def _ods_elem_text(elem):
 
    if isinstance(elem, odf.element.Text):
 
        return elem.data
 
    else:
 
        return '\0'.join(_ods_elem_text(child) for child in elem.childNodes)
 

	
 
odf.element.Element.value_type = property(_ods_cell_value_type)
 
odf.element.Element.value = property(_ods_cell_value)
 
odf.element.Element.text = property(_ods_elem_text)
 

	
 
def check_lines_match(lines, expect_patterns, source='output'):
 
    for pattern in expect_patterns:
 
        assert any(re.search(pattern, line) for line in lines), \
 
            f"{pattern!r} not found in {source}"
 

	
 
def check_logs_match(caplog, expected):
 
    records = iter(caplog.records)
 
    for exp_level, exp_msg in expected:
 
        exp_level = exp_level.upper()
 
        assert any(
 
            log.levelname == exp_level and log.message == exp_msg for log in records
 
        ), f"{exp_level} log {exp_msg!r} not found"
 

	
 
def check_post_meta(txn, *expected_meta, default=None):
 
    assert len(txn.postings) == len(expected_meta)
 
    for post, expected in zip(txn.postings, expected_meta):
 
        if not expected:
 
            assert not post.meta
 
        else:
 
            actual = None if post.meta is None else {
 
                key: post.meta.get(key, default) for key in expected
 
            }
 
            assert actual == expected
 

	
 
def combine_values(*value_seqs):
 
    stop = 0
 
    for seq in value_seqs:
 
        try:
 
            stop = max(stop, len(seq))
 
        except TypeError:
 
            pass
 
    return itertools.islice(
 
        zip(*(itertools.cycle(seq) for seq in value_seqs)),
 
        stop,
 
    )
 

	
 
def date_seq(date=FY_MID_DATE, step=1):
 
    while True:
 
        yield date
 
        date += datetime.timedelta(days=step)
 

	
 
def parse_date(s, fmt='%Y-%m-%d'):
 
    return datetime.datetime.strptime(s, fmt).date()
 

	
...
 
@@ -195,192 +197,199 @@ NON_LINK_METADATA_STRINGS = {
 
}
 

	
 
NON_STRING_METADATA_VALUES = [
 
    Decimal(5),
 
    FY_MID_DATE,
 
    Amount(50),
 
    Amount(500, None),
 
]
 

	
 
OPENING_EQUITY_ACCOUNTS = itertools.cycle([
 
    'Equity:Funds:Unrestricted',
 
    'Equity:Funds:Restricted',
 
    'Equity:OpeningBalance',
 
])
 

	
 
class ODSCell:
 
    @classmethod
 
    def from_row(cls, row):
 
        return row.getElementsByType(odf.table.TableCell)
 

	
 
    @classmethod
 
    def from_sheet(cls, spreadsheet):
 
        for row in spreadsheet.getElementsByType(odf.table.TableRow):
 
            yield list(cls.from_row(row))
 

	
 
    @classmethod
 
    def from_ods_file(cls, path):
 
        ods = odf.opendocument.load(path)
 
        return cls.from_sheet(ods.spreadsheet)
 

	
 

	
 
def OpeningBalance(acct=None, **txn_meta):
 
    if acct is None:
 
        acct = next(OPENING_EQUITY_ACCOUNTS)
 
    return Transaction(**txn_meta, postings=[
 
        ('Assets:Receivable:Accounts', 100),
 
        ('Assets:Receivable:Loans', 200),
 
        ('Liabilities:Payable:Accounts', -15),
 
        ('Liabilities:Payable:Vacation', -25),
 
        (acct, -260),
 
    ])
 

	
 
class TestBooksLoader(books.Loader):
 
    def __init__(self, source):
 
        self.source = source
 

	
 
    def load_all(self, from_year=None):
 
        return bc_loader.load_file(self.source)
 

	
 
    def load_fy_range(self, from_fy, to_fy=None):
 
        return self.load_all()
 

	
 

	
 
class TestConfig:
 
    def __init__(self, *,
 
                 books_path=None,
 
                 fiscal_year=(3, 1),
 
                 payment_threshold=0,
 
                 repo_path=None,
 
                 rt_client=None,
 
    ):
 
        if books_path is None:
 
            self._books_loader = None
 
        else:
 
            self._books_loader = TestBooksLoader(books_path)
 
        self.fiscal_year = fiscal_year
 
        self._payment_threshold = Decimal(payment_threshold)
 
        self.repo_path = test_path(repo_path)
 
        self._rt_client = rt_client
 
        if rt_client is None:
 
            self._rt_wrapper = None
 
        else:
 
            self._rt_wrapper = rtutil.RT(rt_client)
 

	
 
    def books_loader(self):
 
        return self._books_loader
 

	
 
    def config_file_path(self):
 
        return test_path('userconfig/conservancy_beancount/config.ini')
 

	
 
    def fiscal_year_begin(self):
 
        return books.FiscalYear(*self.fiscal_year)
 

	
 
    def payment_threshold(self):
 
        return self._payment_threshold
 

	
 
    def repository_path(self):
 
        return self.repo_path
 

	
 
    def rt_client(self):
 
        return self._rt_client
 

	
 
    def rt_wrapper(self):
 
        return self._rt_wrapper
 

	
 

	
 
def TestRepo(head_hexsha='abcd1234', dirty=False):
 
    retval = unittest.mock.Mock(spec=git.Repo)
 
    retval.is_dirty.return_value = dirty
 
    retval.head.commit.hexsha = head_hexsha
 
    return retval
 

	
 

	
 
class _TicketBuilder:
 
    MESSAGE_ATTACHMENTS = [
 
        ('(Unnamed)', 'multipart/alternative', '0b'),
 
        ('(Unnamed)', 'text/plain', '1.2k'),
 
        ('(Unnamed)', 'text/html', '1.4k'),
 
    ]
 
    MISC_ATTACHMENTS = [
 
        ('Forwarded Message.eml', 'message/rfc822', '3.1k'),
 
        ('photo.jpg', 'image/jpeg', '65.2k'),
 
        ('ConservancyInvoice-301.pdf', 'application/pdf', '326k'),
 
        ('Company_invoice-2020030405_as-sent.pdf', 'application/pdf', '50k'),
 
        ('statement.txt', 'text/plain', '652b'),
 
        ('screenshot.png', 'image/png', '1.9m'),
 
    ]
 

	
 
    def __init__(self):
 
        self.id_seq = itertools.count(1)
 
        self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS)
 

	
 
    def new_attch(self, attch):
 
        return (str(next(self.id_seq)), *attch)
 

	
 
    def new_msg_with_attachments(self, attachments_count=1):
 
        for attch in self.MESSAGE_ATTACHMENTS:
 
            yield self.new_attch(attch)
 
        for _ in range(attachments_count):
 
            yield self.new_attch(next(self.misc_attchs))
 

	
 
    def new_messages(self, messages_count, attachments_count=None):
 
        for n in range(messages_count):
 
            if attachments_count is None:
 
                att_count = messages_count - n
 
            else:
 
                att_count = attachments_count
 
            yield from self.new_msg_with_attachments(att_count)
 

	
 

	
 
class RTClient:
 
    _builder = _TicketBuilder()
 
    DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/'
 
    TICKET_DATA = {
 
        '1': list(_builder.new_messages(1, 3)),
 
        '2': list(_builder.new_messages(2, 1)),
 
        '3': list(_builder.new_messages(3, 0)),
 
    }
 
    del _builder
 

	
 
    def __init__(self,
 
                 url=DEFAULT_URL,
 
                 default_login=None,
 
                 default_password=None,
 
                 proxy=None,
 
                 default_queue='General',
 
                 skip_login=False,
 
                 verify_cert=True,
 
                 http_auth=None,
 
                 want_cfs=True,
 
    ):
 
        self.url = url
 
        if http_auth is None:
 
            self.user = default_login
 
            self.password = default_password
 
            self.auth_method = 'login'
 
            self.login_result = skip_login or None
 
        else:
 
            self.user = http_auth.username
 
            self.password = http_auth.password
 
            self.auth_method = type(http_auth).__name__
 
            self.login_result = True
 
        self.last_login = None
 
        self.want_cfs = want_cfs
 
        self.edits = {}
 

	
 
    def login(self, login=None, password=None):
 
        if login is None and password is None:
 
            login = self.user
 
            password = self.password
 
        self.login_result = bool(login and password and not password.startswith('bad'))
 
        self.last_login = (login, password, self.login_result)
 
        return self.login_result
 

	
 
    def get_attachments(self, ticket_id):
 
        try:
 
            return list(self.TICKET_DATA[str(ticket_id)])
 
        except KeyError:
 
            return None
 

	
 
    def get_attachment(self, ticket_id, attachment_id):
 
        try:
 
            att_seq = iter(self.TICKET_DATA[str(ticket_id)])
 
        except KeyError:
 
            return None
 
        att_id = str(attachment_id)
 
        multipart_id = None
 
        for attch in att_seq:
 
            if attch[0] == att_id:
0 comments (0 inline, 0 general)