Changeset - 837fcec8f0db
[Not reviewed]
0 3 0
Brett Smith - 4 years ago 2020-07-29 21:22:09
brettcsmith@brettcsmith.org
reports: Add BaseODS.set_common_properties() method.
3 files changed with 47 insertions and 0 deletions:
0 comments (0 inline, 0 general)
conservancy_beancount/reports/core.py
Show inline comments
 
"""core.py - Common data classes for reporting functionality"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import abc
 
import collections
 
import copy
 
import datetime
 
import enum
 
import itertools
 
import operator
 
import re
 
import shlex
 
import sys
 
import urllib.parse as urlparse
 

	
 
import babel.core  # type:ignore[import]
 
import babel.numbers  # type:ignore[import]
 

	
 
import git  # type:ignore[import]
 

	
 
import odf.config  # type:ignore[import]
 
import odf.element  # type:ignore[import]
 
import odf.meta  # type:ignore[import]
 
import odf.number  # type:ignore[import]
 
import odf.opendocument  # type:ignore[import]
 
import odf.style  # type:ignore[import]
 
import odf.table  # type:ignore[import]
 
import odf.text  # type:ignore[import]
 

	
 
from decimal import Decimal
 
from pathlib import Path
 

	
 
from beancount.core import amount as bc_amount
 
from odf.namespaces import TOOLSVERSION  # type:ignore[import]
 

	
 
from ..cliutil import VERSION
 
from .. import data
 
from .. import filters
 
from .. import rtutil
 

	
 
from typing import (
 
    cast,
 
    overload,
 
    Any,
 
    BinaryIO,
 
    Callable,
 
    Dict,
 
    Generic,
 
    Iterable,
 
    Iterator,
 
    List,
 
    Mapping,
 
    MutableMapping,
 
    Optional,
 
    Sequence,
 
    Set,
 
    Tuple,
 
    Type,
 
    TypeVar,
 
    Union,
 
)
 
from ..beancount_types import (
 
    MetaKey,
 
    MetaValue,
 
)
 

	
 
OPENING_BALANCE_NAME = "OPENING BALANCE"
 
ENDING_BALANCE_NAME = "ENDING BALANCE"
 

	
 
DecimalCompat = data.DecimalCompat
 
BalanceType = TypeVar('BalanceType', bound='Balance')
 
ElementType = Callable[..., odf.element.Element]
 
LinkType = Union[str, Tuple[str, Optional[str]]]
 
RelatedType = TypeVar('RelatedType', bound='RelatedPostings')
 
RT = TypeVar('RT', bound=Sequence)
 
ST = TypeVar('ST')
 
T = TypeVar('T')
 

	
 
class Balance(Mapping[str, data.Amount]):
 
    """A collection of amounts mapped by currency
 

	
 
    Each key is a Beancount currency string, and each value represents the
 
    balance in that currency.
 
    """
 
    __slots__ = ('_currency_map', 'tolerance')
 
    TOLERANCE = Decimal('0.01')
 

	
 
    def __init__(self,
 
                 source: Iterable[data.Amount]=(),
 
                 tolerance: Optional[Decimal]=None,
 
    ) -> None:
 
        if tolerance is None:
 
            tolerance = self.TOLERANCE
 
        self.tolerance = tolerance
 
        self._currency_map: Dict[str, data.Amount] = {}
 
        for amount in source:
 
            self._add_amount(self._currency_map, amount)
 

	
 
    def _add_amount(self,
 
                    currency_map: MutableMapping[str, data.Amount],
 
                    amount: data.Amount,
 
    ) -> None:
 
        code = amount.currency
 
        try:
 
            current_number = currency_map[code].number
 
        except KeyError:
 
            current_number = Decimal(0)
 
        currency_map[code] = data.Amount(current_number + amount.number, code)
 

	
 
    def _add_other(self,
 
                   currency_map: MutableMapping[str, data.Amount],
 
                   other: Union[data.Amount, 'Balance'],
 
    ) -> None:
 
        if isinstance(other, Balance):
 
            for amount in other.values():
 
                self._add_amount(currency_map, amount)
 
        else:
 
            self._add_amount(currency_map, other)
 

	
 
    def __repr__(self) -> str:
 
        values = [repr(amt) for amt in self.values()]
 
        return f"{type(self).__name__}({values!r})"
 

	
 
    def __str__(self) -> str:
 
        return self.format()
 

	
 
    def __abs__(self: BalanceType) -> BalanceType:
 
        return type(self)(bc_amount.abs(amt) for amt in self.values())
 

	
 
    def __add__(self: BalanceType, other: Union[data.Amount, 'Balance']) -> BalanceType:
 
        retval_map = self._currency_map.copy()
 
        self._add_other(retval_map, other)
 
        return type(self)(retval_map.values())
 

	
 
    def __sub__(self: BalanceType, other: Union[data.Amount, 'Balance']) -> BalanceType:
 
        return self.__add__(-other)
 

	
 
    def __eq__(self, other: Any) -> bool:
 
        if isinstance(other, Balance):
 
            clean_self = self.clean_copy()
 
            clean_other = other.clean_copy()
 
            return len(clean_self) == len(clean_other) and all(
 
                clean_self[key] == clean_other.get(key) for key in clean_self
 
            )
 
        else:
 
            return super().__eq__(other)
 

	
 
    def __neg__(self: BalanceType) -> BalanceType:
 
        return type(self)(-amt for amt in self.values())
 

	
 
    def __pos__(self: BalanceType) -> BalanceType:
 
        return self
 

	
 
    def __getitem__(self, key: str) -> data.Amount:
 
        return self._currency_map[key]
 

	
 
    def __iter__(self) -> Iterator[str]:
 
        return iter(self._currency_map)
 

	
 
    def __len__(self) -> int:
 
        return len(self._currency_map)
 

	
 
    def _all_amounts(self,
 
                     op_func: Callable[[DecimalCompat, DecimalCompat], bool],
 
                     operand: DecimalCompat,
 
    ) -> bool:
 
        return all(op_func(amt.number, operand) for amt in self.values())
 

	
 
    def copy(self: BalanceType, tolerance: Optional[Decimal]=None) -> BalanceType:
 
        if tolerance is None:
 
            tolerance = self.tolerance
 
        return type(self)(self.values(), tolerance)
 

	
 
    def clean_copy(self: BalanceType, tolerance: Optional[Decimal]=None) -> BalanceType:
 
        if tolerance is None:
 
            tolerance = self.tolerance
 
        return type(self)(
 
            (amount for amount in self.values() if abs(amount.number) >= tolerance),
 
            tolerance,
 
        )
 

	
 
    @staticmethod
 
    def within_tolerance(dec: DecimalCompat, tolerance: DecimalCompat) -> bool:
 
        dec = cast(Decimal, dec)
 
        return abs(dec) < tolerance
 

	
 
    def eq_zero(self) -> bool:
 
        """Returns true if all amounts in the balance == 0, within tolerance."""
 
        return self._all_amounts(self.within_tolerance, self.tolerance)
 

	
 
    is_zero = eq_zero
 

	
 
    def ge_zero(self) -> bool:
 
        """Returns true if all amounts in the balance >= 0, within tolerance."""
 
        op_func = operator.gt if self.tolerance else operator.ge
 
        return self._all_amounts(op_func, -self.tolerance)
 

	
 
    def le_zero(self) -> bool:
 
        """Returns true if all amounts in the balance <= 0, within tolerance."""
 
        op_func = operator.lt if self.tolerance else operator.le
 
        return self._all_amounts(op_func, self.tolerance)
 

	
 
    def format(self,
 
               fmt: Optional[str]='#,##0.00 ¤¤',
 
               sep: str=', ',
 
               empty: str="Zero balance",
 
               zero: Optional[str]=None,
 
               tolerance: Optional[Decimal]=None,
 
    ) -> str:
 
        """Formats the balance as a string with the given parameters
 

	
 
        If the balance is completely empty, return ``empty``.
...
 
@@ -863,384 +867,400 @@ class BaseODS(BaseSpreadsheet[RT, ST], metaclass=abc.ABCMeta):
 

	
 
    def _merge_styles(self,
 
                      new_style: odf.style.Style,
 
                      sources: Iterable[odf.style.Style],
 
    ) -> None:
 
        for elem in sources:
 
            for key, new_value in self.iter_attributes(elem):
 
                old_value = new_style.getAttribute(key)
 
                if (key == 'name'
 
                    or key == 'displayname'
 
                    or old_value == new_value):
 
                    pass
 
                elif old_value is None:
 
                    new_style.setAttribute(key, new_value)
 
                else:
 
                    raise ValueError(f"cannot merge styles with conflicting {key}")
 
            for child in elem.childNodes:
 
                new_style.addElement(self.copy_element(child))
 

	
 
    def merge_styles(self,
 
                     *styles: Union[str, odf.style.Style, None],
 
    ) -> Optional[odf.style.Style]:
 
        """Create a new style from multiple existing styles
 

	
 
        Given any number of existing styles, create a new style that combines
 
        all of those styles' attributes and properties, add it to the document
 
        styles, and return it.
 

	
 
        Styles can be specified by name, or by passing in their Style element.
 
        For convenience, you can also pass in None as an argument; None will
 
        simply be skipped.
 

	
 
        Results are cached. If you repeatedly call this method with the same
 
        arguments, you'll keep getting the same style returned, which will
 
        only be added to the document once.
 

	
 
        If you pass in zero real style arguments, returns None.
 
        If you pass in one style argument, returns that style unchanged.
 
        If you pass in a style that doesn't already exist in the document,
 
        or if you pass in styles that can't be merged (because they have
 
        conflicting attributes), raises ValueError.
 
        """
 
        name_map: Dict[str, odf.style.Style] = {}
 
        for name in self._merge_style_iter_names(styles):
 
            source = odf.style.Style(name=name)
 
            found = self.find_child(self.document.styles, source)
 
            if found is None:
 
                raise ValueError(f"no style named {name!r}")
 
            name_map[name] = found
 
        if not name_map:
 
            retval = None
 
        elif len(name_map) == 1:
 
            _, retval = name_map.popitem()
 
        else:
 
            new_name = f'Merge_{"_".join(sorted(name_map))}'
 
            retval = self.ensure_child(
 
                self.document.styles, odf.style.Style, name=new_name,
 
            )
 
            if retval.firstChild is None:
 
                self._merge_styles(retval, name_map.values())
 
        return retval
 

	
 
    ### Sheets
 

	
 
    def lock_first_column(self, sheet: Optional[odf.table.Table]=None) -> None:
 
        """Lock the first column of cells under the given sheet
 

	
 
        This method sets all the appropriate settings to "lock" the first column
 
        of cells in a sheet, so it stays in view even as the viewer scrolls
 
        across the sheet. If a sheet is not given, works on ``self.sheet``.
 
        """
 
        if sheet is None:
 
            sheet = self.sheet
 
        config_map = self.ensure_config_map_entry(
 
            self.view, 'Tables', sheet.getAttribute('name'),
 
        )
 
        self.set_config(config_map, 'PositionRight', 1, 'int')
 
        self.set_config(config_map, 'HorizontalSplitMode', 2, 'short')
 
        self.set_config(config_map, 'HorizontalSplitPosition', 1, 'short')
 

	
 
    def lock_first_row(self, sheet: Optional[odf.table.Table]=None) -> None:
 
        """Lock the first row of cells under the given sheet
 

	
 
        This method sets all the appropriate settings to "lock" the first row
 
        of cells in a sheet, so it stays in view even as the viewer scrolls
 
        through rows. If a sheet is not given, works on ``self.sheet``.
 
        """
 
        if sheet is None:
 
            sheet = self.sheet
 
        config_map = self.ensure_config_map_entry(
 
            self.view, 'Tables', sheet.getAttribute('name'),
 
        )
 
        self.set_config(config_map, 'PositionBottom', 1, 'int')
 
        self.set_config(config_map, 'VerticalSplitMode', 2, 'short')
 
        self.set_config(config_map, 'VerticalSplitPosition', 1, 'short')
 

	
 
    def set_open_sheet(self, sheet: Union[str, odf.table.Table, None]=None) -> None:
 
        """Set which sheet is open in the document
 

	
 
        When the user first opens the spreadsheet, their view will be on this
 
        sheet. You can provide a sheet name string or sheet object. With no
 
        argument, defaults to ``self.sheet``.
 
        """
 
        if sheet is None:
 
            sheet = self.sheet
 
        if not isinstance(sheet, str):
 
            sheet = sheet.getAttribute('name')
 
            if not isinstance(sheet, str):
 
                raise ValueError("sheet argument has no name for setting")
 
        self.set_config(self.view, 'ActiveTable', sheet, 'string')
 

	
 
    def use_sheet(self, name: str) -> odf.table.Table:
 
        """Switch the active sheet ``self.sheet`` to the one with the given name
 

	
 
        If there is no sheet with the given name, create it and append it to
 
        the spreadsheet first.
 

	
 
        If the current active sheet is empty when this method is called, it
 
        will be removed from the spreadsheet.
 
        """
 
        try:
 
            empty_sheet = not self.sheet.hasChildNodes()
 
        except AttributeError:
 
            empty_sheet = False
 
        if empty_sheet:
 
            self.document.spreadsheet.removeChild(self.sheet)
 
        self.sheet = self.ensure_child(
 
            self.document.spreadsheet, odf.table.Table, name=name,
 
        )
 
        return self.sheet
 

	
 
    ### Initialization hooks
 

	
 
    def init_settings(self) -> None:
 
        """Hook called to initialize settings
 

	
 
        This method is called by __init__ to populate
 
        ``self.document.settings``. This implementation creates the barest
 
        skeleton structure necessary to support other methods, in particular
 
        ``lock_first_row``.
 
        """
 
        view_settings = self.ensure_child(
 
            self.document.settings, odf.config.ConfigItemSet, name='ooo:view-settings',
 
        )
 
        views = self.ensure_child(
 
            view_settings, odf.config.ConfigItemMapIndexed, name='Views',
 
        )
 
        self.view = self.ensure_child(views, odf.config.ConfigItemMapEntry)
 
        self.set_config(self.view, 'ViewId', 'view1')
 

	
 
    def init_styles(self) -> None:
 
        """Hook called to initialize settings
 

	
 
        This method is called by __init__ to populate
 
        ``self.document.styles``. This implementation creates basic building
 
        block cell styles often used in financial reports.
 
        """
 
        styles = self.document.styles
 
        self.style_bold = self.ensure_child(
 
            styles, odf.style.Style, name='Bold', family='table-cell',
 
        )
 
        self.ensure_child(
 
            self.style_bold, odf.style.TextProperties, fontweight='bold',
 
        )
 

	
 
        date_style = self.replace_child(styles, odf.number.DateStyle, name='ISODate')
 
        date_style.addElement(odf.number.Year(style='long'))
 
        date_style.addElement(odf.number.Text(text='-'))
 
        date_style.addElement(odf.number.Month(style='long'))
 
        date_style.addElement(odf.number.Text(text='-'))
 
        date_style.addElement(odf.number.Day(style='long'))
 
        self.style_date = self.ensure_child(
 
            styles,
 
            odf.style.Style,
 
            name=f'{date_style.getAttribute("name")}Cell',
 
            family='table-cell',
 
            datastylename=date_style,
 
        )
 

	
 
        self.style_starttext: odf.style.Style
 
        self.style_centertext: odf.style.Style
 
        self.style_endtext: odf.style.Style
 
        for textalign in ['start', 'center', 'end']:
 
            aligned_style = self.replace_child(
 
                styles, odf.style.Style, name=f'{textalign.title()}Text',
 
            )
 
            aligned_style.setAttribute('family', 'table-cell')
 
            aligned_style.addElement(odf.style.ParagraphProperties(textalign=textalign))
 
            setattr(self, f'style_{textalign}text', aligned_style)
 

	
 
    ### Properties
 

	
 
    def set_common_properties(self,
 
                              repo: Optional[git.Repo]=None,
 
                              command: Optional[Sequence[str]]=sys.argv,
 
    ) -> None:
 
        if repo is None:
 
            git_shahex = '<none>'
 
            git_dirty = True
 
        else:
 
            git_shahex = repo.head.commit.hexsha
 
            git_dirty = repo.is_dirty()
 
        self.set_custom_property('GitSHA', git_shahex)
 
        self.set_custom_property('GitDirty', git_dirty, 'boolean')
 
        if command is not None:
 
            command_s = ' '.join(shlex.quote(s) for s in command)
 
            self.set_custom_property('ReportCommand', command_s)
 

	
 
    def set_custom_property(self,
 
                            name: str,
 
                            value: Any,
 
                            valuetype: Optional[str]=None,
 
    ) -> odf.meta.UserDefined:
 
        if valuetype is None:
 
            if isinstance(value, bool):
 
                valuetype = 'boolean'
 
            elif isinstance(value, (datetime.date, datetime.datetime)):
 
                valuetype = 'date'
 
            elif isinstance(value, (int, float, Decimal)):
 
                valuetype = 'float'
 
        if not isinstance(value, str):
 
            if valuetype == 'boolean':
 
                value = 'true' if value else 'false'
 
            elif valuetype == 'date':
 
                value = value.isoformat()
 
            else:
 
                value = str(value)
 
        retval = self.ensure_child(self.document.meta, odf.meta.UserDefined, name=name)
 
        if valuetype is None:
 
            try:
 
                retval.removeAttribute('valuetype')
 
            except KeyError:
 
                pass
 
        else:
 
            retval.setAttribute('valuetype', valuetype)
 
        retval.childNodes.clear()
 
        retval.addText(value)
 
        return retval
 

	
 
    def set_properties(self, *,
 
                       created: Optional[datetime.datetime]=None,
 
                       generator: str='conservancy_beancount',
 
    ) -> None:
 
        if created is None:
 
            created = datetime.datetime.now()
 
        created_elem = self.ensure_child(self.document.meta, odf.meta.CreationDate)
 
        created_elem.childNodes.clear()
 
        created_elem.addText(created.isoformat())
 
        generator_elem = self.ensure_child(self.document.meta, odf.meta.Generator)
 
        generator_elem.childNodes.clear()
 
        generator_elem.addText(f'{generator}/{VERSION} {TOOLSVERSION}')
 

	
 
    ### Rows and cells
 

	
 
    def add_row(self, *cells: odf.table.TableCell, **attrs: Any) -> odf.table.TableRow:
 
        row = odf.table.TableRow(**attrs)
 
        for cell in cells:
 
            row.addElement(cell)
 
        self.sheet.addElement(row)
 
        return row
 

	
 
    def balance_cell(self, balance: Balance, **attrs: Any) -> odf.table.TableCell:
 
        balance = balance.clean_copy() or balance
 
        balance_currency_count = len(balance)
 
        if balance_currency_count == 0:
 
            return self.float_cell(0, **attrs)
 
        elif balance_currency_count == 1:
 
            amount = next(iter(balance.values()))
 
            attrs['stylename'] = self.merge_styles(
 
                attrs.get('stylename'), self.currency_style(amount.currency),
 
            )
 
            return self.currency_cell(amount, **attrs)
 
        else:
 
            lines = [babel.numbers.format_currency(
 
                number, currency, locale=self.locale, format_type=self.currency_fmt_key,
 
            ) for number, currency in balance.values()]
 
            attrs['stylename'] = self.merge_styles(
 
                attrs.get('stylename'), self.style_endtext,
 
            )
 
            return self.multiline_cell(lines, **attrs)
 

	
 
    def currency_cell(self, amount: data.Amount, **attrs: Any) -> odf.table.TableCell:
 
        if 'stylename' not in attrs:
 
            attrs['stylename'] = self.currency_style(amount.currency)
 
        number, currency = amount
 
        cell = odf.table.TableCell(valuetype='currency', value=number, **attrs)
 
        cell.addElement(odf.text.P(text=babel.numbers.format_currency(
 
            number, currency, locale=self.locale, format_type=self.currency_fmt_key,
 
        )))
 
        return cell
 

	
 
    def date_cell(self, date: datetime.date, **attrs: Any) -> odf.table.TableCell:
 
        attrs.setdefault('stylename', self.style_date)
 
        cell = odf.table.TableCell(valuetype='date', datevalue=date, **attrs)
 
        cell.addElement(odf.text.P(text=date.isoformat()))
 
        return cell
 

	
 
    def float_cell(self, value: Union[int, float, Decimal], **attrs: Any) -> odf.table.TableCell:
 
        cell = odf.table.TableCell(valuetype='float', value=value, **attrs)
 
        cell.addElement(odf.text.P(text=str(value)))
 
        return cell
 

	
 
    def _meta_link_pairs(self, links: Iterable[Optional[str]]) -> Iterator[Tuple[str, str]]:
 
        for href in links:
 
            if href is None:
 
                continue
 
            elif self.rt_wrapper is not None:
 
                rt_ids = self.rt_wrapper.parse(href)
 
                rt_href = rt_ids and self.rt_wrapper.url(*rt_ids)
 
            else:
 
                rt_ids = None
 
                rt_href = None
 
            if rt_ids is None or rt_href is None:
 
                # '..' pops the ODS filename off the link path. In other words,
 
                # make the link relative to the directory the ODS is in.
 
                href_path = Path('..', href)
 
                href = str(href_path)
 
                text = href_path.name
 
            else:
 
                rt_path = urlparse.urlparse(rt_href).path
 
                if rt_path.endswith('/Ticket/Display.html'):
 
                    text = rtutil.RT.unparse(*rt_ids)
 
                else:
 
                    text = urlparse.unquote(Path(rt_path).name)
 
                href = rt_href
 
            yield (href, text)
 

	
 
    def meta_links_cell(self, links: Iterable[Optional[str]], **attrs: Any) -> odf.table.TableCell:
 
        return self.multilink_cell(self._meta_link_pairs(links), **attrs)
 

	
 
    def multiline_cell(self, lines: Iterable[Any], **attrs: Any) -> odf.table.TableCell:
 
        cell = odf.table.TableCell(valuetype='string', **attrs)
 
        for line in lines:
 
            cell.addElement(odf.text.P(text=str(line)))
 
        return cell
 

	
 
    def multilink_cell(self, links: Iterable[LinkType], **attrs: Any) -> odf.table.TableCell:
 
        cell = odf.table.TableCell(valuetype='string', **attrs)
 
        for link in links:
 
            if isinstance(link, tuple):
 
                href, text = link
 
            else:
 
                href = link
 
                text = None
 
            cell.addElement(odf.text.P())
 
            cell.lastChild.addElement(odf.text.A(
 
                type='simple', href=href, text=text or href,
 
            ))
 
        return cell
 

	
 
    def string_cell(self, text: str, **attrs: Any) -> odf.table.TableCell:
 
        cell = odf.table.TableCell(valuetype='string', **attrs)
 
        cell.addElement(odf.text.P(text=text))
 
        return cell
 

	
 
    def write_row(self, row: RT) -> None:
 
        """Write a single row of input data to the spreadsheet
 

	
 
        This default implementation adds a single row to the spreadsheet,
 
        with one cell per element of the row. The type of each element
 
        determines what kind of cell is created.
 

	
 
        This implementation will help get you started, but you'll probably
 
        want to override it to specify styles.
 
        """
 
        out_row = odf.table.TableRow()
 
        for cell_source in row:
 
            if isinstance(cell_source, (int, float, Decimal)):
 
                cell = self.float_cell(cell_source)
 
            else:
 
                cell = self.string_cell(cell_source)
 
            out_row.addElement(cell)
 
        self.sheet.addElement(out_row)
 

	
 
    def save_file(self, out_file: BinaryIO) -> None:
 
        self.document.write(out_file)
 

	
 
    def save_path(self, path: Path, mode: str='w') -> None:
 
        with path.open(f'{mode}b') as out_file:
 
            out_file = cast(BinaryIO, out_file)
 
            self.save_file(out_file)
 

	
 

	
 
def account_balances(
 
        groups: Mapping[data.Account, PeriodPostings],
 
        order: Optional[Sequence[str]]=None,
 
) -> Iterator[Tuple[str, Balance]]:
 
    """Iterate account balances over a date range
 

	
 
    1. ``subclass = PeriodPostings.with_start_date(start_date)``
 
    2. ``groups = dict(subclass.group_by_account(postings))``
 
    3. ``for acct, bal in account_balances(groups, [optional ordering]): ...``
 

	
 
    This function returns an iterator of 2-tuples ``(account, balance)``
 
    that you can use to generate a report in the style of ``ledger balance``.
 
    The accounts are accounts in ``groups`` that appeared under one of the
 
    account name strings in ``order``. ``balance`` is the corresponding
 
    balance over the time period (``groups[key].period_bal``). Accounts are
 
    iterated in the order provided by ``sort_and_filter_accounts()``.
 

	
tests/test_reports_spreadsheet.py
Show inline comments
...
 
@@ -559,192 +559,210 @@ def test_ods_writer_currency_cell(ods_writer, cell_source, style_name):
 
    number, currency = cell_source
 
    assert cell.getAttribute('valuetype') == 'currency'
 
    assert cell.getAttribute('value') == str(number)
 
    assert cell.getAttribute('stylename') == style_name
 
    expected = babel.numbers.format_currency(
 
        number, currency, locale=EN_US, format_type='accounting',
 
    )
 
    assert get_text(cell) == expected
 

	
 
@pytest.mark.parametrize('currency', [
 
    'EUR',
 
    'CHF',
 
    'GBP',
 
])
 
def test_ods_writer_currency_cell_default_style(ods_writer, currency):
 
    amount = testutil.Amount(1000, currency)
 
    expected_stylename = ods_writer.currency_style(currency).getAttribute('name')
 
    cell = ods_writer.currency_cell(amount)
 
    assert cell.getAttribute('valuetype') == 'currency'
 
    assert cell.getAttribute('value') == '1000'
 
    assert cell.getAttribute('stylename') == expected_stylename
 

	
 
@pytest.mark.parametrize('date,style_name', testutil.combine_values(
 
    [datetime.date(1980, 2, 5), datetime.date(2030, 10, 30)],
 
    XML_NAMES_LIST,
 
))
 
def test_ods_writer_date_cell(ods_writer, date, style_name):
 
    if style_name is None:
 
        expect_style = ods_writer.style_date.getAttribute('name')
 
        cell = ods_writer.date_cell(date)
 
    else:
 
        expect_style = style_name
 
        cell = ods_writer.date_cell(date, stylename=style_name)
 
    date_s = date.isoformat()
 
    assert cell.getAttribute('valuetype') == 'date'
 
    assert cell.getAttribute('datevalue') == date_s
 
    assert cell.getAttribute('stylename') == expect_style
 
    assert get_text(cell) == date_s
 

	
 
@pytest.mark.parametrize('cell_source,style_name', testutil.combine_values(
 
    NUMERIC_CELL_DATA,
 
    XML_NAMES,
 
))
 
def test_ods_writer_float_cell(ods_writer, cell_source, style_name):
 
    cell = ods_writer.float_cell(cell_source, stylename=style_name)
 
    assert cell.getAttribute('valuetype') == 'float'
 
    assert cell.getAttribute('stylename') == style_name
 
    expected = str(cell_source)
 
    assert cell.getAttribute('value') == expected
 
    assert get_text(cell) == expected
 

	
 
def test_ods_writer_meta_links_cell(ods_writer):
 
    rt_client = testutil.RTClient()
 
    ods_writer.rt_wrapper = rtutil.RT(rt_client)
 
    rt_url = rt_client.DEFAULT_URL[:-10]
 
    meta_links = [
 
        'rt://ticket/1',
 
        'rt://ticket/2/attachments/9',
 
        'rt:1/5',
 
        'Invoices/0123.pdf',
 
    ]
 
    cell = ods_writer.meta_links_cell(meta_links, stylename='meta1')
 
    assert cell.getAttribute('valuetype') == 'string'
 
    assert cell.getAttribute('stylename') == 'meta1'
 
    children = iter(get_children(cell, odf.text.A))
 
    child = next(children)
 
    assert child.getAttribute('type') == 'simple'
 
    expect_url = f'{rt_url}/Ticket/Display.html?id=1'
 
    assert child.getAttribute('href') == expect_url
 
    assert get_text(child) == 'rt:1'
 
    child = next(children)
 
    assert child.getAttribute('type') == 'simple'
 
    expect_url = f'{rt_url}/Ticket/Display.html?id=2#txn-7'
 
    assert child.getAttribute('href') == expect_url
 
    assert get_text(child) == 'rt:2/9'
 
    child = next(children)
 
    assert child.getAttribute('type') == 'simple'
 
    expect_url = f'{rt_url}/Ticket/Attachment/1/5/photo.jpg'
 
    assert child.getAttribute('href') == expect_url
 
    assert get_text(child) == 'photo.jpg'
 
    child = next(children)
 
    assert child.getAttribute('type') == 'simple'
 
    expect_url = f'../{meta_links[3]}'
 
    assert child.getAttribute('href') == expect_url
 
    assert get_text(child) == '0123.pdf'
 

	
 
def test_ods_writer_multiline_cell(ods_writer):
 
    cell = ods_writer.multiline_cell(iter(STRING_CELL_DATA))
 
    assert cell.getAttribute('valuetype') == 'string'
 
    children = get_children(cell, odf.text.P)
 
    for expected, child in itertools.zip_longest(STRING_CELL_DATA, children):
 
        assert get_text(child) == expected
 

	
 
@pytest.mark.parametrize('cell_source,style_name', testutil.combine_values(
 
    LINK_CELL_DATA,
 
    XML_NAMES,
 
))
 
def test_ods_writer_multilink_singleton(ods_writer, cell_source, style_name):
 
    cell = ods_writer.multilink_cell([cell_source], stylename=style_name)
 
    assert cell.getAttribute('valuetype') == 'string'
 
    assert cell.getAttribute('stylename') == style_name
 
    try:
 
        href, text = cell_source
 
    except ValueError:
 
        href = cell_source
 
        text = None
 
    anchor = get_child(cell, odf.text.A, type='simple', href=href)
 
    assert get_text(anchor) == (text or href)
 

	
 
def test_ods_writer_multilink_cell(ods_writer):
 
    cell = ods_writer.multilink_cell(iter(LINK_CELL_DATA))
 
    assert cell.getAttribute('valuetype') == 'string'
 
    children = get_children(cell, odf.text.A)
 
    for source, child in itertools.zip_longest(LINK_CELL_DATA, children):
 
        try:
 
            href, text = source
 
        except ValueError:
 
            href = source
 
            text = None
 
        assert child.getAttribute('type') == 'simple'
 
        assert child.getAttribute('href') == href
 
        assert get_text(child) == (text or href)
 

	
 
@pytest.mark.parametrize('cell_source,style_name', testutil.combine_values(
 
    STRING_CELL_DATA,
 
    XML_NAMES,
 
))
 
def test_ods_writer_string_cell(ods_writer, cell_source, style_name):
 
    cell = ods_writer.string_cell(cell_source, stylename=style_name)
 
    assert cell.getAttribute('valuetype') == 'string'
 
    assert cell.getAttribute('stylename') == style_name
 
    assert get_text(cell) == str(cell_source)
 

	
 
def test_ods_writer_copy_element(ods_writer):
 
    child1 = odf.text.P()
 
    child1.addElement(odf.text.A(href='linkhref', text='linktext'))
 
    child2 = odf.text.P(text='para2')
 
    cell = odf.table.TableCell(stylename='cellsty')
 
    cell.addElement(child1)
 
    cell.addElement(child2)
 
    actual = ods_writer.copy_element(cell)
 
    assert actual is not cell
 
    assert actual.getAttribute('stylename') == 'cellsty'
 
    actual1, actual2 = actual.childNodes
 
    assert actual1 is not child1
 
    assert actual2 is not child2
 
    actual_a, = actual1.childNodes
 
    assert actual_a.getAttribute('href') == 'linkhref'
 
    assert actual_a.text == 'linktext'
 
    assert actual2.text == 'para2'
 

	
 
def test_ods_writer_default_properties(ods_writer):
 
    meta = ods_writer.document.meta
 
    yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
 
    creation_date_elem = get_child(meta, odf.meta.CreationDate)
 
    creation_date = datetime.datetime.strptime(
 
        creation_date_elem.text, '%Y-%m-%dT%H:%M:%S.%f',
 
    )
 
    assert creation_date > yesterday
 
    generator = get_child(meta, odf.meta.Generator)
 
    assert re.match(r'conservancy_beancount/\d+\.\d+', generator.text)
 

	
 
def test_ods_writer_set_properties(ods_writer):
 
    ctime = datetime.datetime(2009, 9, 19, 9, 49, 59)
 
    ods_writer.set_properties(created=ctime, generator='testgen')
 
    meta = ods_writer.document.meta
 
    creation_date_elem = get_child(meta, odf.meta.CreationDate)
 
    assert creation_date_elem.text == ctime.isoformat()
 
    generator = get_child(meta, odf.meta.Generator)
 
    assert re.match(r'testgen/\d+\.\d+', generator.text)
 

	
 
@pytest.mark.parametrize('value,exptype', [
 
    (1, 'float'),
 
    (12.34, 'float'),
 
    (Decimal('5.99'), 'float'),
 
    (datetime.date(2009, 8, 17), 'date'),
 
    (datetime.datetime(2009, 8, 17, 18, 38, 58), 'date'),
 
    (True, 'boolean'),
 
    (False, 'boolean'),
 
    ('foo', None)
 
])
 
def test_ods_writer_set_custom_property(ods_writer, value, exptype):
 
    cprop = ods_writer.set_custom_property('cprop', value)
 
    assert cprop.getAttribute('name') == 'cprop'
 
    assert cprop.getAttribute('valuetype') == exptype
 
    if exptype == 'boolean':
 
        expected = str(value).lower()
 
    elif exptype == 'date':
 
        expected = value.isoformat()
 
    else:
 
        expected = str(value)
 
    assert cprop.text == expected
 

	
 
def test_ods_writer_set_common_properties(ods_writer):
 
    ods_writer.set_common_properties()
 
    get_child(ods_writer.document.meta, odf.meta.UserDefined, name='ReportCommand')
 

	
 
def test_ods_writer_common_repo_properties(ods_writer):
 
    repo = testutil.TestRepo('abcd1234', False)
 
    ods_writer.set_common_properties(repo)
 
    meta = ods_writer.document.meta
 
    sha_prop = get_child(meta, odf.meta.UserDefined, name='GitSHA')
 
    assert sha_prop.text == 'abcd1234'
 
    dirty_prop = get_child(meta, odf.meta.UserDefined, name='GitDirty')
 
    assert dirty_prop.text == 'false'
 

	
 
def test_ods_writer_common_command(ods_writer):
 
    ods_writer.set_common_properties(command=['testcmd', 'testarg*'])
 
    cmd_prop = get_child(ods_writer.document.meta, odf.meta.UserDefined, name='ReportCommand')
 
    assert cmd_prop.text == 'testcmd \'testarg*\''
tests/testutil.py
Show inline comments
 
"""Mock Beancount objects for testing"""
 
# Copyright © 2020  Brett Smith
 
#
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU Affero General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU Affero General Public License for more details.
 
#
 
# You should have received a copy of the GNU Affero General Public License
 
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 

	
 
import datetime
 
import itertools
 
import re
 
import unittest.mock
 

	
 
import beancount.core.amount as bc_amount
 
import beancount.core.data as bc_data
 
import beancount.loader as bc_loader
 
import beancount.parser.options as bc_options
 

	
 
import git
 
import odf.element
 
import odf.opendocument
 
import odf.table
 

	
 
from decimal import Decimal
 
from pathlib import Path
 
from typing import Any, Optional, NamedTuple
 

	
 
from conservancy_beancount import books, data, rtutil
 

	
 
EXTREME_FUTURE_DATE = datetime.date(datetime.MAXYEAR, 12, 30)
 
FUTURE_DATE = datetime.date.today() + datetime.timedelta(days=365 * 99)
 
FY_START_DATE = datetime.date(2020, 3, 1)
 
FY_MID_DATE = datetime.date(2020, 9, 1)
 
PAST_DATE = datetime.date(2000, 1, 1)
 
TESTS_DIR = Path(__file__).parent
 

	
 
# This function is a teardown fixture, but different test files use
 
# it with different scopes. Typical usage looks like:
 
#   clean_account_meta = pytest.fixture([options])(testutil.clean_account_meta)
 
def clean_account_meta():
 
    try:
 
        yield
 
    finally:
 
        data.Account.load_options_map(bc_options.OPTIONS_DEFAULTS)
 
        data.Account._meta_map.clear()
 

	
 
def _ods_cell_value_type(cell):
 
    assert cell.tagName == 'table:table-cell'
 
    return cell.getAttribute('valuetype')
 

	
 
def _ods_cell_value(cell):
 
    value_type = cell.getAttribute('valuetype')
 
    if value_type == 'currency' or value_type == 'float':
 
        return Decimal(cell.getAttribute('value'))
 
    elif value_type == 'date':
 
        return datetime.datetime.strptime(
 
            cell.getAttribute('datevalue'), '%Y-%m-%d',
 
        ).date()
 
    else:
 
        return cell.getAttribute('value')
 

	
 
def _ods_elem_text(elem):
 
    if isinstance(elem, odf.element.Text):
 
        return elem.data
 
    else:
 
        return '\0'.join(_ods_elem_text(child) for child in elem.childNodes)
 

	
 
odf.element.Element.value_type = property(_ods_cell_value_type)
 
odf.element.Element.value = property(_ods_cell_value)
 
odf.element.Element.text = property(_ods_elem_text)
 

	
 
def check_lines_match(lines, expect_patterns, source='output'):
 
    for pattern in expect_patterns:
 
        assert any(re.search(pattern, line) for line in lines), \
 
            f"{pattern!r} not found in {source}"
 

	
 
def check_logs_match(caplog, expected):
 
    records = iter(caplog.records)
 
    for exp_level, exp_msg in expected:
 
        exp_level = exp_level.upper()
 
        assert any(
 
            log.levelname == exp_level and log.message == exp_msg for log in records
 
        ), f"{exp_level} log {exp_msg!r} not found"
 

	
 
def check_post_meta(txn, *expected_meta, default=None):
 
    assert len(txn.postings) == len(expected_meta)
 
    for post, expected in zip(txn.postings, expected_meta):
 
        if not expected:
 
            assert not post.meta
 
        else:
 
            actual = None if post.meta is None else {
 
                key: post.meta.get(key, default) for key in expected
 
            }
 
            assert actual == expected
 

	
 
def combine_values(*value_seqs):
 
    stop = 0
 
    for seq in value_seqs:
 
        try:
 
            stop = max(stop, len(seq))
 
        except TypeError:
 
            pass
 
    return itertools.islice(
 
        zip(*(itertools.cycle(seq) for seq in value_seqs)),
 
        stop,
 
    )
 

	
 
def date_seq(date=FY_MID_DATE, step=1):
 
    while True:
 
        yield date
 
        date += datetime.timedelta(days=step)
 

	
 
def parse_date(s, fmt='%Y-%m-%d'):
 
    return datetime.datetime.strptime(s, fmt).date()
 

	
 
def test_path(s):
 
    if s is None:
 
        return s
 
    s = Path(s)
 
    if not s.is_absolute():
 
        s = TESTS_DIR / s
 
    return s
 

	
 
def Amount(number, currency='USD'):
 
    return bc_amount.Amount(Decimal(number), currency)
 

	
 
def Cost(number, currency='USD', date=FY_MID_DATE, label=None):
 
    return bc_data.Cost(Decimal(number), currency, date, label)
 

	
 
def Posting(account, number,
 
            currency='USD', cost=None, price=None, flag=None,
 
            _post_type=bc_data.Posting, _meta_type=None, **meta):
 
    if cost is not None:
 
        cost = Cost(*cost)
 
    if not meta:
 
        meta = None
 
    elif _meta_type:
 
        meta = _meta_type(meta)
 
    return _post_type(
 
        account,
 
        Amount(number, currency),
 
        cost,
 
        price,
 
        flag,
 
        meta,
 
    )
 

	
 
def Transaction(date=FY_MID_DATE, flag='*', payee=None,
 
                narration='', tags=None, links=None, postings=(),
 
                **meta):
 
    if isinstance(date, str):
 
        date = parse_date(date)
 
    meta.setdefault('filename', '<test>')
 
    meta.setdefault('lineno', 0)
 
    real_postings = []
 
    for post in postings:
 
        try:
 
            post.account
 
        except AttributeError:
 
            if isinstance(post[-1], dict):
 
                args = post[:-1]
 
                kwargs = post[-1]
 
            else:
 
                args = post
 
                kwargs = {}
 
            post = Posting(*args, **kwargs)
 
        real_postings.append(post)
 
    return bc_data.Transaction(
 
        meta,
 
        date,
 
        flag,
 
        payee,
 
        narration,
 
        set(tags or ''),
 
        set(links or ''),
 
        real_postings,
 
    )
 

	
 
LINK_METADATA_STRINGS = {
 
    'Invoices/304321.pdf',
 
    'rt:123/456',
 
    'rt://ticket/234',
 
}
 

	
 
NON_LINK_METADATA_STRINGS = {
 
    '',
 
    ' ',
 
    '     ',
 
}
 

	
 
NON_STRING_METADATA_VALUES = [
 
    Decimal(5),
 
    FY_MID_DATE,
 
    Amount(50),
 
    Amount(500, None),
 
]
 

	
 
OPENING_EQUITY_ACCOUNTS = itertools.cycle([
 
    'Equity:Funds:Unrestricted',
 
    'Equity:Funds:Restricted',
 
    'Equity:OpeningBalance',
 
])
 

	
 
class ODSCell:
 
    @classmethod
 
    def from_row(cls, row):
 
        return row.getElementsByType(odf.table.TableCell)
 

	
 
    @classmethod
 
    def from_sheet(cls, spreadsheet):
 
        for row in spreadsheet.getElementsByType(odf.table.TableRow):
 
            yield list(cls.from_row(row))
 

	
 
    @classmethod
 
    def from_ods_file(cls, path):
 
        ods = odf.opendocument.load(path)
 
        return cls.from_sheet(ods.spreadsheet)
 

	
 

	
 
def OpeningBalance(acct=None, **txn_meta):
 
    if acct is None:
 
        acct = next(OPENING_EQUITY_ACCOUNTS)
 
    return Transaction(**txn_meta, postings=[
 
        ('Assets:Receivable:Accounts', 100),
 
        ('Assets:Receivable:Loans', 200),
 
        ('Liabilities:Payable:Accounts', -15),
 
        ('Liabilities:Payable:Vacation', -25),
 
        (acct, -260),
 
    ])
 

	
 
class TestBooksLoader(books.Loader):
 
    def __init__(self, source):
 
        self.source = source
 

	
 
    def load_all(self, from_year=None):
 
        return bc_loader.load_file(self.source)
 

	
 
    def load_fy_range(self, from_fy, to_fy=None):
 
        return self.load_all()
 

	
 

	
 
class TestConfig:
 
    def __init__(self, *,
 
                 books_path=None,
 
                 fiscal_year=(3, 1),
 
                 payment_threshold=0,
 
                 repo_path=None,
 
                 rt_client=None,
 
    ):
 
        if books_path is None:
 
            self._books_loader = None
 
        else:
 
            self._books_loader = TestBooksLoader(books_path)
 
        self.fiscal_year = fiscal_year
 
        self._payment_threshold = Decimal(payment_threshold)
 
        self.repo_path = test_path(repo_path)
 
        self._rt_client = rt_client
 
        if rt_client is None:
 
            self._rt_wrapper = None
 
        else:
 
            self._rt_wrapper = rtutil.RT(rt_client)
 

	
 
    def books_loader(self):
 
        return self._books_loader
 

	
 
    def config_file_path(self):
 
        return test_path('userconfig/conservancy_beancount/config.ini')
 

	
 
    def fiscal_year_begin(self):
 
        return books.FiscalYear(*self.fiscal_year)
 

	
 
    def payment_threshold(self):
 
        return self._payment_threshold
 

	
 
    def repository_path(self):
 
        return self.repo_path
 

	
 
    def rt_client(self):
 
        return self._rt_client
 

	
 
    def rt_wrapper(self):
 
        return self._rt_wrapper
 

	
 

	
 
def TestRepo(head_hexsha='abcd1234', dirty=False):
 
    retval = unittest.mock.Mock(spec=git.Repo)
 
    retval.is_dirty.return_value = dirty
 
    retval.head.commit.hexsha = head_hexsha
 
    return retval
 

	
 

	
 
class _TicketBuilder:
 
    MESSAGE_ATTACHMENTS = [
 
        ('(Unnamed)', 'multipart/alternative', '0b'),
 
        ('(Unnamed)', 'text/plain', '1.2k'),
 
        ('(Unnamed)', 'text/html', '1.4k'),
 
    ]
 
    MISC_ATTACHMENTS = [
 
        ('Forwarded Message.eml', 'message/rfc822', '3.1k'),
 
        ('photo.jpg', 'image/jpeg', '65.2k'),
 
        ('ConservancyInvoice-301.pdf', 'application/pdf', '326k'),
 
        ('Company_invoice-2020030405_as-sent.pdf', 'application/pdf', '50k'),
 
        ('statement.txt', 'text/plain', '652b'),
 
        ('screenshot.png', 'image/png', '1.9m'),
 
    ]
 

	
 
    def __init__(self):
 
        self.id_seq = itertools.count(1)
 
        self.misc_attchs = itertools.cycle(self.MISC_ATTACHMENTS)
 

	
 
    def new_attch(self, attch):
 
        return (str(next(self.id_seq)), *attch)
 

	
 
    def new_msg_with_attachments(self, attachments_count=1):
 
        for attch in self.MESSAGE_ATTACHMENTS:
 
            yield self.new_attch(attch)
 
        for _ in range(attachments_count):
 
            yield self.new_attch(next(self.misc_attchs))
 

	
 
    def new_messages(self, messages_count, attachments_count=None):
 
        for n in range(messages_count):
 
            if attachments_count is None:
 
                att_count = messages_count - n
 
            else:
 
                att_count = attachments_count
 
            yield from self.new_msg_with_attachments(att_count)
 

	
 

	
 
class RTClient:
 
    _builder = _TicketBuilder()
 
    DEFAULT_URL = 'https://example.org/defaultrt/REST/1.0/'
 
    TICKET_DATA = {
 
        '1': list(_builder.new_messages(1, 3)),
 
        '2': list(_builder.new_messages(2, 1)),
 
        '3': list(_builder.new_messages(3, 0)),
 
    }
 
    del _builder
 

	
 
    def __init__(self,
 
                 url=DEFAULT_URL,
 
                 default_login=None,
 
                 default_password=None,
 
                 proxy=None,
 
                 default_queue='General',
 
                 skip_login=False,
 
                 verify_cert=True,
 
                 http_auth=None,
 
                 want_cfs=True,
 
    ):
 
        self.url = url
 
        if http_auth is None:
 
            self.user = default_login
 
            self.password = default_password
 
            self.auth_method = 'login'
 
            self.login_result = skip_login or None
 
        else:
 
            self.user = http_auth.username
 
            self.password = http_auth.password
 
            self.auth_method = type(http_auth).__name__
 
            self.login_result = True
 
        self.last_login = None
 
        self.want_cfs = want_cfs
 
        self.edits = {}
 

	
 
    def login(self, login=None, password=None):
 
        if login is None and password is None:
 
            login = self.user
 
            password = self.password
 
        self.login_result = bool(login and password and not password.startswith('bad'))
 
        self.last_login = (login, password, self.login_result)
 
        return self.login_result
 

	
 
    def get_attachments(self, ticket_id):
 
        try:
 
            return list(self.TICKET_DATA[str(ticket_id)])
 
        except KeyError:
 
            return None
 

	
 
    def get_attachment(self, ticket_id, attachment_id):
 
        try:
 
            att_seq = iter(self.TICKET_DATA[str(ticket_id)])
 
        except KeyError:
 
            return None
 
        att_id = str(attachment_id)
 
        multipart_id = None
 
        for attch in att_seq:
 
            if attch[0] == att_id:
 
                break
 
            elif attch[2].startswith('multipart/'):
 
                multipart_id = attch[0]
 
        else:
 
            return None
 
        tx_id = multipart_id or att_id
 
        if attch[1] == '(Unnamed)':
 
            filename = ''
 
        else:
 
            filename = attch[1]
 
        return {
 
            'id': att_id,
 
            'ContentType': attch[2],
 
            'Filename': filename,
 
            'Transaction': tx_id,
 
        }
 

	
 
    def get_ticket(self, ticket_id):
 
        ticket_id_s = str(ticket_id)
 
        if ticket_id_s not in self.TICKET_DATA:
 
            return None
 
        retval = {
 
            'id': 'ticket/{}'.format(ticket_id_s),
 
            'numerical_id': ticket_id_s,
 
            'Requestors': [
 
                f'mx{ticket_id_s}@example.org',
 
                'requestor2@example.org',
 
            ],
 
        }
 
        if self.want_cfs:
 
            retval['CF.{payment-amount}'] = ''
 
            retval['CF.{payment-method}'] = ''
 
            retval['CF.{payment-to}'] = f'Hon. Mx. {ticket_id_s}'
 
        return retval
 

	
 
    def edit_ticket(self, ticket_id, **kwargs):
 
        self.edits.setdefault(str(ticket_id), {}).update(kwargs)
 
        return True
 

	
 
    def get_user(self, user_id):
 
        user_id_s = str(user_id)
 
        match = re.search(r'(\d+)@', user_id_s)
 
        if match is None:
 
            email = f'mx{user_id_s}@example.org'
 
            user_id_num = int(user_id_s)
 
        else:
 
            email = user_id_s
 
            user_id_num = int(match.group(1))
 
        retval = {
 
            'id': f'user/{user_id_num}',
 
            'EmailAddress': email,
 
            'Name': email,
 
        }
 
        if self.want_cfs:
 
            retval['RealName'] = f'Mx. {user_id_num}'
 
        return retval
0 comments (0 inline, 0 general)