Files @ 8da3fd4bd369
Branch filter:

Location: NPO-Accounting/import2ledger/import2ledger/importers/nbpy2017.py

Brett Smith
util: Add common word-slicing functions.
import decimal
import functools

import bs4
from .. import util

class Invoice2017:
    """Invoice data scraped from an nbpy2017 invoice HTML page.

    The constructor parses the page, hands each table it recognizes (by its
    first row) to a dedicated reader, and stores the results as attributes.
    Iterating an instance yields one dict per recorded payment action.
    Construction raises AttributeError when an expected table was not found,
    because the attributes its reader would have set are missing.
    """

    # Per-item rates.  The discounted ticket rate is half the standard one;
    # it is selected when an 'Early Bird (' line item is present.
    STANDARD_TICKET_RATE = decimal.Decimal('42.50')
    DISCOUNT_TICKET_RATE = STANDARD_TICKET_RATE / 2
    STANDARD_SHIRT_RATE = decimal.Decimal('25.50')
    DISCOUNT_SHIRT_RATE = STANDARD_SHIRT_RATE
    # strptime-style format handed to util.strpdate for payment dates.
    DATE_FMT = '%b. %d, %Y'
    CURRENCY = 'USD'

    @classmethod
    def _elem_stripped_string(cls, elem):
        """Return all of elem's stripped strings concatenated together."""
        pieces = elem.stripped_strings
        return ''.join(pieces)

    @classmethod
    def _table_row_text(cls, table_elem):
        """Yield one list of cell texts for each <tr> under table_elem.

        A cell carrying a colspan attribute is followed by None placeholders
        so that every logical column keeps its position in the list.
        """
        for tr in table_elem.find_all('tr'):
            cells_text = []
            for cell in tr.find_all(('th', 'td')):
                cells_text.append(cls._elem_stripped_string(cell))
                try:
                    span = int(cell['colspan'])
                except KeyError:
                    # No colspan attribute: the cell covers a single column.
                    continue
                cells_text.extend([None] * (span - 1))
            yield cells_text

    def _table_reader_for(self, header):
        """Return the reader method matching a table's first row, or None."""
        if header[:1] == ['Number']:
            return self._read_invoice_header
        if header == ['Description', 'Quantity', 'Price/Unit', 'Total']:
            return self._read_invoice_items
        if header == ['Payment time', 'Reference', 'Amount']:
            return self._read_invoice_activity
        return None

    def __init__(self, source_file):
        """Parse source_file as HTML and collect the invoice's data.

        Raises AttributeError if the header, items, or activity table was
        not found in the document.
        """
        document = bs4.BeautifulSoup(source_file, 'html5lib')
        for table in document.find_all('table'):
            remaining_rows = self._table_row_text(table)
            # The first row identifies the table; the reader gets the rest.
            header = next(remaining_rows, [])
            reader = self._table_reader_for(header)
            if reader is not None:
                reader(table, header, remaining_rows)
        # Values shared by every entry this invoice yields.
        self.base_data = dict(
            amount=self.amount,
            currency=self.CURRENCY,
            invoice_id=self.invoice_id,
            payee=self.payee,
            shirt_rate=self.shirt_rate,
            shirts_sold=self.shirts_sold,
            ticket_rate=self.ticket_rate,
            tickets_sold=self.tickets_sold,
        )
        # Deliberate bare attribute access: raises AttributeError when no
        # invoice activity table was read.
        self.actions

    def _read_invoice_header(self, table, first_row_text, rows_text):
        """Record the invoice number and the recipient's first text line."""
        self.invoice_id = first_row_text[1]
        recipient_heading = table.find('th', text='Recipient')
        recipient_strings = recipient_heading.find_next_sibling('td').stripped_strings
        self.payee = next(recipient_strings)

    def _read_invoice_items(self, table, first_row_text, rows_text):
        """Count tickets and shirts, pick the ticket rate, and sum totals.

        Each 'Ticket - ' or 'T-Shirt - ' row counts as one item regardless
        of its quantity cell.  Only rows with a truthy quantity cell
        contribute their total to the invoice amount.
        """
        self.amount = decimal.Decimal(0)
        self.tickets_sold = decimal.Decimal(0)
        self.ticket_rate = self.STANDARD_TICKET_RATE
        self.shirts_sold = decimal.Decimal(0)
        self.shirt_rate = self.STANDARD_SHIRT_RATE
        for row in rows_text:
            description, qty, _unit_price, total = row
            if description.startswith('Ticket - '):
                self.tickets_sold += 1
            elif description.startswith('T-Shirt - '):
                self.shirts_sold += 1
            elif description.startswith('Early Bird ('):
                self.ticket_rate = self.DISCOUNT_TICKET_RATE
            if not qty:
                continue
            self.amount += decimal.Decimal(total.lstrip('$'))

    def _read_invoice_activity(self, table, first_row_text, rows_text):
        """Build self.actions from the rows whose reference starts 'Paid '."""
        self.actions = []
        for timestamp, description, _amount in rows_text:
            if not description.startswith('Paid '):
                # Refund handling could go here, if we need it.
                continue
            # Presumably the trailing word of the reference text; see
            # util.rslice_words for the exact slicing rules.
            payment_ref = util.rslice_words(description, 1, limit=1)
            # Trim extraneous text like the time/a.m./p.m.
            date_text = util.rejoin_slice_words(timestamp, slice(2), ',', 2)
            self.actions.append({
                'multiplier': 1,
                'payment_id': payment_ref,
                'date': util.strpdate(date_text, self.DATE_FMT),
                'stripe_id': payment_ref,
            })

    def __iter__(self):
        """Yield one dict per action: base data merged with the action.

        The action's 'multiplier' is removed from the result and applied to
        the amount and the ticket/shirt counts.
        """
        scaled_keys = ('amount', 'tickets_sold', 'shirts_sold')
        for action in self.actions:
            entry = {**self.base_data, **action}
            factor = entry.pop('multiplier')
            for key in scaled_keys:
                entry[key] *= factor
            yield entry


@functools.lru_cache(maxsize=5)
def _parse_invoice(parser_class, source_file):
    """Return parser_class(source_file), or None if that raises AttributeError.

    Results (including None) are memoized per (parser_class, source_file)
    pair, keeping the five most recently used, so repeated calls for the
    same source do not parse it again.
    """
    try:
        invoice = parser_class(source_file)
    except AttributeError:
        invoice = None
    return invoice

class ImporterBase:
    """Shared importer behaviour built around a parsed invoice.

    Subclasses are expected to provide an INVOICE_CLASS attribute (the
    parser to instantiate) and a _should_yield_entry(entry) method; neither
    is defined here.
    """

    @classmethod
    def _parse_invoice(cls, source_file):
        """Parse source_file with this importer's INVOICE_CLASS.

        Delegates to the module-level _parse_invoice function and returns
        whatever it returns.
        """
        invoice_class = cls.INVOICE_CLASS
        return _parse_invoice(invoice_class, source_file)

    @classmethod
    def can_import(cls, source_file):
        """Return True when source_file parses to something other than None."""
        parsed = cls._parse_invoice(source_file)
        return parsed is not None

    def __init__(self, source_file):
        """Parse source_file and keep the result as self.invoice."""
        self.invoice = self._parse_invoice(source_file)

    def __iter__(self):
        """Yield the invoice's entries for which _should_yield_entry is truthy."""
        yield from filter(self._should_yield_entry, self.invoice)


class Payment2017Importer(ImporterBase):
    """Importer for Invoice2017 entries whose amount is greater than zero."""

    INVOICE_CLASS = Invoice2017
    TEMPLATE_KEY = 'template nbpy2017 payment'

    def _should_yield_entry(self, entry):
        """Return True when the entry's 'amount' is strictly positive."""
        amount = entry['amount']
        return amount > 0