Files @ 13c316acf6e1
Branch filter:

Location: NPO-Accounting/import2ledger/import2ledger/importers/_xls.py

Brett Smith
tests: More tests for positive results for last commit.
import mmap

import xlrd
from . import _csv

class BookFromFile:
    def __init__(self, xls_file, length=0, access=mmap.ACCESS_READ, **kwargs):
        self.mmap = mmap.mmap(xls_file.fileno(), length, access=access)
        self.book = xlrd.open_workbook(
            xls_file.name,
            file_contents=self.mmap,
            **kwargs,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()

    def close(self):
        self.mmap.close()
        del self.book


class RowReader:
    def __init__(self, rows):
        self._rows = iter(rows)

    def __iter__(self):
        return self

    def __next__(self):
        return self._format_row(next(self._rows))

    def _format_row(self, row):
        return [self._format_cell(cell) for cell in row]

    def _format_cell(self, cell):
        cell_type = cell.ctype
        if cell_type is xlrd.XL_CELL_EMPTY:
            return None
        elif cell_type is xlrd.XL_CELL_BOOLEAN:
            return bool(cell.value)
        else:
            return cell.value


class DictReader(RowReader):
    def __init__(self, rows, fieldnames=None):
        super().__init__(rows)
        if fieldnames is None:
            fieldnames = super()._format_row(next(self._rows))
        self.fieldnames = fieldnames

    def _format_row(self, row):
        return {k: v for k, v in zip(self.fieldnames, super()._format_row(row))}


class XLSImporterBase(_csv.CSVImporterBase):
    """Base class for Excel spreadsheet importers.

    Subclasses may define the following:
    * _get_rows: A method that accepts an xlrd.Book object and returns an
      iterator of rows from it.  The default implementation yields each row
      from each sheet in order.
    """

    BOOK_KWARGS = {}
    Reader = RowReader
    DictReader = DictReader

    @classmethod
    def _open_book(cls, input_file):
        return BookFromFile(input_file, **cls.BOOK_KWARGS)

    @classmethod
    def _get_rows(cls, book):
        for sheet_index in range(book.nsheets):
            yield from book.sheet_by_index(sheet_index).get_rows()

    @classmethod
    def can_import(cls, input_file):
        try:
            with cls._open_book(input_file) as book_wrapper:
                return super().can_import(cls._get_rows(book_wrapper.book))
        except xlrd.biffh.XLRDError:
            return False

    def __init__(self, input_file):
        self.wrapper = self._open_book(input_file)
        return super().__init__(self._get_rows(self.wrapper.book))

    def __iter__(self):
        yield from super().__iter__()
        self.wrapper.close()