Changeset - ab9c65d20dc5
[Not reviewed]
0 5 1
Brett Smith - 7 years ago 2017-10-22 20:10:17
brettcsmith@brettcsmith.org
import2ledger: Support only importing entries in a date range.
6 files changed with 127 insertions and 14 deletions:
0 comments (0 inline, 0 general)
import2ledger/__main__.py
Show inline comments
...
 
@@ -13,49 +13,52 @@ class FileImporter:
 
        self.hooks = [hook(config) for hook in hooks.load_all()]
 
        self.stdout = stdout
 

	
 
    def import_file(self, in_file):
 
        importers = []
 
        for importer in self.importers:
 
            in_file.seek(0)
 
            if importer.can_import(in_file):
 
                importers.append(importer)
 
        if not importers:
 
            raise errors.UserInputFileError("no importers available", in_file.name)
 
        with contextlib.ExitStack() as exit_stack:
 
            output_path = self.config.get_output_path()
 
            if output_path is None:
 
                out_file = self.stdout
 
            else:
 
                out_file = exit_stack.enter_context(output_path.open('a'))
 
            for importer in importers:
 
                template = self.config.get_template(importer.TEMPLATE_KEY)
 
                default_date = self.config.get_default_date()
 
                in_file.seek(0)
 
                for entry_data in importer(in_file):
 
                    for hook in self.hooks:
 
                        hook.run(entry_data)
 
                    print(template.render(**entry_data), file=out_file, end='')
 
                        if not entry_data:
 
                            break
 
                    else:
 
                        print(template.render(**entry_data), file=out_file, end='')
 

	
 
    def import_path(self, in_path):
 
        if in_path is None:
 
            raise errors.UserInputFileError("only seekable files are supported", '<stdin>')
 
        with in_path.open() as in_file:
 
            if not in_file.seekable():
 
                raise errors.UserInputFileError("only seekable files are supported", in_path)
 
            return self.import_file(in_file)
 

	
 
    def import_paths(self, path_seq):
 
        for in_path in path_seq:
 
            try:
 
                retval = self.import_path(in_path)
 
            except (OSError, errors.UserInputError) as error:
 
                yield in_path, error
 
            else:
 
                yield in_path, retval
 

	
 

	
 
def setup_logger(logger, main_config, stream):
 
    formatter = logging.Formatter('%(name)s: %(levelname)s: %(message)s')
 
    handler = logging.StreamHandler(stream)
 
    handler.setFormatter(formatter)
 
    logger.addHandler(handler)
import2ledger/config.py
Show inline comments
 
import argparse
 
import configparser
 
import contextlib
 
import datetime
 
import locale
 
import logging
 
import os.path
 
import pathlib
 

	
 
import babel
 
import babel.numbers
 
from . import errors, template, util
 

	
 
class Configuration:
 
    HOME_PATH = pathlib.Path(os.path.expanduser('~'))
 
    DEFAULT_CONFIG_PATH = pathlib.Path(HOME_PATH, '.config', 'import2ledger.ini')
 
    DEFAULT_ENCODING = locale.getpreferredencoding()
 
    LOCALE = babel.core.Locale.default()
 
    TODAY = datetime.date.today()
 
    CONFIG_DEFAULTS = {
 
        'date_format': '%%Y/%%m/%%d',
 
        'date_range': '-',
 
        'loglevel': 'WARNING',
 
        'output_path': '-',
 
        'signed_currencies': ','.join(babel.numbers.get_territory_currencies(
 
            LOCALE.territory, start_date=TODAY)),
 
        'signed_currency_format': '¤#,##0.###;¤-#,##0.###',
 
        'unsigned_currency_format': '#,##0.### ¤¤',
 
    }
 

	
 
    def __init__(self, arglist):
 
        argparser = self._build_argparser()
 
        self.error = argparser.error
 
        self.args = argparser.parse_args(arglist)
 

	
 
        if self.args.config_file is None:
 
            self.args.config_file = []
 
            if self.DEFAULT_CONFIG_PATH.exists():
 
                self.args.config_file.append(self.DEFAULT_CONFIG_PATH)
 
        self.conffile = self._build_conffile()
 
        conffile_paths = [path.as_posix() for path in self.args.config_file]
 
        read_files = self.conffile.read(conffile_paths)
 
        for expected_path, read_path in zip(conffile_paths, read_files):
 
            if read_path != expected_path:
 
                self.error("failed to read configuration file {!r}".format(expected_path))
 

	
 
        self.finalize()
 

	
 
    def _build_argparser(self):
 
        parser = argparse.ArgumentParser()
 
        parser.add_argument(
 
            '--config-file', '-C', metavar='PATH', type=pathlib.Path,
 
            action='append',
 
            help="Path of a configuration file to read",
 
        )
 
        parser.add_argument(
 
            '--use-config', '-c', metavar='SECTION',
 
            help="Read settings from this section of the configuration file",
 
        )
 
        parser.add_argument(
 
            'input_paths', metavar='PATH',
 
            nargs='+',
 
            help="Path to generate Ledger entries from.",
 
        )
 

	
 
        out_args = parser.add_argument_group(
 
            "default overrides",
 
            description="These options take priority over settings in the "
 
            "[DEFAULT] section of your config file, but not other sections.",
 
        )
 
        parser.add_argument(
 
            '--loglevel', '-L', metavar='LEVEL',
 
            choices=['debug', 'info', 'warning', 'error', 'critical'],
 
            help="Log messages at this level and above. Default WARNING.",
 
        )
 
        out_args.add_argument(
 
            '--date', '-d', metavar='DATE',
 
            help="Date to use in Ledger entries when the source doesn't "
 
            "provide one. Write this in your configured date format. "
 
            "Default today.",
 
        )
 
        out_args.add_argument(
 
            '--date-format', '-D', metavar='FORMAT',
 
            help="Date format to use in Ledger entries",
 
        )
 
        out_args.add_argument(
 
            '--date-range', metavar='DATE-DATE',
 
            help="Only import entries in this date range, inclusive. "
 
            "Write dates in your configured date format. "
 
            "You can omit either side of the range.",
 
        )
 
        out_args.add_argument(
 
            '--loglevel', '-L', metavar='LEVEL',
 
            choices=['debug', 'info', 'warning', 'error', 'critical'],
 
            help="Log messages at this level and above. Default WARNING.",
 
        )
 
        out_args.add_argument(
 
            '--output-path', '-O', metavar='PATH',
 
            help="Path of file to append entries to, or '-' for stdout (default).",
 
        )
 
        out_args.add_argument(
 
            '--signed-currency', '--sign', metavar='CODE',
 
            action='append', dest='signed_currencies',
 
            help="Currency code to use currency sign for in Ledger entry amounts. "
 
            "Can be specified multiple times.",
 
        )
 
        out_args.add_argument(
 
            '--signed-currency-format', '--sign-format', '-S', metavar='FORMAT',
 
            help="Unicode number pattern to use for signed currencies in Ledger entry amounts",
 
        )
 
        out_args.add_argument(
 
            '--unsigned-currency-format', '--unsign-format', '-U', metavar='FORMAT',
 
            help="Unicode number pattern to use for unsigned currencies in Ledger entry amounts",
 
        )
 

	
 
        return parser
 

	
 
    def _build_conffile(self):
 
        return configparser.ConfigParser(
 
            comment_prefixes='#',
 
            defaults=self.CONFIG_DEFAULTS,
 
        )
 

	
 
    def _s_to_path(self, s):
 
        return None if s == '-' else pathlib.Path(s)
 

	
 
    def _strpdate(self, date_s, date_fmt):
 
        try:
 
            return util.strpdate(date_s, date_fmt)
 
        except ValueError as error:
 
            raise errors.UserInputConfigurationError(error.args[0], date_s)
 

	
 
    def _parse_section_date(self, section_name, default=TODAY):
 
        section = self.conffile[section_name]
 
        try:
 
            return self._strpdate(section['date'], section['date_format'])
 
        except KeyError:
 
            return default
 

	
 
    def _parse_date_range(self, section_name):
 
        section = self.conffile[section_name]
 
        range_s = section['date_range']
 
        date_fmt = section['date_format']
 
        if not range_s:
 
            range_s = '-'
 
        if range_s.startswith('-'):
 
            start_s = ''
 
            end_s = range_s[1:]
 
        elif range_s.endswith('-'):
 
            start_s = range_s[:-1]
 
            end_s = ''
 
        else:
 
            range_parts = range_s.split('-')
 
            mid_index = len(range_parts) // 2
 
            start_s = '-'.join(range_parts[:mid_index])
 
            end_s = '-'.join(range_parts[mid_index:])
 
        start_d = self._strpdate(start_s, date_fmt) if start_s else datetime.date.min
 
        end_d = self._strpdate(end_s, date_fmt) if end_s else datetime.date.max
 
        return range(start_d.toordinal(), end_d.toordinal() + 1)
 

	
 
    def finalize(self):
 
        default_secname = self.conffile.default_section
 
        if self.args.use_config is None:
 
            self.args.use_config = self.conffile.default_section
 
            self.args.use_config = default_secname
 
        elif not self.conffile.has_section(self.args.use_config):
 
            self.error("section {!r} not found in config file".format(self.args.use_config))
 
        self.args.input_paths = [self._s_to_path(s) for s in self.args.input_paths]
 

	
 
        defaults = self.conffile[self.conffile.default_section]
 
        defaults = self.conffile[default_secname]
 
        for key in self.CONFIG_DEFAULTS:
 
            value = getattr(self.args, key)
 
            if value is None:
 
                pass
 
            elif key == 'signed_currencies':
 
                defaults[key] = ','.join(value)
 
            else:
 
                defaults[key] = value
 
                defaults[key] = value.replace('%', '%%')
 

	
 
        # We parse all the dates now to make sure they're valid.
 
        if self.args.date is not None:
 
            default_date = self._strpdate(self.args.date, defaults['date_format'])
 
        elif 'date' in defaults:
 
            default_date = self._strpdate(defaults['date'], defaults['date_format'])
 
        else:
 
            default_date = self.TODAY
 

	
 
        self.dates = {secname: self._parse_section_date(secname, default_date)
 
                      for secname in self.conffile}
 
        self.dates[self.conffile.default_section] = default_date
 
        self.dates[default_secname] = default_date
 
        self.date_ranges = {secname: self._parse_date_range(secname)
 
                            for secname in self.conffile}
 
        self.date_ranges[default_secname] = self._parse_date_range(default_secname)
 

	
 
    @contextlib.contextmanager
 
    def from_section(self, section_name):
 
        prev_section = self.args.use_config
 
        self.args.use_config = section_name
 
        try:
 
            yield self
 
        finally:
 
            self.args.use_config = prev_section
 

	
 
    def _get_section(self, section_name):
 
        if section_name is None:
 
            section_name = self.args.use_config
 
        return self.conffile[section_name]
 

	
 
    def get_default_date(self, section_name=None):
 
    def _get_from_dict(self, confdict, section_name=None):
 
        if section_name is None:
 
            section_name = self.args.use_config
 
        try:
 
            return self.dates[section_name]
 
            return confdict[section_name]
 
        except KeyError:
 
            return self.dates[self.conffile.default_section]
 
            return confdict[self.conffile.default_section]
 

	
 
    def date_in_want_range(self, date, section_name=None):
 
        return date.toordinal() in self._get_from_dict(self.date_ranges, section_name)
 

	
 
    def get_default_date(self, section_name=None):
 
        return self._get_from_dict(self.dates, section_name)
 

	
 
    def get_loglevel(self, section_name=None):
 
        section_config = self._get_section(section_name)
 
        level_name = section_config['loglevel']
 
        try:
 
            return getattr(logging, level_name.upper())
 
        except AttributeError:
 
            raise errors.UserInputConfigurationError("not a valid loglevel", level_name)
 

	
 
    def get_output_path(self, section_name=None):
 
        section_config = self._get_section(section_name)
 
        return self._s_to_path(section_config['output_path'])
 

	
 
    def get_template(self, config_key, section_name=None, factory=template.Template):
 
        section_config = self._get_section(section_name)
 
        try:
 
            template_s = section_config[config_key]
 
        except KeyError:
 
            raise errors.UserInputConfigurationError(
 
                "template not defined in [{}]".format(section_name or self.args.use_config),
 
                config_key,
 
            )
 
        return factory(
 
            template_s,
import2ledger/hooks/filter_by_date.py
Show inline comments
 
new file 100644
 
class FilterByDateHook:
 
    def __init__(self, config):
 
        self.config = config
 

	
 
    def run(self, entry_data):
 
        try:
 
            date = entry_data['date']
 
        except KeyError:
 
            pass
 
        else:
 
            if not self.config.date_in_want_range(date):
 
                entry_data.clear()
tests/test_config.py
Show inline comments
 
import contextlib
 
import datetime
 
import itertools
 
import logging
 
import os
 
import pathlib
 

	
 
START_DATE = datetime.date.today()
 

	
 
from unittest import mock
 

	
 
import pytest
 
from import2ledger import config, errors
 

	
 
from . import DATA_DIR
 

	
 
def config_from_file(path, arglist=[]):
 
    path = pathlib.Path(path)
 
    if not path.is_absolute():
 
        path = DATA_DIR / path
 
    arglist = ['-C', path.as_posix(), *arglist, os.devnull]
 
    return config.Configuration(arglist)
 

	
 
def test_defaults():
 
    config = config_from_file('test_config.ini', ['--sign', 'GBP', '-O', 'out_arg'])
 
    factory = mock.Mock(name='Template')
 
    template = config.get_template('one', 'Templates', factory)
...
 
@@ -42,48 +43,64 @@ def test_template_parsing():
 
    except IndexError as error:
 
        assert False, error
 
    assert "\n;Tag1: {value}\n" in tmpl_s
 
    assert "\nIncome:Donations  -100%\n" in tmpl_s
 
    assert "\n;IncomeTag: Donations\n" in tmpl_s
 

	
 
@pytest.mark.parametrize('arg_s', [None, '-', 'output.ledger'])
 
def test_output_path(arg_s):
 
    arglist = [] if arg_s is None else ['-O', arg_s]
 
    config = config_from_file(os.devnull, arglist)
 
    output_path = config.get_output_path()
 
    if (arg_s is None) or (arg_s == '-'):
 
        assert output_path is None
 
    else:
 
        assert output_path == pathlib.Path(arg_s)
 

	
 
def test_output_path_from_section():
 
    expected_path = pathlib.Path('Template.output')
 
    config = config_from_file('test_config.ini', ['-O', 'output.ledger'])
 
    assert config.get_output_path('Templates') == expected_path
 
    assert config.get_output_path() != expected_path
 
    with config.from_section('Templates'):
 
        assert config.get_output_path() == expected_path
 

	
 
@pytest.mark.parametrize('range_s,date_fmt', [
 
    (range_s.replace('/', sep), sep.join(['%Y', '%m', '%d']))
 
    for range_s, sep in itertools.product([
 
            '-',
 
            '2016/06/01-2016/06/30',
 
            '2016/06/01-',
 
            '-2016/06/30',
 
            ], '/-')
 
])
 
def test_date_in_want_range(range_s, date_fmt):
 
    config = config_from_file(os.devnull, ['--date-range=' + range_s, '--date-format', date_fmt])
 
    assert config.date_in_want_range(datetime.date(2016, 5, 31)) == range_s.startswith('-')
 
    assert config.date_in_want_range(datetime.date(2016, 6, 1))
 
    assert config.date_in_want_range(datetime.date(2016, 6, 30))
 
    assert config.date_in_want_range(datetime.date(2016, 7, 1)) == range_s.endswith('-')
 

	
 
@pytest.mark.parametrize('arglist,expect_date', [
 
    ([], None),
 
    (['-d', '2017-10-12'], datetime.date(2017, 10, 12)),
 
    (['-c', 'Date'], datetime.date(2017, 10, 8)),
 
])
 
def test_default_date(arglist, expect_date):
 
    config = config_from_file('test_config.ini', arglist)
 
    default_date = config.get_default_date()
 
    if expect_date is None:
 
        assert START_DATE <= default_date <= datetime.date.today()
 
    else:
 
        assert default_date == expect_date
 
    assert config.get_default_date('Date') == datetime.date(2017, 10, 8)
 

	
 
@pytest.mark.parametrize('level_s,expect_level', [
 
    (s, getattr(logging, s.upper()))
 
    for s in ['critical', 'debug', 'error', 'info', 'warning']
 
])
 
def test_loglevel(level_s, expect_level):
 
    config = config_from_file(os.devnull, ['--loglevel', level_s])
 
    assert config.get_loglevel() == expect_level
 

	
 
@contextlib.contextmanager
 
def bad_config(expect_input):
tests/test_hooks.py
Show inline comments
 
import argparse
 
import datetime
 
import itertools
 

	
 
import pytest
 

	
 
from import2ledger import hooks
 
from import2ledger.hooks import add_entity, default_date
 
from import2ledger.hooks import add_entity, default_date, filter_by_date
 

	
 
def test_load_all():
 
    all_hooks = list(hooks.load_all())
 
    assert add_entity.AddEntityHook in all_hooks
 

	
 
@pytest.mark.parametrize('payee,expected', [
 
    ('Alex Smith', 'Smith-Alex'),
 
    ('Dakota D.  Doe', 'Doe-Dakota-D'),
 
    ('Björk', 'Bjork'),
 
    ('Fran Doe-Smith', 'Doe-Smith-Fran'),
 
    ('Alex(Nickname) Smith', 'Smith-Alex'),
 
    ('稲荷', '稲荷'),
 
    ('Pøweł', 'Powel'),
 
    ('Elyse Jan Smith', 'Smith-Elyse-Jan'),
 
    ('Jan van Smith', 'van-Smith-Jan'),
 
    ('Francis da Silva', 'da-Silva-Francis'),
 
])
 
def test_add_entity(payee, expected):
 
    data = {'payee': payee}
 
    hook = add_entity.AddEntityHook(argparse.Namespace())
 
    hook.run(data)
 
    assert data['entity'] == expected
 

	
 

	
 
class DateRangeConfig:
 
    def __init__(self, start_date=None, end_date=None):
 
        self.start_date = start_date
 
        self.end_date = end_date
 

	
 
    def date_in_want_range(self, date):
 
        return (
 
            ((self.start_date is None) or (date >= self.start_date))
 
            and ((self.end_date is None) or (date <= self.end_date))
 
        )
 

	
 

	
 
@pytest.mark.parametrize('entry_date,start_date,end_date,allowed', [
 
    (datetime.date(2016, 5, 10), datetime.date(2016, 1, 1), datetime.date(2016, 12, 31), True),
 
    (datetime.date(2016, 1, 1), datetime.date(2016, 1, 1), datetime.date(2016, 12, 31), True),
 
    (datetime.date(2016, 12, 31), datetime.date(2016, 1, 1), datetime.date(2016, 12, 31), True),
 
    (datetime.date(2016, 1, 1), datetime.date(2016, 1, 1), None, True),
 
    (datetime.date(2016, 12, 31), None, datetime.date(2016, 12, 31), True),
 
    (datetime.date(1999, 1, 2), None, None, True),
 
    (datetime.date(2016, 1, 25), datetime.date(2016, 2, 1), datetime.date(2016, 12, 31), False),
 
    (datetime.date(2016, 12, 26), datetime.date(2016, 1, 1), datetime.date(2016, 11, 30), False),
 
    (datetime.date(2016, 1, 31), datetime.date(2016, 2, 1), None, False),
 
    (datetime.date(2016, 12, 1), None, datetime.date(2016, 11, 30), False),
 
])
 
def test_filter_by_date(entry_date, start_date, end_date, allowed):
 
    entry_data = {'date': entry_date}
 
    hook = filter_by_date.FilterByDateHook(DateRangeConfig(start_date, end_date))
 
    hook.run(entry_data)
 
    assert bool(entry_data) == allowed
 

	
 
class DefaultDateConfig:
 
    ONE_DAY = datetime.timedelta(days=1)
 

	
 
    def __init__(self, start_date=None):
 
        if start_date is None:
 
            start_date = datetime.date(2016, 3, 5)
 
        self.date = start_date - self.ONE_DAY
 

	
 
    def get_default_date(self, section_name=None):
 
        self.date += self.ONE_DAY
 
        return self.date
 

	
 

	
 
class TestDefaultDate:
 
    def test_simple_case(self):
 
        expect_date = datetime.date(2016, 2, 4)
 
        config = DefaultDateConfig(expect_date)
 
        data = {}
 
        hook = default_date.DefaultDateHook(config)
 
        hook.run(data)
 
        assert data['date'] == expect_date
 

	
 
    def test_no_caching(self):
 
        config = DefaultDateConfig()
tests/test_main.py
Show inline comments
...
 
@@ -28,24 +28,37 @@ def iter_entries(in_file):
 
        else:
 
            lines.append(line)
 
    if lines:
 
        yield ''.join(lines)
 

	
 
def entries2set(in_file):
 
    return set(normalize_whitespace(e) for e in iter_entries(in_file))
 

	
 
def expected_entries(path):
 
    path = pathlib.Path(path)
 
    if not path.is_absolute():
 
        path = DATA_DIR / path
 
    with path.open() as in_file:
 
        return entries2set(in_file)
 

	
 
def test_fees_import():
 
    arglist = ARGLIST + [
 
        '-c', 'One',
 
        pathlib.Path(DATA_DIR, 'PatreonEarnings.csv').as_posix(),
 
    ]
 
    exitcode, stdout, _ = run_main(arglist)
 
    assert exitcode == 0
 
    actual = entries2set(stdout)
 
    assert actual == expected_entries('test_main_fees_import.ledger')
 

	
 
def test_date_range_import():
 
    arglist = ARGLIST + [
 
        '-c', 'One',
 
        '--date-range', '2017/10/01-',
 
        pathlib.Path(DATA_DIR, 'PatreonEarnings.csv').as_posix(),
 
    ]
 
    exitcode, stdout, _ = run_main(arglist)
 
    assert exitcode == 0
 
    actual = entries2set(stdout)
 
    expected = {entry for entry in expected_entries('test_main_fees_import.ledger')
 
                if entry.startswith('2017/10/')}
 
    assert actual == expected
0 comments (0 inline, 0 general)