Changeset - ab8559c75bdb
[Not reviewed]
0 5 0
Brett Smith - 5 years ago 2019-08-28 14:22:10
brettcsmith@brettcsmith.org
csv: Support importing squared CSV spreadsheets.

See the test comment for more rationale.
5 files changed with 41 insertions and 5 deletions:
0 comments (0 inline, 0 general)
import2ledger/importers/_csv.py
Show inline comments
...
 
@@ -34,27 +34,35 @@ class CSVImporterBase:
 
      None, _read_header expects this is the row with column names for the
 
      real data, and uses it in its return value.
 
    * Reader: A class that accepts the input source and iterates over rows of
 
      formatted data.  Default csv.reader.
 
    * DictReader: A class that accepts the input source and iterates over rows
 
      of data organized into dictionaries.  Default csv.DictReader.
 
    """
 
    ENTRY_SEED = {}
 
    COPIED_FIELDS = {}
 
    Reader = csv.reader
 
    DictReader = csv.DictReader
 

	
 
    @classmethod
 
    def _row_rindex(cls, row, default=None):
 
        """Return the index of the last cell in the row that has a value."""
 
        for offset, value in enumerate(reversed(row), 1):
 
            if value:
 
                return len(row) - offset
 
        return default
 

	
 
    @classmethod
 
    def _read_header_row(cls, row):
 
        return {} if len(row) < cls._HEADER_MAX_LEN else None
 
        return {} if cls._row_rindex(row, -1) + 1 < cls._HEADER_MAX_LEN else None
 

	
 
    @classmethod
 
    def _read_header(cls, input_file):
 
        cls._NEEDED_KEYS = cls.NEEDED_FIELDS.union(cls.COPIED_FIELDS)
 
        cls._HEADER_MAX_LEN = len(cls._NEEDED_KEYS)
 
        header = {}
 
        row = None
 
        for row in cls.Reader(input_file):
 
            row_data = cls._read_header_row(row)
 
            if row_data is None:
 
                break
 
            else:
import2ledger/importers/benevity.py
Show inline comments
...
 
@@ -2,28 +2,28 @@ import decimal
 

	
 
from . import _csv
 
from .. import strparse
 

	
 
ZERO_DECIMAL = decimal.Decimal(0)
 

	
 
class _DonationsImporterBase(_csv.CSVImporterBase):
 
    NAME_FIELDS = ['Donor First Name', 'Donor Last Name']
 
    NOT_SHARED = 'Not shared by donor'
 

	
 
    @classmethod
 
    def _read_header_row(cls, row):
 
        row_len = len(row)
 
        if row_len > 2:
 
        row_rindex = cls._row_rindex(row, -1)
 
        if row_rindex > 1:
 
            return None
 
        elif row_len == 2 and row[0] in cls.HEADER_FIELDS:
 
        elif row_rindex == 1 and row[0] in cls.HEADER_FIELDS:
 
            return {cls.HEADER_FIELDS[row[0]]: row[1]}
 
        else:
 
            return {}
 

	
 
    def _read_row(self, row):
 
        date_s = row.get(self.DATE_FIELD)
 
        if not date_s:
 
            return None
 
        if all(row[key] == self.NOT_SHARED for key in self.NAME_FIELDS):
 
            payee = 'Anonymous'
 
        else:
 
            payee = ' '.join(row[key] for key in self.NAME_FIELDS)
setup.py
Show inline comments
...
 
@@ -21,24 +21,24 @@ all_extras_require = [
 
]
 

	
 
REQUIREMENTS['extras_require']['all_importers'] = all_extras_require
 
REQUIREMENTS['tests_require'] = [
 
    'pytest',
 
    'PyYAML',
 
    *all_extras_require,
 
]
 

	
 
setup(
 
    name='import2ledger',
 
    description="Import different sources of financial data to Ledger",
 
    version='0.9.2',
 
    version='0.9.3',
 
    author='Brett Smith',
 
    author_email='brettcsmith@brettcsmith.org',
 
    license='GNU AGPLv3+',
 

	
 
    packages=find_packages(include=['import2ledger', 'import2ledger.*']),
 
    entry_points={
 
        'console_scripts': ['import2ledger = import2ledger.__main__:main'],
 
    },
 

	
 
    **REQUIREMENTS,
 
)
tests/data/imports.yml
Show inline comments
...
 
@@ -255,36 +255,40 @@
 
      ticket_rate: !!python/object/apply:decimal.Decimal ["42.50"]
 
      shirts_sold: !!python/object/apply:decimal.Decimal ["2"]
 
      shirt_rate: !!python/object/apply:decimal.Decimal ["25.50"]
 
      status: Payment
 
      currency: USD
 
      invoice_date: !!python/object/apply:datetime.date [2017, 9, 5]
 
      payment_id: ch_daer0ahwoh9oDeiqu2eimoD7
 
      stripe_id: ch_daer0ahwoh9oDeiqu2eimoD7
 
      invoice_id: "11"
 

	
 
- source: AmazonAffiliateEarnings.csv
 
  importer: amazon.EarningsImporter
 
  header_rows: 1
 
  header_cols: 12
 
  expect:
 
    - payee: Amazon
 
      date: !!python/object/apply:datetime.date [2016, 12, 20]
 
      amount: !!python/object/apply:decimal.Decimal ["4.24"]
 
      currency: USD
 
    - payee: Amazon
 
      date: !!python/object/apply:datetime.date [2017, 1, 7]
 
      amount: !!python/object/apply:decimal.Decimal ["-.08"]
 
      currency: USD
 

	
 
- source: Benevity2018.csv
 
  importer: benevity.Donations2018Importer
 
  header_rows: 11
 
  header_cols: 17
 
  expect:
 
    - date: !!python/object/apply:datetime.date [2017, 10, 28]
 
      currency: USD
 
      disbursement_id: ABCDE12345
 
      amount: !!python/object/apply:decimal.Decimal [20]
 
      net_amount: !!python/object/apply:decimal.Decimal [20]
 
      donation_amount: !!python/object/apply:decimal.Decimal [20]
 
      match_amount: !!python/object/apply:decimal.Decimal [0]
 
      match_amount: !!python/object/apply:decimal.Decimal [0]
 
      donation_fee: !!python/object/apply:decimal.Decimal [0]
 
      match_fee: !!python/object/apply:decimal.Decimal [0]
 
      merchant_fee: !!python/object/apply:decimal.Decimal [0]
...
 
@@ -357,24 +361,26 @@
 
      donation_fee: !!python/object/apply:decimal.Decimal [0]
 
      match_fee: !!python/object/apply:decimal.Decimal [0]
 
      merchant_fee: !!python/object/apply:decimal.Decimal [0]
 
      corporation: Company B
 
      project: ""
 
      comment: ""
 
      frequency: Recurring
 
      transaction_id: 67890TYUIO
 
      ledger template: benevity donations ledger entry
 

	
 
- source: Benevity2019.csv
 
  importer: benevity.Donations2019Importer
 
  header_rows: 11
 
  header_cols: 21
 
  expect:
 
    - date: !!python/object/apply:datetime.date [2017, 10, 28]
 
      currency: USD
 
      disbursement_id: ABCDE12345
 
      amount: !!python/object/apply:decimal.Decimal [20]
 
      net_amount: !!python/object/apply:decimal.Decimal [20]
 
      donation_amount: !!python/object/apply:decimal.Decimal [20]
 
      match_amount: !!python/object/apply:decimal.Decimal [0]
 
      donation_fee: !!python/object/apply:decimal.Decimal [0]
 
      match_fee: !!python/object/apply:decimal.Decimal [0]
 
      merchant_fee: !!python/object/apply:decimal.Decimal [0]
 
      payee: Dakota Smith
tests/test_importers.py
Show inline comments
 
import csv
 
import datetime
 
import decimal
 
import io
 
import importlib
 
import itertools
 
import pathlib
 
import shutil
 
import re
 

	
 
import pytest
 
import yaml
 
from import2ledger import importers, strparse
 

	
 
from . import DATA_DIR
 

	
 
class TestImporters:
 
    with pathlib.Path(DATA_DIR, 'imports.yml').open() as yaml_file:
 
        test_data = yaml.load(yaml_file)
 
    for test in test_data:
...
 
@@ -19,24 +22,43 @@ class TestImporters:
 

	
 
        module_name, class_name = test['importer'].rsplit('.', 1)
 
        module = importlib.import_module('.' + module_name, 'import2ledger.importers')
 
        test['importer'] = getattr(module, class_name)
 

	
 
    @pytest.mark.parametrize('source_path,importer', [
 
        (t['source'], t['importer']) for t in test_data
 
    ])
 
    def test_can_import(self, source_path, importer):
 
        with source_path.open() as source_file:
 
            assert importer.can_import(source_file)
 

	
 
    @pytest.mark.parametrize('source_path,importer,header_rows,header_cols', [
 
        (t['source'], t['importer'], t['header_rows'], t['header_cols'])
 
        for t in test_data if t.get('header_rows')
 
    ])
 
    def test_can_import_squared_csv(self, source_path, importer, header_rows, header_cols):
 
        # Sometimes when we munge spreadsheets by hand (e.g., to filter by
 
        # project) tools like LibreOffice Calc write a "squared" spreadsheet,
 
        # where every row has the same length.  This test ensures the results
 
        # are still recognized for import.
 
        with io.StringIO() as squared_file:
 
            csv_writer = csv.writer(squared_file)
 
            with source_path.open() as source_file:
 
                for row in itertools.islice(csv.reader(source_file), header_rows):
 
                    padding = [None] * (header_cols - len(row))
 
                    csv_writer.writerow(row + padding)
 
                shutil.copyfileobj(source_file, squared_file)
 
            squared_file.seek(0)
 
            assert importer.can_import(squared_file)
 

	
 
    @pytest.mark.parametrize('source_path,import_class,expect_results', [
 
        (t['source'], t['importer'], t['expect']) for t in test_data
 
    ])
 
    def test_import(self, source_path, import_class, expect_results):
 
        with source_path.open() as source_file:
 
            importer = import_class(source_file)
 
            for actual, expected in itertools.zip_longest(importer, expect_results):
 
                actual['amount'] = strparse.currency_decimal(actual['amount'])
 
                assert actual == expected
 

	
 
    def test_loader(self):
 
        all_importers = list(importers.load_all())
0 comments (0 inline, 0 general)