logger = logging.getLogger('import2ledger.hooks.ledger_entry')


class TokenTransformer:
    """Rewrite a stream of Python tokens and reassemble it into source text.

    The input is tokenized with :func:`tokenize.tokenize`.  Every token is
    dispatched to a method named ``transform_<TOKEN NAME>`` which yields zero
    or more ``(type, string)`` pairs to emit in its place.  Token types
    without such a method are rejected with :class:`ValueError`, so a
    subclass supports exactly the token types it defines a method for.
    """

    def __init__(self, source):
        """Start tokenizing ``source``.

        ``source`` is either an object with a ``readline`` attribute or a
        readline-style callable returning bytes.
        """
        try:
            source = source.readline
        except AttributeError:
            # Already a bare callable; hand it to tokenize as-is.
            pass
        self.in_tokens = tokenize.tokenize(source)

    @classmethod
    def from_bytes(cls, b):
        """Build a transformer that reads its source from the bytes ``b``."""
        return cls(io.BytesIO(b).readline)

    @classmethod
    def from_str(cls, s, encoding='utf-8'):
        """Build a transformer from the string ``s``, encoded as ``encoding``."""
        return cls.from_bytes(s.encode(encoding))

    def __iter__(self):
        """Yield the transformed ``(type, string)`` token pairs.

        Raises:
            ValueError: if a token type has no ``transform_*`` method.
        """
        for ttype, tvalue, _, _, _ in self.in_tokens:
            token_name = tokenize.tok_name[ttype]
            try:
                transformer = getattr(self, 'transform_' + token_name)
            except AttributeError:
                # Report the symbolic token name; the bare numeric token type
                # means nothing to whoever wrote the expression.
                raise ValueError(
                    "{} token {!r} not supported".format(token_name, tvalue)
                ) from None
            yield from transformer(ttype, tvalue)

    def _noop_transformer(self, ttype, tvalue):
        """Emit the token unchanged."""
        yield (ttype, tvalue)

    transform_ENDMARKER = _noop_transformer
    transform_NEWLINE = _noop_transformer

    def transform_ENCODING(self, ttype, tvalue):
        """Remember the source encoding, then emit the token unchanged."""
        # transform() needs this to decode the bytes untokenize() returns.
        self.in_encoding = tvalue
        return self._noop_transformer(ttype, tvalue)

    def transform(self):
        """Return the transformed source as a string."""
        out_bytes = tokenize.untokenize(self)
        return out_bytes.decode(self.in_encoding)


class AmountTokenTransformer(TokenTransformer):
    """Turn a template amount expression into an evaluable Python expression.

    Number literals become ``Decimal('<literal>')`` calls, ``{name}``
    placeholders become bare variable names, and only the keywords and
    operators listed below are let through.
    """

    SUPPORTED_NAMES = frozenset([
        'if',
        'else',
        'and',
        'or',
        'not',
        'in',
    ])
    SUPPORTED_OPS = frozenset([
        '(',
        ')',
        '+',
        '-',
        '*',
        '/',
        '==',
        '!=',
        '<',
        '<=',
        '>',
        '>=',
    ])

    def __iter__(self):
        """Yield transformed tokens, requiring at least one NAME token.

        Numbers and placeholders are both emitted as NAME tokens, so an
        output stream without any NAME token has nothing to evaluate.

        Raises:
            ValueError: if the token stream ends before any NAME is emitted.
        """
        tokens = super().__iter__()
        for token in tokens:
            yield token
            if token[0] == tokenize.NAME:
                break
        else:
            raise ValueError("no amount in expression")
        # Requirement met: pass the rest of the stream straight through.
        yield from tokens

    def transform_NAME(self, ttype, tvalue):
        """Pass supported keywords through; reject every other bare word."""
        if tvalue in self.SUPPORTED_NAMES:
            yield from self._noop_transformer(ttype, tvalue)
        else:
            raise ValueError("unsupported bare word {!r}".format(tvalue))

    def transform_NUMBER(self, ttype, tvalue):
        """Wrap a number literal in a ``Decimal('...')`` call."""
        # The literal's text is passed as a string so it never goes through
        # a binary float on its way to Decimal.
        yield (tokenize.NAME, 'Decimal')
        yield (tokenize.OP, '(')
        yield (tokenize.STRING, repr(tvalue))
        yield (tokenize.OP, ')')

    def transform_OP(self, ttype, tvalue):
        """Expand ``{name}`` placeholders and filter operators.

        Raises:
            ValueError: for an unsupported operator, or a ``{`` that is not
                followed by a lowercase name and a closing ``}``.
        """
        if tvalue == '{':
            try:
                # Consume the name and the closing brace directly from the
                # underlying token stream so they are not dispatched again.
                name_type, name_value, _, _, _ = next(self.in_tokens)
                close_type, close_value, _, _, _ = next(self.in_tokens)
                if (name_type != tokenize.NAME
                        or name_value != name_value.lower()
                        or close_type != tokenize.OP
                        or close_value != '}'):
                    raise ValueError()
            except (StopIteration, ValueError):
                raise ValueError("opening { does not name variable") from None
            yield (tokenize.NAME, name_value)
        elif tvalue in self.SUPPORTED_OPS:
            yield from self._noop_transformer(ttype, tvalue)
        else:
            raise ValueError("unsupported operator {!r}".format(tvalue))

    # NOTE(review): string tokens reach eval() unchanged.  Confirm that
    # prefixed literals (e.g. f-strings on interpreters that tokenize them as
    # one STRING token) cannot smuggle arbitrary expressions past the filter.
    transform_STRING = TokenTransformer._noop_transformer


class AccountSplitter:
    """Collect the account/amount splits of one template and render them.

    Each split pairs an account name template with a compiled amount
    expression.  Rendering evaluates every expression against a mapping of
    template variables, nudges the results so they balance, and formats one
    account line per split.
    """

    EVAL_GLOBALS = {
        'Decimal': decimal.Decimal,
    }
    TARGET_LINE_LEN = 78
    # -4 because that's how many spaces prefix an account line.
    TARGET_ACCTLINE_LEN = TARGET_LINE_LEN - 4
    # Locale used when a formatted amount is parsed back into a Decimal; it
    # must use '.' as the decimal separator and an ASCII minus sign.
    _PARSE_LOCALE = 'en_US'

    def __init__(self, signed_currencies, signed_currency_fmt,
                 unsigned_currency_fmt, template_name):
        """Create an empty splitter.

        Args:
            signed_currencies: iterable of currency codes that are rendered
                with ``signed_currency_fmt``.
            signed_currency_fmt: babel number pattern for those currencies.
            unsigned_currency_fmt: babel number pattern for all others.
            template_name: name used in error reports and as the filename
                of the compiled amount expressions.
        """
        self.splits = []
        self.metadata = []
        self.signed_currency_fmt = signed_currency_fmt
        self.unsigned_currency_fmt = unsigned_currency_fmt
        self.signed_currencies = set(signed_currencies)
        self.template_name = template_name
        # A fresh object() is never identical to a caller's mapping, so the
        # first render_next() call always starts a new iteration.
        self._last_template_vars = object()

    def is_empty(self):
        """Return True when no split has been added yet."""
        return not self.splits

    def add(self, account, amount_expr):
        """Append a split for ``account`` whose amount is ``amount_expr``.

        Raises:
            errors.UserInputConfigurationError: if the expression cannot be
                transformed or compiled.
        """
        try:
            clean_expr = AmountTokenTransformer.from_str(amount_expr).transform()
            compiled_expr = compile(clean_expr, self.template_name, 'eval')
        except (SyntaxError, tokenize.TokenError, ValueError) as error:
            # Chain the cause, matching _build_amounts().
            raise errors.UserInputConfigurationError(
                error.args[0], amount_expr,
            ) from error
        else:
            self.splits.append((account, compiled_expr))
            self.metadata.append('')

    def set_metadata(self, metadata_s):
        """Set the metadata text of the most recently added split."""
        self.metadata[-1] = metadata_s

    def _currency_decimal(self, amount, currency):
        """Return ``amount`` as a Decimal after babel's currency formatting."""
        # The formatted text is parsed back by Decimal(), so it must not
        # depend on the process locale: a locale with a ',' decimal separator
        # would produce text Decimal() cannot parse.
        formatted = babel.numbers.format_currency(
            amount, currency, '###0.###', locale=self._PARSE_LOCALE,
        )
        return decimal.Decimal(formatted)

    def _balance_amounts(self, amounts, to_amount):
        """Adjust one split so that one side of the entry sums to ``to_amount``.

        Only amounts with the same sign as ``to_amount`` take part (negative
        amounts when ``to_amount`` is not positive).  The difference between
        their sum and ``to_amount`` is added to the last of them, in place.

        Raises:
            errors.UserInputConfigurationError: if that difference is 10% or
                more of ``to_amount``, or if ``to_amount`` is zero while
                there are amounts to balance.
        """
        cmp_func = operator.lt if to_amount > 0 else operator.gt
        should_balance = functools.partial(cmp_func, 0)
        remainder = to_amount
        balance_index = None
        for index, (_, amount) in enumerate(amounts):
            if should_balance(amount):
                remainder -= amount
                balance_index = index
        if balance_index is None:
            # Nothing on this side of the entry to adjust.
            return
        # Check for zero first: dividing by a zero target would raise an
        # arithmetic error instead of the configuration error the caller
        # reports to the user.
        if (to_amount == 0
                or (abs(remainder) / abs(to_amount)) >= decimal.Decimal('.1')):
            raise errors.UserInputConfigurationError(
                "template can't balance amounts to {}".format(to_amount),
                self.template_name,
            )
        account_name, start_amount = amounts[balance_index]
        amounts[balance_index] = (account_name, start_amount + remainder)

    def _build_amounts(self, template_vars):
        """Evaluate every split and return a list of ``(account, amount)``.

        Raises:
            errors.UserInputConfigurationError: if evaluating or converting
                an amount fails.
        """
        try:
            # The expressions were filtered by AmountTokenTransformer in
            # add() before being compiled; eval() sees only Decimal and the
            # template variables.
            amounts = [
                (account, self._currency_decimal(
                    eval(amount_expr, self.EVAL_GLOBALS, template_vars),
                    template_vars['currency'],
                ))
                for account, amount_expr in self.splits
            ]
        except (ArithmeticError, NameError, TypeError, ValueError) as error:
            raise errors.UserInputConfigurationError(
                "{}: {}".format(type(error).__name__, error),
                "template {!r}".format(self.template_name)
            ) from error
        if sum(amt for _, amt in amounts) != 0:
            # Balance the positive side, then the negative side.
            self._balance_amounts(amounts, template_vars['amount'])
            self._balance_amounts(amounts, -template_vars['amount'])
        return amounts

    def _iter_splits(self, template_vars):
        """Yield the rendered text of each split, in the order added.

        A split whose amount is zero yields an empty string.
        """
        amounts = self._build_amounts(template_vars)
        if template_vars['currency'] in self.signed_currencies:
            amt_fmt = self.signed_currency_fmt
        else:
            amt_fmt = self.unsigned_currency_fmt
        for (account, amount), metadata in zip(amounts, self.metadata):
            if amount == 0:
                yield ''
            else:
                account_s = account.format_map(template_vars)
                amount_s = babel.numbers.format_currency(
                    amount, template_vars['currency'], amt_fmt)
                # Pad so the amount ends near the target column, keeping at
                # least two spaces between account and amount.
                sep_len = max(
                    2,
                    self.TARGET_ACCTLINE_LEN - len(account_s) - len(amount_s),
                )
                # Four leading spaces: the prefix TARGET_ACCTLINE_LEN
                # accounts for.
                yield '\n    {}{}{}{}'.format(
                    account_s,
                    ' ' * sep_len,
                    amount_s,
                    metadata.format_map(template_vars),
                )

    def render_next(self, template_vars):
        """Return the rendered text of the next split for ``template_vars``.

        Passing a different mapping object than the previous call restarts
        from the first split.
        """
        if template_vars is not self._last_template_vars:
            self._split_iter = self._iter_splits(template_vars)
            self._last_template_vars = template_vars
        return next(self._split_iter)
re.compile(r'^\{(\w*_)*date\}\s') SIGNED_CURRENCY_FMT = '¤#,##0.###;¤-#,##0.###' UNSIGNED_CURRENCY_FMT = '#,##0.### ¤¤' def __init__(self, template_s, signed_currencies=frozenset(), date_fmt=DATE_FMT, signed_currency_fmt=SIGNED_CURRENCY_FMT, unsigned_currency_fmt=UNSIGNED_CURRENCY_FMT, template_name='