Files @ b41a7a99af17
Branch filter:

Location: NPO-Accounting/import2ledger/import2ledger/__main__.py

Brett Smith
hooks: Filter entries by setting entry_data['_hook_cancel'] to True.

The current method only works for plain dicts and other simple mappings.
Mappings that may still contain items after .clear(), like ChainMap, can't
rely on the old method.
import contextlib
import logging
import sys

from . import config, errors, hooks, importers

logger = logging.getLogger('import2ledger')

class FileImporter:
    """Run input files through the loaded importers and hooks.

    Each entry produced by an importer is passed to every hook in turn and,
    unless a hook cancels it, rendered with the importer's template and
    written to the configured output.
    """

    def __init__(self, config, stdout):
        """Store the configuration and load the importers and hooks.

        config -- configuration object; it is also handed to each hook
                  when the hook is instantiated.
        stdout -- file object written to when no output path is configured.
        """
        self.config = config
        self.importers = list(importers.load_all())
        self.hooks = [hook_type(config) for hook_type in hooks.load_all()]
        self.stdout = stdout

    def _passes_hooks(self, entry_data):
        """Run every hook on entry_data; return False if one cancels it.

        The '_hook_cancel' flag is set to False before the hooks run.  The
        first hook that leaves it truthy stops processing.  When no hook
        cancels, the flag is removed so it is not passed to the template.
        """
        entry_data['_hook_cancel'] = False
        for hook in self.hooks:
            hook.run(entry_data)
            if entry_data['_hook_cancel']:
                return False
        del entry_data['_hook_cancel']
        return True

    def import_file(self, in_file):
        """Import one open, seekable file and write the rendered entries.

        An importer is used when its can_import() accepts the file and the
        configuration provides a non-empty template for its TEMPLATE_KEY.
        Output is appended to the configured output path, or written to
        self.stdout when that path is None.

        Raises errors.UserInputFileError when no importer qualifies.
        Configuration errors other than a missing template propagate.
        """
        usable = []
        for candidate in self.importers:
            # Every importer inspects the file from the beginning.
            in_file.seek(0)
            if not candidate.can_import(in_file):
                continue
            try:
                template = self.config.get_template(candidate.TEMPLATE_KEY)
            except errors.UserInputConfigurationError as error:
                # A missing template just disqualifies this importer;
                # any other configuration problem is a real error.
                if not error.strerror.startswith('template not defined '):
                    raise
                continue
            if not template.is_empty():
                usable.append((candidate, template))
        if not usable:
            raise errors.UserInputFileError("no importers available", in_file.name)

        with contextlib.ExitStack() as cleanup:
            destination = self.config.get_output_path()
            # Only a file opened here is registered for closing; the
            # caller-supplied stdout is left open.
            out_file = (
                self.stdout if destination is None
                else cleanup.enter_context(destination.open('a'))
            )
            for importer, template in usable:
                # NOTE(review): the result is not used in this method; the
                # call is retained in case it validates configuration.
                default_date = self.config.get_default_date()
                in_file.seek(0)
                for entry_data in importer(in_file):
                    if self._passes_hooks(entry_data):
                        print(template.render(**entry_data), file=out_file, end='')

    def import_path(self, in_path):
        """Open in_path and import it; return import_file()'s result.

        Raises errors.UserInputFileError when in_path is None (reported as
        '<stdin>') or when the opened file is not seekable.
        """
        not_seekable = "only seekable files are supported"
        if in_path is None:
            raise errors.UserInputFileError(not_seekable, '<stdin>')
        # Undecodable bytes are replaced rather than raising a decode error.
        with in_path.open(errors='replace') as in_file:
            if in_file.seekable():
                return self.import_file(in_file)
            raise errors.UserInputFileError(not_seekable, in_path)

    def import_paths(self, path_seq):
        """Import each path in path_seq, yielding (path, outcome) pairs.

        The outcome is the caught OSError or errors.UserInputError when the
        import failed, and import_path()'s return value otherwise.
        """
        for in_path in path_seq:
            try:
                outcome = self.import_path(in_path)
            except (OSError, errors.UserInputError) as error:
                outcome = error
            yield in_path, outcome


def setup_logger(logger, main_config, stream):
    """Attach a handler to logger that writes formatted records to stream.

    Records are formatted as "<logger name>: <level name>: <message>".
    main_config is accepted but not consulted by this function.
    """
    stream_handler = logging.StreamHandler(stream)
    stream_handler.setFormatter(
        logging.Formatter('%(name)s: %(levelname)s: %(message)s'))
    logger.addHandler(stream_handler)

def main(arglist=None, stdout=sys.stdout, stderr=sys.stderr):
    """Import every configured input path and return a process exit code.

    arglist -- argument list passed to config.Configuration.
    stdout  -- file object handed to FileImporter for rendered entries.
    stderr  -- file object that receives log output and configuration errors.

    Returns 0 when every path imported, 3 when the configuration could not
    be built, and otherwise 10 plus the number of failed paths, capped at 99.
    """
    try:
        my_config = config.Configuration(arglist)
    except errors.UserInputError as error:
        # The configuration object was never bound when its constructor
        # raises, so the problem is reported directly on stderr.
        print(
            "import2ledger: error: {}: {!r}".format(error.strerror, error.user_input),
            file=stderr,
        )
        return 3
    setup_logger(logger, my_config, stderr)
    importer = FileImporter(my_config, stdout)
    failures = 0
    # The second item of each pair is None on success, since
    # import_file() returns nothing, and the caught exception on failure.
    for input_path, error in importer.import_paths(my_config.args.input_paths):
        if error is None:
            logger.info("%s: imported", input_path)
        else:
            logger.warning("%s: failed to import: %s", input_path or error.path, error)
            failures += 1
    if failures == 0:
        return 0
    else:
        return min(10 + failures, 99)

if __name__ == '__main__':
    # sys.exit is always available; the bare exit() helper is installed by
    # the site module and is missing when Python runs with -S.
    sys.exit(main())