Files @ 7156529ceb23
Branch filter:

Location: NPO-Accounting/import2ledger/import2ledger/importers/nbpy2017.py

Brett Smith
nbpy2017: Report item rate names and final unit prices.

This keeps more of the RBI calculation in the templates, where it belongs.
Template logic has become more capable since this importer was first
written, which makes this change practical.
import collections
import decimal
import functools
import re

import bs4
from .. import strparse

# Values stored under the 'status' key of each invoice action dict.
# InvoiceImporter lowercases the status to fill in its ledger template key,
# e.g. 'Invoice' selects 'nbpy2017 invoice ledger entry'.
STATUS_INVOICED = 'Invoice'
STATUS_PAID = 'Payment'
# NOTE(review): no code visible in this module assigns STATUS_REFUNDED yet;
# the activity reader currently skips every row that is not a payment.
STATUS_REFUNDED = 'Refund'

class Invoice2017:
    """Data read from one NBPy 2017 invoice HTML page.

    The constructor parses the page with BeautifulSoup and hands each table
    it recognizes (judging by the text of its first row) to one of the
    ``_read_invoice_*`` methods.  Those methods set the attributes that are
    collected in ``base_data`` and build ``actions``, a list of dicts that
    each have ``date`` and ``status`` keys.  Iterating over an instance
    yields one mapping per action, layered over ``base_data``.

    When the page lacks one of the recognized tables, the attributes that
    table would have provided are never set and the constructor raises
    AttributeError.
    """
    CURRENCY = 'USD'
    # Finds the item type in a line item description, either at the very
    # start or right after an opening parenthesis.
    ITEM_RE = re.compile(r'(?:^|\()(Ticket|T-Shirt) - ')

    @classmethod
    def _elem_stripped_string(cls, elem):
        """Return every stripped string inside ``elem`` joined into one string."""
        return ''.join(elem.stripped_strings)

    @classmethod
    def _table_row_text(cls, table_elem):
        """Yield the cell text of each ``tr`` in ``table_elem`` as a list.

        A cell with ``colspan=N`` contributes its text followed by N - 1
        ``None`` placeholders, so later cells keep the column index they
        would have in a fully expanded row.  A missing or non-numeric
        colspan is treated as a cell that spans a single column.
        """
        for row in table_elem.find_all('tr'):
            row_text = []
            for cell in row.find_all(('th', 'td')):
                row_text.append(cls._elem_stripped_string(cell))
                try:
                    extra_cols = int(cell['colspan'])
                except (KeyError, ValueError):
                    # No colspan, or one that isn't an integer (e.g. an
                    # empty attribute): nothing to pad.  Letting ValueError
                    # escape would crash callers that only expect
                    # AttributeError for unparseable input.
                    continue
                # range() is empty for colspan values of 1 or less.
                row_text.extend(None for _ in range(extra_cols - 1))
            yield row_text

    def __init__(self, source_file):
        """Parse ``source_file`` and populate ``base_data`` and ``actions``.

        ``source_file`` is passed straight to ``bs4.BeautifulSoup`` with the
        html5lib parser.

        Raises AttributeError if the header, items, or activity table was
        not found, because the attributes they set are then missing.
        """
        soup = bs4.BeautifulSoup(source_file, 'html5lib')
        for table in soup.find_all('table'):
            rows_text = self._table_row_text(table)
            # The first row identifies the table; the handler consumes the
            # remaining rows from the same generator.
            first_row_text = next(rows_text, [])
            if first_row_text[:1] == ['Number']:
                handler = self._read_invoice_header
            elif first_row_text == ['Description', 'Quantity', 'Price/Unit', 'Total']:
                handler = self._read_invoice_items
            elif first_row_text == ['Payment time', 'Reference', 'Amount']:
                handler = self._read_invoice_activity
            else:
                continue
            handler(table, first_row_text, rows_text)
        self.base_data = {
            'amount': self.amount,
            'currency': self.CURRENCY,
            'invoice_date': self.invoice_date,
            'invoice_id': self.invoice_id,
            'payee': self.payee,
            'shirt_price': self.shirt_price,
            'shirt_rate': self.shirt_rate,
            'shirts_sold': self.shirts_sold,
            'ticket_price': self.ticket_price,
            'ticket_rate': self.ticket_rate,
            'tickets_sold': self.tickets_sold,
        }
        # Raise an AttributeError if we didn't read any invoice activity.
        self.actions

    def _strpdate(self, s):
        """Parse a date string from the invoice page with ``strparse.date``.

        'Sept.' is rewritten to 'Sep.' so the month matches the ``%b.``
        directive of the '%b. %d, %Y' format.
        """
        # NOTE(review): presumably rejoin_slice_words() keeps only the
        # leading "Mon. D, YYYY" part of a longer timestamp -- confirm
        # against strparse.
        date_s = strparse.rejoin_slice_words(s, slice(2), ',', 2).replace('Sept.', 'Sep.')
        return strparse.date(date_s, '%b. %d, %Y')

    def _read_invoice_header(self, table, first_row_text, rows_text):
        """Set ``invoice_id``, ``invoice_date``, and ``payee`` from the header table.

        The invoice number is the second cell of the first row.  The issue
        date comes from the row whose first cell is 'Issue date'.  The payee
        is the first string in the cell following the 'Recipient' heading.
        """
        self.invoice_id = first_row_text[1]
        for key, value in rows_text:
            if key == 'Issue date':
                self.invoice_date = self._strpdate(value)
        recipient_h = table.find('th', text='Recipient')
        recipient_cell = recipient_h.find_next_sibling('td')
        self.payee = next(recipient_cell.stripped_strings)

    def _read_invoice_items(self, table, first_row_text, rows_text):
        """Total the invoice and summarize its ticket and T-shirt line items.

        Sets ``amount`` to the sum of every line item total, plus for each
        of tickets and shirts: the quantity sold, the sum of the unit prices
        of its rows, and the description of its last row (the "rate").
        """
        self.amount = decimal.Decimal(0)
        self.tickets_sold = decimal.Decimal(0)
        self.ticket_price = decimal.Decimal(0)
        self.ticket_rate = ''
        self.shirts_sold = decimal.Decimal(0)
        self.shirt_price = decimal.Decimal(0)
        self.shirt_rate = ''
        for description, qty, unit_price, total in rows_text:
            # A None quantity is colspan padding, so this is not an item row.
            if qty is None:
                continue
            total = strparse.currency_decimal(total)
            # Every item row counts toward the amount, recognized or not.
            self.amount += total
            match = self.ITEM_RE.search(description)
            if match is None:
                continue
            item_type = match.group(1)
            qty = int(qty)
            unit_price = strparse.currency_decimal(unit_price)
            if item_type == 'Ticket':
                # Only rows with a positive total add to the count sold.
                if total > 0:
                    self.tickets_sold += qty
                # NOTE(review): presumably discount rows carry a negative
                # unit price, so this sum is the final price -- confirm.
                self.ticket_price += unit_price
                self.ticket_rate = description
            elif item_type == 'T-Shirt':
                if description.startswith('T-shirts complimentary '):
                    # Complimentary shirts reduce the number counted as sold
                    # and leave the shirt price and rate untouched.
                    self.shirts_sold -= qty
                else:
                    if total > 0:
                        self.shirts_sold += qty
                    self.shirt_price += unit_price
                    self.shirt_rate = description

    def _read_invoice_activity(self, table, first_row_text, rows_text):
        """Build ``actions``: the invoice itself, then one entry per payment.

        Needs ``invoice_date``, so it raises AttributeError if the header
        table has not been read yet.
        """
        self.actions = [{
            'date': self.invoice_date,
            'status': STATUS_INVOICED,
        }]
        for timestamp, description, amount in rows_text:
            if description.startswith('Paid '):
                # NOTE(review): presumably the trailing word of the
                # description is the Stripe charge ID -- confirm against
                # strparse.rslice_words.
                last_stripe_id = strparse.rslice_words(description, 1, limit=1)
                action = {
                    'payment_id': last_stripe_id,
                    'status': STATUS_PAID,
                }
            else:
                # Refund handling could go here, if we need it.
                continue
            action['date'] = self._strpdate(timestamp)
            action['stripe_id'] = last_stripe_id
            self.actions.append(action)

    def __iter__(self):
        """Yield a ChainMap per action that falls back to ``base_data``."""
        return (collections.ChainMap(act, self.base_data) for act in self.actions)


@functools.lru_cache(maxsize=5)
def _parse_invoice(parser_class, source_file):
    """Return ``parser_class(source_file)``, or None if that raises AttributeError.

    Results (including None) are kept in a five-entry LRU cache keyed on
    both arguments, so repeated calls with the same parser and source reuse
    the first result instead of parsing again.
    """
    try:
        invoice = parser_class(source_file)
    except AttributeError:
        # The parser signals "not an invoice I understand" this way.
        invoice = None
    return invoice

class InvoiceImporter:
    """Importer that turns one parsed invoice into ledger entry data.

    ``INVOICE_CLASS`` is the parser used for the source file, and
    ``LEDGER_TEMPLATE_KEY_FMT`` is formatted with an entry's lowercased
    status to name the ledger template for that entry.
    """
    INVOICE_CLASS = Invoice2017
    LEDGER_TEMPLATE_KEY_FMT = 'nbpy2017 {0} ledger entry'

    @classmethod
    def _parse_invoice(cls, source_file):
        """Parse ``source_file`` with this importer's invoice class (cached)."""
        parser_class = cls.INVOICE_CLASS
        return _parse_invoice(parser_class, source_file)

    @classmethod
    def can_import(cls, source_file):
        """Return True if ``source_file`` parsed into an invoice."""
        invoice = cls._parse_invoice(source_file)
        return invoice is not None

    def __init__(self, source_file):
        """Keep the parsed invoice (None if the source could not be parsed)."""
        self.invoice = self._parse_invoice(source_file)

    def __iter__(self):
        """Yield each invoice entry after setting its 'ledger template' key."""
        for entry in self.invoice:
            status_name = entry['status'].lower()
            entry['ledger template'] = self.LEDGER_TEMPLATE_KEY_FMT.format(status_name)
            yield entry