Changeset - 758f601316d4
[Not reviewed]
0 1 0
Joar Wandborg - 11 years ago 2013-12-09 18:10:57
joar@wandborg.se
timout => timeout
1 file changed with 1 insertions and 1 deletions:
0 comments (0 inline, 0 general)
accounting/__init__.py
Show inline comments
 
import subprocess
 
import logging
 
import time
 

	
 
from datetime import datetime
 
from xml.etree import ElementTree
 
from contextlib import contextmanager
 

	
 
_log = logging.getLogger(__name__)
 

	
 
class Ledger:
 
    def __init__(self, ledger_file=None, ledger_bin=None):
 
        if ledger_file is None:
 
            raise ValueError('ledger_file cannot be None')
 

	
 
        self.ledger_bin = ledger_bin or 'ledger'
 
        self.ledger_file = ledger_file
 
        _log.info('ledger file: %s', ledger_file)
 

	
 
        self.locked = False
 
        self.ledger_process = None
 

	
 
    @contextmanager
 
    def locked_process(self):
 
        if self.locked:
 
            raise RuntimeError('The process has already been locked,'
 
                               ' something\'s out of order.')
 

	
 
            # XXX: This code has no purpose in a single-threaded process
 
            timout = 5  # Seconds
 
            timeout = 5  # Seconds
 

	
 
            for i in range(1, timeout + 2):
 
                if i > timeout:
 
                    raise RuntimeError('Ledger process is already locked')
 

	
 
                if not self.locked:
 
                    break
 
                else:
 
                    _log.info('Waiting for one second... %d/%d', i, timeout)
 
                    time.sleep(1)
 

	
 

	
 
        process = self.get_process()
 

	
 
        self.locked = True
 
        _log.debug('lock enabled')
 

	
 
        yield process
 

	
 
        self.locked = False
 
        _log.debug('lock disabled')
 

	
 
    def assemble_arguments(self):
 
        return [
 
            self.ledger_bin,
 
            '-f',
 
            self.ledger_file,
 
        ]
 

	
 
    def init_process(self):
 
        _log.debug('starting ledger process')
 
        self.ledger_process = subprocess.Popen(
 
            self.assemble_arguments(),
 
            stdout=subprocess.PIPE,
 
            stdin=subprocess.PIPE,
 
            stderr=subprocess.PIPE)
 

	
 
        # Swallow the banner
 
        with self.locked_process() as p:
 
            self.read_until_prompt(p)
 

	
 
        return self.ledger_process
 

	
 
    def get_process(self):
 
        return self.ledger_process or self.init_process()
 

	
 
    def read_until_prompt(self, p):
 
        output = b''
 

	
 
        while True:
 
            # _log.debug('reading data')
 

	
 
            line = p.stdout.read(1)  # XXX: This is a hack
 
            # _log.debug('line: %s', line)
 

	
 
            output += line
 

	
 
            if b'\n] ' in output:
 
                _log.debug('found prompt!')
 
                break
 

	
 
        output = output[:-3]  # Cut away the prompt
 

	
 
        _log.debug('output: %s', output)
 

	
 
        return output
 

	
 
    def send_command(self, p, command):
 
        # TODO: Should be extended to handle the locking and return the output
 
        _bytes = p.stdin.write(command + b'\n')
 
        p.stdin.flush()
 

	
 
        return _bytes
 

	
 
    def bal(self):
 
        output = None
 

	
 
        with self.locked_process() as p:
 
            _log.debug('aquired process lock')
 
            self.send_command(p, b'bal --format "%A|%t\\\\n"')
 
            _log.debug('sent command')
 

	
 
            output = self.read_until_prompt(p)
 

	
 
        if output is None:
 
            raise RuntimeError('bal call returned no output')
 

	
 
        accounts = []
 

	
 
        for line in output.split(b'\n'):
 
            name, balance =  line.decode('utf8').split('|')
 

	
 
            accounts.append(Account(name=name, balance=balance))
 

	
 
        return accounts
 

	
 
    def reg(self):
 
        output = None
 

	
 
        with self.locked_process() as p:
 
            _log.debug('aquired process lock')
 
            self.send_command(p, b'xml')
 

	
 
            output = self.read_until_prompt(p)
 

	
 
        if output is None:
 
            raise RuntimeError('reg call returned no output')
 

	
 
        entries = []
 

	
 
        reg_xml = ElementTree.fromstring(output.decode('utf8'))
 

	
 
        for transaction in reg_xml.findall('./transactions/transaction'):
 
            date = datetime.strptime(transaction.find('./date').text,
 
                                     '%Y/%m/%d')
 
            payee = transaction.find('./payee').text
 

	
 
            postings = []
 

	
 
            for posting in transaction.findall('./postings/posting'):
 
                account = posting.find('./account/name').text
 
                amount = posting.find('./post-amount/amount/quantity').text
 
                symbol = posting.find(
 
                    './post-amount/amount/commodity/symbol').text
 

	
 
                postings.append(
 
                    Posting(account=account, amount=amount, symbol=symbol))
 

	
 
            entries.append(
 
                Transaction(date=date, payee=payee, postings=postings))
 

	
 
        return entries
 

	
 

	
 
class Transaction:
 
    def __init__(self, date=None, payee=None, postings=None):
 
        self.date = date
 
        self.payee = payee
 
        self.postings = postings
 

	
 
    def __repr__(self):
 
        return ('<{self.__class__.__name__} {date}' +
 
                ' {self.payee} {self.postings}').format(
 
                    self=self,
 
                    date=self.date.isoformat())
 

	
 
class Posting:
 
    def __init__(self, account=None, amount=None, symbol=None):
 
        self.account = account
 
        self.amount = amount
 
        self.symbol = symbol
 

	
 
    def __repr__(self):
 
        return ('<{self.__class__.__name__} "{self.account}"' +
 
                ' {self.symbol} {self.amount}>').format(self=self)
 

	
 

	
 
class Account:
 
    def __init__(self, name=None, balance=None):
 
        self.name = name
 
        self.balance = balance
 

	
 
    def __repr__(self):
 
        return '<{self.__class__.__name__}: "{self.name}" {self.balance} >'.format(
 
            self=self)
 

	
 

	
 
def main():
 
    ledger = Ledger(ledger_file='non-profit-test-data.ledger')
 
    print(ledger.bal())
 
    print(ledger.reg())
 

	
 

	
 
if __name__ == '__main__':
 
    logging.basicConfig(level=logging.DEBUG)
 
    main()
0 comments (0 inline, 0 general)