Files @ ea0e3d3a73b7
Branch filter:

Location: NPO-Accounting/import2ledger/import2ledger/config.py

Brett Smith
dynload: Pre-configure objects and skip unconfigured ones.

Now that all importers and hooks receive the configuration object, they can
tell us at initialization time whether or not they actually have enough
configuration to be useful. Initialize them immediately on loading, and
only use importers and hooks that are actually configured.
import argparse
import configparser
import contextlib
import datetime
import locale
import logging
import os.path
import pathlib

import babel
import babel.numbers
from . import errors, strparse

class Configuration:
    HOME_PATH = pathlib.Path(os.path.expanduser('~'))
    DEFAULT_CONFIG_PATH = pathlib.Path(HOME_PATH, '.config', 'import2ledger.ini')
    DEFAULT_ENCODING = locale.getpreferredencoding()
    LOCALE = babel.core.Locale.default()
    TODAY = datetime.date.today()
    CONFIG_DEFAULTS = {
        'date_format': '%%Y/%%m/%%d',
        'date_range': '-',
        'loglevel': 'WARNING',
        'output_path': '-',
        'signed_currencies': ','.join(babel.numbers.get_territory_currencies(
            LOCALE.territory, start_date=TODAY)),
        'signed_currency_format': '¤#,##0.###;¤-#,##0.###',
        'unsigned_currency_format': '#,##0.### ¤¤',
    }

    def __init__(self, arglist, stdout, stderr):
        self.stdout = stdout
        self.stderr = stderr

        argparser = self._build_argparser()
        self.error = argparser.error
        self.args = argparser.parse_args(arglist)

        if self.args.config_file is None:
            self.args.config_file = []
            if self.DEFAULT_CONFIG_PATH.exists():
                self.args.config_file.append(self.DEFAULT_CONFIG_PATH)
        self.conffile = self._build_conffile()
        conffile_paths = [path.as_posix() for path in self.args.config_file]
        read_files = self.conffile.read(conffile_paths)
        for expected_path, read_path in zip(conffile_paths, read_files):
            if read_path != expected_path:
                self.error("failed to read configuration file {!r}".format(expected_path))

        self.finalize()

    def _build_argparser(self):
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--config-file', '-C', metavar='PATH', type=pathlib.Path,
            action='append',
            help="Path of a configuration file to read",
        )
        parser.add_argument(
            '--use-config', '-c', metavar='SECTION',
            help="Read settings from this section of the configuration file",
        )
        parser.add_argument(
            'input_paths', metavar='PATH',
            nargs='+',
            help="Path to generate Ledger entries from.",
        )

        out_args = parser.add_argument_group(
            "default overrides",
            description="These options take priority over settings in the "
            "[DEFAULT] section of your config file, but not other sections.",
        )
        out_args.add_argument(
            '--date', '-d', metavar='DATE',
            help="Date to use in Ledger entries when the source doesn't "
            "provide one. Write this in your configured date format. "
            "Default today.",
        )
        out_args.add_argument(
            '--date-format', '-D', metavar='FORMAT',
            help="Date format to use in Ledger entries",
        )
        out_args.add_argument(
            '--date-range', metavar='DATE-DATE',
            help="Only import entries in this date range, inclusive. "
            "Write dates in your configured date format. "
            "You can omit either side of the range.",
        )
        out_args.add_argument(
            '--loglevel', '-L', metavar='LEVEL',
            choices=['debug', 'info', 'warning', 'error', 'critical'],
            help="Log messages at this level and above. Default WARNING.",
        )
        out_args.add_argument(
            '--output-path', '-O', metavar='PATH',
            help="Path of file to append entries to, or '-' for stdout (default).",
        )
        out_args.add_argument(
            '--signed-currency', '--sign', metavar='CODE',
            action='append', dest='signed_currencies',
            help="Currency code to use currency sign for in Ledger entry amounts. "
            "Can be specified multiple times.",
        )
        out_args.add_argument(
            '--signed-currency-format', '--sign-format', '-S', metavar='FORMAT',
            help="Unicode number pattern to use for signed currencies in Ledger entry amounts",
        )
        out_args.add_argument(
            '--unsigned-currency-format', '--unsign-format', '-U', metavar='FORMAT',
            help="Unicode number pattern to use for unsigned currencies in Ledger entry amounts",
        )

        return parser

    def _build_conffile(self):
        return configparser.ConfigParser(
            comment_prefixes='#',
            defaults=self.CONFIG_DEFAULTS,
        )

    def _s_to_path(self, s):
        return None if s == '-' else pathlib.Path(s)

    def _strpdate(self, date_s, date_fmt):
        try:
            return strparse.date(date_s, date_fmt)
        except ValueError as error:
            raise errors.UserInputConfigurationError(error.args[0], date_s)

    def _parse_section_date(self, section_name, default=TODAY):
        section = self.conffile[section_name]
        try:
            return self._strpdate(section['date'], section['date_format'])
        except KeyError:
            return default

    def _parse_date_range(self, section_name):
        section = self.conffile[section_name]
        range_s = section['date_range']
        date_fmt = section['date_format']
        if not range_s:
            range_s = '-'
        if range_s.startswith('-'):
            start_s = ''
            end_s = range_s[1:]
        elif range_s.endswith('-'):
            start_s = range_s[:-1]
            end_s = ''
        else:
            range_parts = range_s.split('-')
            mid_index = len(range_parts) // 2
            start_s = '-'.join(range_parts[:mid_index])
            end_s = '-'.join(range_parts[mid_index:])
        start_d = self._strpdate(start_s, date_fmt) if start_s else datetime.date.min
        end_d = self._strpdate(end_s, date_fmt) if end_s else datetime.date.max
        return range(start_d.toordinal(), end_d.toordinal() + 1)

    def finalize(self):
        default_secname = self.conffile.default_section
        if self.args.use_config is None:
            self.args.use_config = default_secname
        elif not self.conffile.has_section(self.args.use_config):
            self.error("section {!r} not found in config file".format(self.args.use_config))
        self.args.input_paths = [self._s_to_path(s) for s in self.args.input_paths]

        defaults = self.conffile[default_secname]
        for key in self.CONFIG_DEFAULTS:
            value = getattr(self.args, key)
            if value is None:
                pass
            elif key == 'signed_currencies':
                defaults[key] = ','.join(value)
            else:
                defaults[key] = value.replace('%', '%%')

        # We parse all the dates now to make sure they're valid.
        if self.args.date is not None:
            default_date = self._strpdate(self.args.date, defaults['date_format'])
        elif 'date' in defaults:
            default_date = self._strpdate(defaults['date'], defaults['date_format'])
        else:
            default_date = self.TODAY

        self.dates = {secname: self._parse_section_date(secname, default_date)
                      for secname in self.conffile}
        self.dates[default_secname] = default_date
        self.date_ranges = {secname: self._parse_date_range(secname)
                            for secname in self.conffile}
        self.date_ranges[default_secname] = self._parse_date_range(default_secname)

    @contextlib.contextmanager
    def _open_path(self, path, fallback_file, *args, **kwargs):
        if path is None:
            yield fallback_file
        else:
            with path.open(*args, **kwargs) as open_file:
                yield open_file

    def get_section(self, section_name):
        if section_name is None:
            section_name = self.args.use_config
        try:
            self.conffile.add_section(section_name)
        except (configparser.DuplicateSectionError, ValueError):
            pass
        return self.conffile[section_name]

    def _get_from_dict(self, confdict, section_name=None):
        if section_name is None:
            section_name = self.args.use_config
        try:
            return confdict[section_name]
        except KeyError:
            return confdict[self.conffile.default_section]

    def date_in_want_range(self, date, section_name=None):
        return date.toordinal() in self._get_from_dict(self.date_ranges, section_name)

    def get_default_date(self, section_name=None):
        return self._get_from_dict(self.dates, section_name)

    def get_loglevel(self, section_name=None):
        section_config = self.get_section(section_name)
        level_name = section_config['loglevel']
        try:
            return getattr(logging, level_name.upper())
        except AttributeError:
            raise errors.UserInputConfigurationError("not a valid loglevel", level_name)

    def get_output_path(self, section_name=None):
        section_config = self.get_section(section_name)
        return self._s_to_path(section_config['output_path'])

    def open_output_file(self, section_name=None):
        path = self.get_output_path(section_name)
        return self._open_path(path, self.stdout, 'a')

    def setup_logger(self, logger, section_name=None):
        logger.setLevel(self.get_loglevel(section_name))