diff --git a/doc/build/html/_modules/accounting/storage/ledgercli.html b/doc/build/html/_modules/accounting/storage/ledgercli.html new file mode 100644 index 0000000000000000000000000000000000000000..c32fbcf174fa9317c4dec862553a4d767cf76d6c --- /dev/null +++ b/doc/build/html/_modules/accounting/storage/ledgercli.html @@ -0,0 +1,429 @@ + + + + + + + + accounting.storage.ledgercli — Accounting API 0.1-beta documentation + + + + + + + + + + + + + + +
+
+
+
+ +

Source code for accounting.storage.ledgercli

+import sys
+import subprocess
+import logging
+import time
+
+from datetime import datetime
+from xml.etree import ElementTree
+from contextlib import contextmanager
+
+from accounting.models import Account, Transaction, Posting, Amount
+from accounting.storage import Storage
+
+_log = logging.getLogger(__name__)
+
+
+
[docs]class Ledger(Storage): + def __init__(self, app=None, ledger_file=None, ledger_bin=None): + if app: + ledger_file = app.config['LEDGER_FILE'] + + if ledger_file is None: + raise ValueError('ledger_file cannot be None') + + self.ledger_bin = ledger_bin or 'ledger' + self.ledger_file = ledger_file + _log.info('ledger file: %s', ledger_file) + + self.locked = False + self.ledger_process = None + + @contextmanager +
[docs] def locked_process(self): + r''' + Context manager that checks that the ledger process is not already + locked, then "locks" the process and yields the process handle and + unlocks the process when execution is returned. + + Since this decorated as a :func:`contextlib.contextmanager` the + recommended use is with the ``with``-statement. + + .. code-block:: python + + with self.locked_process() as p: + p.stdin.write(b'bal\n') + + output = self.read_until_prompt(p) + + ''' + if self.locked: + raise RuntimeError('The process has already been locked,' + ' something\'s out of order.') + + # XXX: This code has no purpose in a single-threaded process + timeout = 5 # Seconds + + for i in range(1, timeout + 2): + if i > timeout: + raise RuntimeError('Ledger process is already locked') + + if not self.locked: + break + else: + _log.info('Waiting for one second... %d/%d', i, timeout) + time.sleep(1) + + process = self.get_process() + + self.locked = True + _log.debug('Lock enabled') + + yield process + + self.locked = False + _log.debug('Lock disabled') +
+
[docs] def assemble_arguments(self): + ''' + Returns a list of arguments suitable for :class:`subprocess.Popen` + based on :attr:`self.ledger_bin` and :attr:`self.ledger_file`. + ''' + return [ + self.ledger_bin, + '-f', + self.ledger_file, + ] +
+
[docs] def init_process(self): + ''' + Creates a new (presumably) ledger subprocess based on the args from + :meth:`Ledger.assemble_arguments()` and then runs + :meth:`Ledger.read_until_prompt()` once (which should return the banner + text) and discards the output. + ''' + _log.debug('Starting ledger process...') + self.ledger_process = subprocess.Popen( + self.assemble_arguments(), + stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE) + + # Swallow the banner + with self.locked_process() as p: + self.read_until_prompt(p) + + return self.ledger_process +
+
[docs] def get_process(self): + ''' + Returns :attr:`self.ledger_process` if it evaluates to ``True``. If + :attr:`self.ledger_process` is not set the result of + :meth:`self.init_process() <Ledger.init_process>` is returned. + ''' + return self.ledger_process or self.init_process() +
+
[docs] def read_until_prompt(self, process): + r''' + Reads from the subprocess instance :data:`process` until it finds a + combination of ``\n]\x20`` (the prompt), then returns the output + without the prompt. + ''' + output = b'' + + while True: + line = process.stdout.read(1) # XXX: This is a hack + + output += line + + if b'\n] ' in output: + _log.debug('Found prompt!') + break + + output = output[:-3] # Cut away the prompt + + _log.debug('output: %s', output) + + return output +
+
[docs] def send_command(self, command): + output = None + + with self.locked_process() as p: + if isinstance(command, str): + command = command.encode('utf8') + + p.stdin.write(command + b'\n') + p.stdin.flush() + + output = self.read_until_prompt(p) + + self.ledger_process.send_signal(subprocess.signal.SIGTERM) + _log.debug('Waiting for ledger to shut down') + self.ledger_process.wait() + self.ledger_process = None + + return output +
+
[docs] def add_transaction(self, transaction): + ''' + Writes a transaction to the ledger file by opening it in 'ab' mode and + writing a ledger transaction based on the + :class:`~accounting.models.Transaction` instance in + :data:`transaction`. + ''' + if not transaction.metadata.get('Id'): + transaction.generate_id() + + transaction_template = ('\n{date} {t.payee}\n' + '{tags}' + '{postings}') + + metadata_template = ' ;{0}: {1}\n' + + # TODO: Generate metadata for postings + posting_template = (' {account} {p.amount.symbol}' + ' {p.amount.amount}\n') + + output = b'' + + # XXX: Even I hardly understands what this does, however I indent it it + # stays unreadable. + output += transaction_template.format( + date=transaction.date.strftime('%Y-%m-%d'), + t=transaction, + tags=''.join([ + metadata_template.format(k, v) + for k, v in transaction.metadata.items()]), + postings=''.join([posting_template.format( + p=p, + account=p.account + ' ' * ( + 80 - (len(p.account) + len(p.amount.symbol) + + len(str(p.amount.amount)) + 1 + 2) + )) for p in transaction.postings + ]) + ).encode('utf8') + + with open(self.ledger_file, 'ab') as f: + f.write(output) + + _log.debug('written to file: %s', output) +
+
[docs] def bal(self): + output = self.send_command('xml') + + if output is None: + raise RuntimeError('bal call returned no output') + + accounts = [] + + xml = ElementTree.fromstring(output.decode('utf8')) + + accounts = self._recurse_accounts(xml.find('./accounts')) + + return accounts +
+ def _recurse_accounts(self, root): + accounts = [] + + for account in root.findall('./account'): + name = account.find('./fullname').text + + amounts = [] + + # Try to find an account total value, then try to find the account + # balance + account_amounts = account.findall( + './account-total/balance/amount') or \ + account.findall('./account-amount/amount') or \ + account.findall('./account-total/amount') + + if account_amounts: + for amount in account_amounts: + quantity = amount.find('./quantity').text + symbol = amount.find('./commodity/symbol').text + + amounts.append(Amount(amount=quantity, symbol=symbol)) + else: + _log.warning('Account %s does not have any amounts', name) + + accounts.append(Account(name=name, + amounts=amounts, + accounts=self._recurse_accounts(account))) + + return accounts + +
[docs] def get_transactions(self): + return self.reg() +
+
[docs] def reg(self): + output = self.send_command('xml') + + if output is None: + raise RuntimeError('reg call returned no output') + + entries = [] + + reg_xml = ElementTree.fromstring(output.decode('utf8')) + + for transaction in reg_xml.findall('./transactions/transaction'): + date = datetime.strptime(transaction.find('./date').text, + '%Y/%m/%d') + payee = transaction.find('./payee').text + + postings = [] + + for posting in transaction.findall('./postings/posting'): + account = posting.find('./account/name').text + amount = posting.find('./post-amount/amount/quantity').text + symbol = posting.find( + './post-amount/amount/commodity/symbol').text + + # Get the posting metadata + metadata = {} + + values = posting.findall('./metadata/value') + if values: + for value in values: + key = value.get('key') + value = value.find('./string').text + + _log.debug('metadata: %s: %s', key, value) + + metadata.update({key: value}) + + postings.append( + Posting(account=account, + metadata=metadata, + amount=Amount(amount=amount, symbol=symbol))) + + # Get the transaction metadata + metadata = {} + + values = transaction.findall('./metadata/value') + if values: + for value in values: + key = value.get('key') + value = value.find('./string').text + + _log.debug('metadata: %s: %s', key, value) + + metadata.update({key: value}) + + # Add a Transaction instance to the list + id = metadata.pop('Id') + entries.append( + Transaction(id=id, date=date, payee=payee, postings=postings, + metadata=metadata)) + + return entries +
+
[docs] def update_transaction(self, transaction): + _log.debug('DUMMY: Updated transaction: %s', transaction) + +
+
[docs]def main(argv=None): + import argparse + if argv is None: + argv = sys.argv + + parser = argparse.ArgumentParser() + parser.add_argument('-v', '--verbosity', + default='INFO', + help=('Filter logging output. Possible values:' + + ' CRITICAL, ERROR, WARNING, INFO, DEBUG')) + + args = parser.parse_args(argv[1:]) + logging.basicConfig(level=getattr(logging, args.verbosity, 'INFO')) + ledger = Ledger(ledger_file='non-profit-test-data.ledger') + print(ledger.bal()) + print(ledger.reg()) + +
+if __name__ == '__main__': + sys.exit(main()) +
+ +
+
+
+
+
+ + +
+
+
+
+ + + + \ No newline at end of file