Changeset - 3620d97f9d16
[Not reviewed]
0 1 2
Joar Wandborg - 10 years ago 2013-12-26 12:22:27
joar@wandborg.se
[tests] Added tests and updated storage.ledgercli

- [ledgercli] Log info messages for operations such as add, delete, update
transactions
3 files changed with 284 insertions and 0 deletions:
0 comments (0 inline, 0 general)
accounting/storage/ledgercli.py
Show inline comments
...
 
@@ -118,192 +118,194 @@ class Ledger(Storage):
 
        '''
 
        return self.ledger_process or self.init_process()
 

	
 
    def read_until_prompt(self, process):
 
        r'''
 
        Reads from the subprocess instance :data:`process` until it finds a
 
        combination of ``\n]\x20`` (the prompt), then returns the output
 
        without the prompt.
 
        '''
 
        output = b''
 

	
 
        while True:
 
            line = process.stdout.read(1)  # XXX: This is a hack
 

	
 
            if len(line) > 0:
 
                pass
 
                #_log.debug('line: %s', line)
 

	
 
            output += line
 

	
 
            if b'\n] ' in output:
 
                _log.debug('Found prompt!')
 
                break
 

	
 
        output = output[:-3]  # Cut away the prompt
 

	
 
        _log.debug('output: %s', output)
 

	
 
        return output
 

	
 
    def send_command(self, command):
 
        output = None
 

	
 
        with self.locked_process() as p:
 
            if isinstance(command, str):
 
                command = command.encode('utf8')
 

	
 
            _log.debug('Sending command: %r', command)
 

	
 
            p.stdin.write(command + b'\n')
 
            p.stdin.flush()
 

	
 
            output = self.read_until_prompt(p)
 

	
 
            self.ledger_process.send_signal(subprocess.signal.SIGTERM)
 
            _log.debug('Waiting for ledger to shut down')
 
            self.ledger_process.wait()
 
            self.ledger_process = None
 

	
 
            return output
 

	
 
    def add_transaction(self, transaction):
 
        '''
 
        Writes a transaction to the ledger file by opening it in 'ab' mode and
 
        writing a ledger transaction based on the
 
        :class:`~accounting.models.Transaction` instance in
 
        :data:`transaction`.
 
        '''
 
        if transaction.id is None:
 
            _log.debug('No ID found. Generating an ID.')
 
            transaction.generate_id()
 

	
 
        transaction.metadata.update({'Id': transaction.id})
 

	
 
        transaction_template = ('\n{date} {t.payee}\n'
 
                                '{metadata}'
 
                                '{postings}')
 

	
 
        metadata_template = '   ;{0}: {1}\n'
 

	
 
        # TODO: Generate metadata for postings
 
        posting_template = ('  {account} {p.amount.symbol}'
 
                            ' {p.amount.amount}\n')
 

	
 
        output = b''
 

	
 
        # XXX: Even I hardly understands what this does, however I indent it it
 
        # stays unreadable.
 
        output += transaction_template.format(
 
            date=transaction.date.strftime('%Y-%m-%d'),
 
            t=transaction,
 
            metadata=''.join([
 
                metadata_template.format(k, v)
 
                for k, v in transaction.metadata.items()]),
 
            postings=''.join([posting_template.format(
 
                p=p,
 
                account=p.account + ' ' * (
 
                    80 - (len(p.account) + len(p.amount.symbol) +
 
                          len(str(p.amount.amount)) + 1 + 2)
 
                )) for p in transaction.postings
 
            ])
 
        ).encode('utf8')
 

	
 
        with open(self.ledger_file, 'ab') as f:
 
            f.write(output)
 

	
 
        _log.info('Added transaction %s', transaction.id)
 

	
 
        _log.debug('written to file: %s', output)
 

	
 
        return transaction.id
 

	
 
    def bal(self):
 
        output = self.send_command('xml')
 

	
 
        if output is None:
 
            raise RuntimeError('bal call returned no output')
 

	
 
        accounts = []
 

	
 
        xml = ElementTree.fromstring(output.decode('utf8'))
 

	
 
        accounts = self._recurse_accounts(xml.find('./accounts'))
 

	
 
        return accounts
 

	
 
    def _recurse_accounts(self, root):
 
        accounts = []
 

	
 
        for account in root.findall('./account'):
 
            name = account.find('./fullname').text
 

	
 
            amounts = []
 

	
 
            # Try to find an account total value, then try to find the account
 
            # balance
 
            account_amounts = account.findall(
 
                './account-total/balance/amount') or \
 
                account.findall('./account-amount/amount') or \
 
                account.findall('./account-total/amount')
 

	
 
            if account_amounts:
 
                for amount in account_amounts:
 
                    quantity = amount.find('./quantity').text
 
                    symbol = amount.find('./commodity/symbol').text
 

	
 
                    amounts.append(Amount(amount=quantity, symbol=symbol))
 
            else:
 
                _log.warning('Account %s does not have any amounts', name)
 

	
 
            accounts.append(Account(name=name,
 
                                    amounts=amounts,
 
                                    accounts=self._recurse_accounts(account)))
 

	
 
        return accounts
 

	
 
    def get_transactions(self):
 
        return self.reg()
 

	
 
    def get_transaction(self, transaction_id):
 
        transactions = self.get_transactions()
 

	
 
        for transaction in transactions:
 
            if transaction.id == transaction_id:
 
                return transaction
 

	
 
        raise TransactionNotFound(
 
            'No transaction with id {0} found'.format(transaction_id))
 

	
 
    def reg(self):
 
        output = self.send_command('xml')
 

	
 
        if output is None:
 
            raise RuntimeError('reg call returned no output')
 

	
 
        entries = []
 

	
 
        reg_xml = ElementTree.fromstring(output.decode('utf8'))
 

	
 
        for transaction in reg_xml.findall('./transactions/transaction'):
 
            date = datetime.strptime(transaction.find('./date').text,
 
                                     '%Y/%m/%d')
 
            payee = transaction.find('./payee').text
 

	
 
            postings = []
 

	
 
            for posting in transaction.findall('./postings/posting'):
 
                account = posting.find('./account/name').text
 
                amount = posting.find('./post-amount/amount/quantity').text
 
                symbol = posting.find(
 
                    './post-amount/amount/commodity/symbol').text
 

	
 
                # Get the posting metadata
 
                metadata = {}
 

	
 
                values = posting.findall('./metadata/value')
 
                if values:
 
                    for value in values:
 
                        key = value.get('key')
 
                        value = value.find('./string').text
 

	
 
                        _log.debug('metadata: %s: %s', key, value)
 

	
 
                        metadata.update({key: value})
...
 
@@ -326,145 +328,151 @@ class Ledger(Storage):
 

	
 
                    metadata.update({key: value})
 

	
 
            # Add a Transaction instance to the list
 
            try:
 
                id = metadata.pop('Id')
 
            except KeyError:
 
                _log.warning('Transaction on %s with payee %s does not have an'
 
                             ' Id attribute. A temporary ID will be used.',
 
                             date, payee)
 
                id = 'NO-ID'
 
            entries.append(
 
                Transaction(id=id, date=date, payee=payee, postings=postings,
 
                            metadata=metadata))
 

	
 
        return entries
 

	
 
    def delete_transaction(self, transaction_id):
 
        '''
 
        Delete a transaction from the ledger file.
 

	
 
        This method opens the ledger file, loads all lines into memory and
 
        looks for the transaction_id, then looks for the bounds of that
 
        transaction in the ledger file, removes all lines within the bounds of
 
        the transaction and removes them, then writes the lines back to the
 
        ledger file.
 

	
 
        Exceptions:
 

	
 
        -   RuntimeError: If all boundaries to the transaction are not found
 
        -   TransactionNotFound: If no such transaction_id can be found in
 
            :data:`self.ledger_file`
 
        '''
 
        f = open(self.ledger_file, 'r')
 

	
 
        lines = [i for i in f]
 

	
 
        # A mapping of line meanings and their line numbers as found by the
 
        # following logic
 
        semantic_lines = dict(
 
            id_location=None,
 
            transaction_start=None,
 
            next_transaction_or_eof=None
 
        )
 

	
 
        for i, line in enumerate(lines):
 
            if transaction_id in line:
 
                semantic_lines['id_location'] = i
 
                break
 

	
 
        if not semantic_lines['id_location']:
 
            raise TransactionNotFound(
 
                'No transaction with ID "{0}" found'.format(transaction_id))
 

	
 
        transaction_start_pattern = re.compile(r'^\S')
 

	
 
        cursor = semantic_lines['id_location'] - 1
 

	
 
        # Find the first line of the transaction
 
        while True:
 
            if transaction_start_pattern.match(lines[cursor]):
 
                semantic_lines['transaction_start'] = cursor
 
                break
 

	
 
            cursor -= 1
 

	
 
        cursor = semantic_lines['id_location'] + 1
 

	
 
        # Find the last line of the transaction
 
        while True:
 
            try:
 
                if transaction_start_pattern.match(lines[cursor]):
 
                    semantic_lines['next_transaction_or_eof'] = cursor
 
                    break
 
            except IndexError:
 
                # Set next_line_without_starting_space_or_end_of_file to
 
                # the cursor. The cursor will be an index not included in the
 
                # list of lines
 
                semantic_lines['next_transaction_or_eof'] = cursor
 
                break
 

	
 
            cursor += 1
 

	
 
        if not all(map(lambda v: v is not None, semantic_lines.values())):
 
            raise RuntimeError('Could not find all the values necessary for'
 
                               ' safe deletion of a transaction.')
 

	
 
        del_start = semantic_lines['transaction_start']
 

	
 
        if len(lines) == semantic_lines['next_transaction_or_eof']:
 
            _log.debug('There are no transactions below the transaction being'
 
                       ' deleted. The line before the first line of the'
 
                       ' transaction will be deleted.')
 
            # Delete the preceding line to make the file
 
            del_start -= 1
 

	
 
        _log.info('Removing transaction with ID: %s (lines %d-%d)',
 
                   transaction_id,
 
                   del_start,
 
                   semantic_lines['next_transaction_or_eof'])
 

	
 
        del lines[del_start:semantic_lines['next_transaction_or_eof']]
 

	
 
        with open(self.ledger_file, 'w') as f:
 
            for line in lines:
 
                f.write(line)
 

	
 
    def update_transaction(self, transaction):
 
        '''
 
        Update a transaction in the ledger file.
 

	
 
        Takes a :class:`~accounting.models.Transaction` object and removes
 
        the old transaction using :data:`transaction.id` from the passed
 
        :class:`~accounting.models.Transaction` instance and adds
 
        :data:`transaction` to the database.
 
        '''
 
        if not transaction.id:
 
            return AccountingException(('The transaction {0} has no'
 
                                        ' id attribute').format(transaction))
 

	
 
        old_transaction = self.get_transaction(transaction.id)
 

	
 
        self.delete_transaction(transaction.id)
 

	
 
        self.add_transaction(transaction)
 

	
 
        _log.info('Updated transaction %s', transaction.id)
 
        _log.debug('Updated transaction from: %s to: %s', old_transaction,
 
                   transaction)
 

	
 

	
 
def main(argv=None):
 
    import argparse
 
    if argv is None:
 
        argv = sys.argv
 

	
 
    parser = argparse.ArgumentParser()
 
    parser.add_argument('-v', '--verbosity',
 
                        default='INFO',
 
                        help=('Filter logging output. Possible values:' +
 
                              ' CRITICAL, ERROR, WARNING, INFO, DEBUG'))
 

	
 
    args = parser.parse_args(argv[1:])
 
    logging.basicConfig(level=getattr(logging, args.verbosity, 'INFO'))
 
    ledger = Ledger(ledger_file='non-profit-test-data.ledger')
 
    print(ledger.bal())
 
    print(ledger.reg())
 

	
 

	
 
if __name__ == '__main__':
 
    sys.exit(main())
accounting/tests/__init__.py
Show inline comments
 
new file 100644
accounting/tests/test_transactions.py
Show inline comments
 
new file 100644
 
'''
 
Tests for accounting-api
 
'''
 
import os
 
import unittest
 
import tempfile
 
import logging
 
import copy
 
import uuid
 

	
 
from datetime import datetime
 

	
 
from flask import json
 

	
 
from accounting.web import app, init_ledger
 

	
 
from accounting.transport import AccountingEncoder, AccountingDecoder
 
from accounting.models import Transaction, Posting, Amount
 

	
 
logging.basicConfig(level=logging.DEBUG)
 

	
 

	
 
class TransactionTestCase(unittest.TestCase):
 
    def setUp(self):
 
        self.app = app.test_client()
 
        self.fd, app.config['LEDGER_FILE'] = tempfile.mkstemp()
 
        init_ledger()
 
        self.simple_transaction = Transaction(
 
            date=datetime.today(),
 
            payee='Joar',
 
            postings=[
 
                Posting('Assets:Checking', Amount('-133.7', 'USD')),
 
                Posting('Expenses:Foo', Amount('133.7', 'USD'))
 
            ]
 
        )
 

	
 
    def tearDown(self):
 
        os.close(self.fd)
 
        os.unlink(app.config['LEDGER_FILE'])
 

	
 
    def test_get_transactions(self):
 
        open(app.config['LEDGER_FILE'], 'w').write(
 
            '1400-12-21 Old stuff\n'
 
            '  ;Id: foo\n'
 
            '  Assets:Checking  -100 USD\n'
 
            '  Expenses:Tax  100 USD\n')
 
        rv = self.app.get('/transaction')
 

	
 
        json_transaction = (
 
            b'{\n'
 
            b'  "transactions": [\n'
 
            b'    {\n'
 
            b'      "__type__": "Transaction", \n'
 
            b'      "date": "1400-12-21", \n'
 
            b'      "id": "foo", \n'
 
            b'      "metadata": {}, \n'
 
            b'      "payee": "Old stuff", \n'
 
            b'      "postings": [\n'
 
            b'        {\n'
 
            b'          "__type__": "Posting", \n'
 
            b'          "account": "Assets:Checking", \n'
 
            b'          "amount": {\n'
 
            b'            "__type__": "Amount", \n'
 
            b'            "amount": "-100", \n'
 
            b'            "symbol": "USD"\n'
 
            b'          }, \n'
 
            b'          "metadata": {}\n'
 
            b'        }, \n'
 
            b'        {\n'
 
            b'          "__type__": "Posting", \n'
 
            b'          "account": "Expenses:Tax", \n'
 
            b'          "amount": {\n'
 
            b'            "__type__": "Amount", \n'
 
            b'            "amount": "100", \n'
 
            b'            "symbol": "USD"\n'
 
            b'          }, \n'
 
            b'          "metadata": {}\n'
 
            b'        }\n'
 
            b'      ]\n'
 
            b'    }\n'
 
            b'  ]\n'
 
            b'}')
 

	
 
        self.assertEqual(rv.get_data(), json_transaction)
 

	
 
    def _post_json(self, path, data, expect=200, **kw):
 
        response = self.app.post(
 
            path,
 
            content_type='application/json',
 
            data=json.dumps(data, cls=AccountingEncoder),
 
            **kw
 
        )
 

	
 
        self.assertEqual(response.status_code, expect)
 

	
 
        return self._decode_response(response)
 

	
 
    def _decode_response(self, response):
 
        return json.loads(response.data, cls=AccountingDecoder)
 

	
 
    def _get_json(self, path, expect=200, **kw):
 
        response = self.app.get(path, **kw)
 

	
 
        self.assertEqual(response.status_code, expect)
 

	
 
        return self._decode_response(response)
 

	
 
    def _open_json(self, method, path, expect=200, **kw):
 
        response = self.app.open(
 
            path,
 
            method=method.upper(),
 
            **kw
 
        )
 

	
 
        self.assertEqual(response.status_code, expect)
 

	
 
        return self._decode_response(response)
 

	
 
    def _add_simple_transaction(self, transaction_id=None):
 
        if transaction_id is None:
 
            transaction_id = str(uuid.uuid4())
 

	
 
        transaction = copy.deepcopy(self.simple_transaction)
 
        transaction.id = transaction_id
 

	
 
        response = self._post_json('/transaction', transaction)
 

	
 
        self.assertEqual(len(response['transaction_ids']), 1)
 
        self.assertEqual(response['status'], 'OK')
 

	
 
        response = self._get_json('/transaction/' + transaction.id)
 

	
 
        self.assertEqual(transaction_id, response['transaction'].id)
 

	
 
        self.assertEqual(response['transaction'], transaction)
 

	
 
        return transaction
 

	
 
    def test_post_transaction_without_id(self):
 
        transaction = copy.deepcopy(self.simple_transaction)
 

	
 
        response = self._post_json('/transaction', transaction)
 

	
 
        self.assertEqual(len(response['transaction_ids']), 1)
 
        self.assertEqual(response['status'], 'OK')
 

	
 
        transaction.id = response['transaction_ids'][0]
 

	
 
        response = self._get_json('/transaction/' + transaction.id)
 

	
 
        self.assertEqual(response['transaction'], transaction)
 

	
 
    def test_delete_transaction(self):
 
        transaction = copy.deepcopy(self.simple_transaction)
 

	
 
        response = self._post_json('/transaction', transaction)
 

	
 
        transaction_id = response['transaction_ids'][0]
 

	
 
        self.assertIsNotNone(transaction_id)
 

	
 
        response = self._open_json('DELETE',
 
                                  '/transaction/' + transaction_id)
 

	
 
        self.assertEqual(response['status'], 'OK')
 

	
 
        with self.assertRaises(ValueError):
 
            # ValueError thrown because the response does not contain any JSON
 
            response = self._get_json('/transaction/' + transaction_id, 404)
 

	
 
    def test_post_multiple_transactions(self):
 
        transactions = [
 
            Transaction(
 
                date=datetime.today(),
 
                payee='Rent',
 
                postings=[
 
                    Posting(
 
                        account='Assets:Checking',
 
                        amount=Amount(amount='-4600.00', symbol='SEK')
 
                    ),
 
                    Posting(
 
                        account='Expenses:Rent',
 
                        amount=Amount(amount='4600.00', symbol='SEK')
 
                    )
 
                ]
 
            ),
 
            Transaction(
 
                date=datetime.today(),
 
                payee='Hosting',
 
                postings=[
 
                    Posting(
 
                        account='Assets:Checking',
 
                        amount=Amount(amount='-700.00', symbol='SEK')
 
                    ),
 
                    Posting(
 
                        account='Expenses:Hosting',
 
                        amount=Amount(amount='700.00', symbol='SEK')
 
                    )
 
                ]
 
            )
 
        ]
 

	
 
        response = self._post_json('/transaction',
 
                                  {'transactions': transactions})
 

	
 
        self.assertEqual(len(response['transaction_ids']), 2)
 

	
 
        transactions[0].id = response['transaction_ids'][0]
 
        transactions[1].id = response['transaction_ids'][1]
 

	
 
        response = self._get_json('/transaction/' + transactions[0].id)
 

	
 
        self.assertEqual(transactions[0], response['transaction'])
 

	
 
        response = self._get_json('/transaction/' + transactions[1].id)
 

	
 
        self.assertEqual(transactions[1], response['transaction'])
 

	
 
    def test_update_transaction_payee(self):
 
        transaction = self._add_simple_transaction()
 

	
 
        transaction.payee = 'not Joar'
 

	
 
        response = self._post_json('/transaction/' + transaction.id,
 
                                   {'transaction': transaction})
 

	
 
        self.assertEqual(response['status'], 'OK')
 

	
 
        response = self._get_json('/transaction/'+ transaction.id)
 

	
 
        self.assertEqual(response['transaction'], transaction)
 

	
 
    def test_update_transaction_postings(self):
 
        transaction = self._add_simple_transaction()
 

	
 
        postings = [
 
            Posting(account='Assets:Checking',
 
                    amount=Amount(amount='-733.10', symbol='SEK')),
 
            Posting(account='Expenses:Bar',
 
                    amount=Amount(amount='733.10', symbol='SEK'))
 
        ]
 

	
 
        transaction.postings = postings
 

	
 
        response = self._post_json('/transaction/' + transaction.id,
 
                                   {'transaction': transaction})
 

	
 
        self.assertEqual(response['status'], 'OK')
 

	
 
        response = self._get_json('/transaction/' + transaction.id)
 

	
 
        self.assertEqual(response['transaction'], transaction)
 

	
 
    def test_post_unbalanced_transaction(self):
 
        transaction = Transaction(
 
            date=datetime.today(),
 
            payee='Unbalanced Transaction',
 
            postings=[
 
                Posting(account='Assets:Checking',
 
                        amount=Amount(amount='100.00', symbol='USD')),
 
                Posting(account='Income:Foo',
 
                        amount=Amount(amount='-100.01', symbol='USD'))
 
            ]
 
        )
 

	
 
        self._post_json('/transaction', transaction)
 

	
 
        response = self._get_json('/transaction')
 

	
 
        import pdb; pdb.set_trace()
 

	
 
    def test_update_transaction_amounts(self): pass
 

	
 

	
 
if __name__ == '__main__':
 
    unittest.main()
0 comments (0 inline, 0 general)