Changeset - ab8559c75bdb
[Not reviewed]
0 5 0
Brett Smith - 5 years ago 2019-08-28 14:22:10
csv: Support importing squared CSV spreadsheets.

See the test comment for more rationale.
5 files changed with 41 insertions and 5 deletions:
0 comments (0 inline, 0 general)
Show inline comments
@@ -34,27 +34,35 @@ class CSVImporterBase:
      None, _read_header expects this is the row with column names for the
      real data, and uses it in its return value.
    * Reader: A class that accepts the input source and iterates over rows of
      formatted data.  Default csv.reader.
    * DictReader: A class that accepts the input source and iterates over rows
      of data organized into dictionaries.  Default csv.DictReader.
    ENTRY_SEED = {}
    Reader = csv.reader
    DictReader = csv.DictReader

    def _row_rindex(cls, row, default=None):
        """Return the index of the last cell in the row that has a value."""
        for offset, value in enumerate(reversed(row), 1):
            if value:
                return len(row) - offset
        return default

    def _read_header_row(cls, row):
        return {} if len(row) < cls._HEADER_MAX_LEN else None
        return {} if cls._row_rindex(row, -1) + 1 < cls._HEADER_MAX_LEN else None

    def _read_header(cls, input_file):
        cls._NEEDED_KEYS = cls.NEEDED_FIELDS.union(cls.COPIED_FIELDS)
        cls._HEADER_MAX_LEN = len(cls._NEEDED_KEYS)
        header = {}
        row = None
        for row in cls.Reader(input_file):
            row_data = cls._read_header_row(row)
            if row_data is None:
Show inline comments
@@ -2,28 +2,28 @@ import decimal

from . import _csv
from .. import strparse

ZERO_DECIMAL = decimal.Decimal(0)

class _DonationsImporterBase(_csv.CSVImporterBase):
    NAME_FIELDS = ['Donor First Name', 'Donor Last Name']
    NOT_SHARED = 'Not shared by donor'

    def _read_header_row(cls, row):
        row_len = len(row)
        if row_len > 2:
        row_rindex = cls._row_rindex(row, -1)
        if row_rindex > 1:
            return None
        elif row_len == 2 and row[0] in cls.HEADER_FIELDS:
        elif row_rindex == 1 and row[0] in cls.HEADER_FIELDS:
            return {cls.HEADER_FIELDS[row[0]]: row[1]}
            return {}

    def _read_row(self, row):
        date_s = row.get(self.DATE_FIELD)
        if not date_s:
            return None
        if all(row[key] == self.NOT_SHARED for key in self.NAME_FIELDS):
            payee = 'Anonymous'
            payee = ' '.join(row[key] for key in self.NAME_FIELDS)
Show inline comments
@@ -21,24 +21,24 @@ all_extras_require = [

REQUIREMENTS['extras_require']['all_importers'] = all_extras_require
REQUIREMENTS['tests_require'] = [

    description="Import different sources of financial data to Ledger",
    author='Brett Smith',
    license='GNU AGPLv3+',

    packages=find_packages(include=['import2ledger', 'import2ledger.*']),
        'console_scripts': ['import2ledger = import2ledger.__main__:main'],

Show inline comments
@@ -255,36 +255,40 @@
      ticket_rate: !!python/object/apply:decimal.Decimal ["42.50"]
      shirts_sold: !!python/object/apply:decimal.Decimal ["2"]
      shirt_rate: !!python/object/apply:decimal.Decimal ["25.50"]
      status: Payment
      currency: USD
      invoice_date: !!python/object/ [2017, 9, 5]
      payment_id: ch_daer0ahwoh9oDeiqu2eimoD7
      stripe_id: ch_daer0ahwoh9oDeiqu2eimoD7
      invoice_id: "11"

- source: AmazonAffiliateEarnings.csv
  importer: amazon.EarningsImporter
  header_rows: 1
  header_cols: 12
    - payee: Amazon
      date: !!python/object/ [2016, 12, 20]
      amount: !!python/object/apply:decimal.Decimal ["4.24"]
      currency: USD
    - payee: Amazon
      date: !!python/object/ [2017, 1, 7]
      amount: !!python/object/apply:decimal.Decimal ["-.08"]
      currency: USD

- source: Benevity2018.csv
  importer: benevity.Donations2018Importer
  header_rows: 11
  header_cols: 17
    - date: !!python/object/ [2017, 10, 28]
      currency: USD
      disbursement_id: ABCDE12345
      amount: !!python/object/apply:decimal.Decimal [20]
      net_amount: !!python/object/apply:decimal.Decimal [20]
      donation_amount: !!python/object/apply:decimal.Decimal [20]
      match_amount: !!python/object/apply:decimal.Decimal [0]
      match_amount: !!python/object/apply:decimal.Decimal [0]
      donation_fee: !!python/object/apply:decimal.Decimal [0]
      match_fee: !!python/object/apply:decimal.Decimal [0]
      merchant_fee: !!python/object/apply:decimal.Decimal [0]
@@ -357,24 +361,26 @@
      donation_fee: !!python/object/apply:decimal.Decimal [0]
      match_fee: !!python/object/apply:decimal.Decimal [0]
      merchant_fee: !!python/object/apply:decimal.Decimal [0]
      corporation: Company B
      project: ""
      comment: ""
      frequency: Recurring
      transaction_id: 67890TYUIO
      ledger template: benevity donations ledger entry

- source: Benevity2019.csv
  importer: benevity.Donations2019Importer
  header_rows: 11
  header_cols: 21
    - date: !!python/object/ [2017, 10, 28]
      currency: USD
      disbursement_id: ABCDE12345
      amount: !!python/object/apply:decimal.Decimal [20]
      net_amount: !!python/object/apply:decimal.Decimal [20]
      donation_amount: !!python/object/apply:decimal.Decimal [20]
      match_amount: !!python/object/apply:decimal.Decimal [0]
      donation_fee: !!python/object/apply:decimal.Decimal [0]
      match_fee: !!python/object/apply:decimal.Decimal [0]
      merchant_fee: !!python/object/apply:decimal.Decimal [0]
      payee: Dakota Smith
Show inline comments
import csv
import datetime
import decimal
import io
import importlib
import itertools
import pathlib
import shutil
import re

import pytest
import yaml
from import2ledger import importers, strparse

from . import DATA_DIR

class TestImporters:
    with pathlib.Path(DATA_DIR, 'imports.yml').open() as yaml_file:
        test_data = yaml.load(yaml_file)
    for test in test_data:
@@ -19,24 +22,43 @@ class TestImporters:

        module_name, class_name = test['importer'].rsplit('.', 1)
        module = importlib.import_module('.' + module_name, 'import2ledger.importers')
        test['importer'] = getattr(module, class_name)

    @pytest.mark.parametrize('source_path,importer', [
        (t['source'], t['importer']) for t in test_data
    def test_can_import(self, source_path, importer):
        with as source_file:
            assert importer.can_import(source_file)

    @pytest.mark.parametrize('source_path,importer,header_rows,header_cols', [
        (t['source'], t['importer'], t['header_rows'], t['header_cols'])
        for t in test_data if t.get('header_rows')
    def test_can_import_squared_csv(self, source_path, importer, header_rows, header_cols):
        # Sometimes when we munge spreadsheets by hand (e.g., to filter by
        # project) tools like LibreOffice Calc write a "squared" spreadsheet,
        # where every row has the same length.  This test ensures the results
        # are still recognized for import.
        with io.StringIO() as squared_file:
            csv_writer = csv.writer(squared_file)
            with as source_file:
                for row in itertools.islice(csv.reader(source_file), header_rows):
                    padding = [None] * (header_cols - len(row))
                    csv_writer.writerow(row + padding)
                shutil.copyfileobj(source_file, squared_file)
            assert importer.can_import(squared_file)

    @pytest.mark.parametrize('source_path,import_class,expect_results', [
        (t['source'], t['importer'], t['expect']) for t in test_data
    def test_import(self, source_path, import_class, expect_results):
        with as source_file:
            importer = import_class(source_file)
            for actual, expected in itertools.zip_longest(importer, expect_results):
                actual['amount'] = strparse.currency_decimal(actual['amount'])
                assert actual == expected

    def test_loader(self):
        all_importers = list(importers.load_all())
0 comments (0 inline, 0 general)