#!/usr/bin/env python3
"""Convert a policy Markdown file into a website template and publish it.

The script renders the Markdown source to HTML, wraps it in a template
header/footer, writes it next to a stable ``<name>.html`` symlink, and can
optionally pull, commit, and push the output directory's Git checkout.
"""

import argparse
import contextlib
import functools
import locale
import os
import pathlib
import shutil
import subprocess
import sys
import tempfile

try:
    import markdown
    from markdown.extensions import tables as mdx_tables
    from markdown.extensions import sane_lists as mdx_sane_lists
    from markdown.extensions import smarty as mdx_smarty
    from markdown.extensions import toc as mdx_toc
    markdown_import_success = True
except ImportError:
    # As a script, defer the failure so parse_arguments() can report it
    # through argparse; as an imported module, fail immediately.
    if __name__ != '__main__':
        raise
    markdown_import_success = False

# Text written before the converted body.  write_output() looks for the
# exact '{% block subtitle %}' tag in this string to insert the page title.
TEMPLATE_HEADER = (
    '{% extends "base_projects.html" %} '
    '{% block subtitle %}{% endblock %} '
    '{% block submenuselection %}Policies{% endblock %} '
    '{% block content %} '
)

# Text written after the converted body.
TEMPLATE_FOOTER = ' {% endblock %} '


@contextlib.contextmanager
def run(cmd, encoding=None, ok_exitcodes=frozenset([0]), **kwargs):
    """Run ``cmd`` as a subprocess and yield it for the ``with`` body.

    ``stdout`` defaults to a pipe.  For each of stdout/stderr that is a
    pipe, a file object is opened on it (binary when ``encoding`` is None,
    text otherwise).  The context yields ``(proc, *pipes)`` when there is
    at least one pipe, or just ``proc`` when there is none.

    When the ``with`` body finishes, any unread pipe data is drained, the
    process is waited for, and ``subprocess.CalledProcessError`` is raised
    if its exit code is not in ``ok_exitcodes``.  Remaining keyword
    arguments are passed to ``subprocess.Popen``.
    """
    kwargs.setdefault('stdout', subprocess.PIPE)
    mode = 'rb' if encoding is None else 'r'
    with contextlib.ExitStack() as exit_stack:
        proc = exit_stack.enter_context(subprocess.Popen(cmd, **kwargs))
        # closefd=False: Popen owns the descriptors and closes them itself.
        pipes = [
            exit_stack.enter_context(open(
                getattr(proc, name).fileno(), mode,
                encoding=encoding, closefd=False))
            for name in ['stdout', 'stderr']
            if kwargs.get(name) is subprocess.PIPE
        ]
        if pipes:
            yield (proc, *pipes)
        else:
            yield proc
        # Drain whatever the caller did not read so the child cannot block
        # on a full pipe while we wait for it to exit.
        for pipe in pipes:
            while pipe.read(4096):
                pass
    # Popen's context exit has waited for the process, so returncode is set.
    if proc.returncode not in ok_exitcodes:
        raise subprocess.CalledProcessError(proc.returncode, cmd)


class GitPath:
    """Run Git queries and operations relative to a filesystem path."""

    # Full path to the git executable, or None when it is not on PATH.
    GIT_BIN = shutil.which('git')
    # Copy of the environment without any GIT_* variables.
    CLEAN_ENV = {k: v for k, v in os.environ.items() if not k.startswith('GIT_')}
    ANY_EXITCODE = range(-256, 257)
    # Keyword arguments for _run() that accept any exit code in
    # ANY_EXITCODE and discard stderr.
    IGNORE_ERRORS = {
        'ok_exitcodes': ANY_EXITCODE,
        'stderr': subprocess.DEVNULL,
    }
    # `git status` column characters treated as "nothing to report":
    # space and '?'.
    STATUS_CLEAN_OR_UNMANAGED = frozenset(' ?')

    def __init__(self, path, encoding, env=None):
        """Remember ``path`` and the directory Git commands should run in.

        Commands run in ``path`` itself when it is a directory, otherwise
        in its parent.  ``encoding`` decodes Git output; ``env`` is passed
        to the subprocess as its environment.
        """
        self.path = path
        self.dir_path = path if path.is_dir() else path.parent
        self.encoding = encoding
        self.run_defaults = {
            'cwd': str(self.dir_path),
            'env': env,
        }

    @classmethod
    def can_run(cls):
        """Return True when a git executable was found on PATH."""
        return cls.GIT_BIN is not None

    def _run(self, cmd, encoding=None, ok_exitcodes=frozenset([0]), **kwargs):
        """Call run() with this path's working directory and environment."""
        return run(cmd, encoding, ok_exitcodes, **self.run_defaults, **kwargs)

    def _cache(orig_func):
        """Decorate a no-argument method to compute its result once per instance."""
        attr_name = '_cached_' + orig_func.__name__

        @functools.wraps(orig_func)
        def cache_wrapper(self):
            try:
                return getattr(self, attr_name)
            except AttributeError:
                setattr(self, attr_name, orig_func(self))
                return getattr(self, attr_name)
        return cache_wrapper

    @_cache
    def is_work_tree(self):
        """Return True when Git reports the directory is inside a work tree."""
        with self._run([self.GIT_BIN, 'rev-parse', '--is-inside-work-tree'],
                       self.encoding, **self.IGNORE_ERRORS) as (_, stdout):
            return stdout.readline() == 'true\n'

    @_cache
    def status_lines(self):
        """Return the NUL-separated fields of ``git status -z`` as a list.

        NOTE(review): per the git-status documentation, with ``-z`` a
        renamed entry is followed by an extra field holding the original
        path with no status prefix; the callers below treat every field
        as a status line.
        """
        with self._run([self.GIT_BIN, 'status', '-z'], self.encoding) as (_, stdout):
            return stdout.read().split('\0')

    @_cache
    def has_managed_modifications(self):
        """Return True when any status line's second column is not space or '?'."""
        return any(line and line[1] not in self.STATUS_CLEAN_OR_UNMANAGED
                   for line in self.status_lines())

    @_cache
    def has_staged_changes(self):
        """Return True when any status line's first column is not space or '?'."""
        return any(line and line[0] not in self.STATUS_CLEAN_OR_UNMANAGED
                   for line in self.status_lines())

    def commit_at(self, revision):
        """Return what ``git rev-parse`` prints for ``revision``, or None if empty.

        Raises subprocess.CalledProcessError when Git exits non-zero.
        """
        with self._run([self.GIT_BIN, 'rev-parse', revision],
                       self.encoding) as (_, stdout):
            return stdout.readline().rstrip('\n') or None

    @_cache
    def upstream_commit(self):
        """Return the commit that ``@{upstream}`` resolves to."""
        return self.commit_at('@{upstream}')

    @_cache
    def head_commit(self):
        """Return the commit that ``HEAD`` resolves to."""
        return self.commit_at('HEAD')

    def in_sync_with_upstream(self):
        """Return True when HEAD and the upstream branch are the same commit."""
        return self.upstream_commit() == self.head_commit()

    @_cache
    def last_commit(self):
        """Return the hash of the latest commit touching this path, or None.

        Git errors are ignored, so None is also returned when Git prints
        nothing (for instance outside a repository).
        """
        # '--' keeps Git from reading the file name as an option or revision.
        with self._run([self.GIT_BIN, 'log', '-n1', '--format=format:%H',
                        '--', self.path.name],
                       self.encoding, **self.IGNORE_ERRORS) as (_, stdout):
            return stdout.readline().rstrip('\n') or None

    def operate(self, subcmd, ok_exitcodes=frozenset([0])):
        """Run ``git <subcmd>`` with stdout inherited rather than captured.

        Raises subprocess.CalledProcessError when the exit code is not in
        ``ok_exitcodes``.
        """
        with self._run([self.GIT_BIN, *subcmd], None, ok_exitcodes, stdout=None):
            pass


def add_parser_flag(argparser, dest, **kwargs):
    """Add paired ``--flag`` / ``--no-flag`` switches that share ``dest``.

    The destination defaults to None so callers can distinguish "not
    specified" from an explicit True or False.
    """
    kwargs.update(dest=dest, default=None)
    switch_root = dest.replace('_', '-')
    switch = '--' + switch_root
    argparser.add_argument(switch, **kwargs, action='store_true')
    kwargs['help'] = "Do not do {}".format(switch)
    argparser.add_argument('--no-' + switch_root, **kwargs, action='store_false')


def parse_arguments(arglist):
    """Parse command-line arguments and derive the output paths.

    Besides the parsed options, the returned namespace carries
    ``git_output`` (a GitPath for the output directory),
    ``output_link_path`` (``<input stem>.html`` in the output directory)
    and ``output_file_path`` (``<input stem>.<revision>.html``).

    Exits through ``parser.error`` when the markdown module is missing,
    when a Git operation is requested but cannot be performed, or when no
    revision can be determined.
    """
    parser = argparse.ArgumentParser(
        epilog=(
            "By default, the program will pull from Git if the output path "
            "is a Git checkout with a tracking branch, and will commit and "
            "push if that checkout is in sync with the tracking branch "
            "without any staged changes. Setting any flag will always "
            "override the default behavior. "
        ),
    )
    parser.add_argument(
        '--encoding', '-E',
        default=locale.getpreferredencoding(),
        help="Encoding to use for all I/O. "
        "Default is your locale's encoding.",
    )
    parser.add_argument(
        '--revision', '-r',
        help="Revision string to version the published page. "
        "Default determined from the revision of the source file.",
    )
    add_parser_flag(
        parser, 'pull',
        help="Try to pull the remote tracking branch to make the checkout "
        "up-to-date before making changes"
    )
    add_parser_flag(
        parser, 'commit',
        help="Commit changes to the website repository",
    )
    parser.add_argument(
        '-m', dest='commit_message',
        default="Publish {filename} revision {revision}.",
        help="Message for any commit",
    )
    add_parser_flag(
        parser, 'push',
        help="Push to the remote tracking branch after committing changes",
    )
    parser.add_argument(
        'input_path', type=pathlib.Path,
        help="Path to the Conservancy policy Markdown source",
    )
    parser.add_argument(
        'output_path', type=pathlib.Path,
        nargs='?', default=pathlib.Path(__file__).parent,
        help="Path to the directory to write output files",
    )

    if not markdown_import_success:
        parser.error(
            "markdown module is not installed. \n"
            "Try `apt install python3-markdown` or "
            "`python3 -m pip install --user Markdown`."
        )

    args = parser.parse_args(arglist)
    args.git_output = GitPath(args.output_path, args.encoding)
    if args.pull or args.commit or args.push:
        if not args.git_output.can_run():
            parser.error("Git operation requested but `git` not found in PATH")
        elif not args.git_output.is_work_tree():
            parser.error("Git operation requested but {} is not a working path".format(
                args.output_path.as_posix()))
    if args.revision is None:
        # Only ask Git when it is available: with no executable the
        # command list would start with None and Popen would raise
        # TypeError instead of reaching the friendly error below.
        if GitPath.can_run():
            try:
                args.revision = GitPath(
                    args.input_path, args.encoding, GitPath.CLEAN_ENV,
                ).last_commit()
            except subprocess.CalledProcessError:
                pass
        if args.revision is None:
            parser.error("no --revision specified and not found from input path")
    args.output_link_path = (
        args.git_output.dir_path / args.input_path.with_suffix('.html').name
    )
    args.output_file_path = args.output_link_path.with_suffix(
        '.{}.html'.format(args.revision))
    return args


class GitOperation:
    """Base class for an optional Git step controlled by a tri-state flag.

    Subclasses define ``NAME`` (the argparse destination holding True,
    False, or None), ``should_run()`` (the default decision when the flag
    is None), and ``run_git()`` (the actual work).
    """

    def __init__(self, args):
        self.args = args
        self.git_path = args.git_output
        # None until run() executes the step; then the step's exit code.
        self.exitcode = None
        self.on_work_tree = self.git_path.can_run() and self.git_path.is_work_tree()

    def run(self):
        """Run the step if requested, or if unspecified and should_run() agrees.

        ``exitcode`` stays None when the step is skipped, becomes 0 on
        success, and the failing command's exit code otherwise.
        """
        arg_state = getattr(self.args, self.NAME)
        if arg_state is None:
            arg_state = self.should_run()
        if not arg_state:
            return
        try:
            self.exitcode = self.run_git() or 0
        except subprocess.CalledProcessError as error:
            self.exitcode = error.returncode


class GitPull(GitOperation):
    """Fetch and fast-forward the output checkout."""

    NAME = 'pull'

    def should_run(self):
        """Default to pulling in a work tree that has no staged changes."""
        return self.on_work_tree and not self.git_path.has_staged_changes()

    def run_git(self):
        self.git_path.operate(['fetch', '--no-tags'])
        self.git_path.operate(['merge', '--ff-only'])


class GitCommit(GitOperation):
    """Stage and commit the generated file and its symlink."""

    NAME = 'commit'
    VERB = 'committed'

    def __init__(self, args):
        super().__init__(args)
        # The default decision is taken at construction time, before any
        # commit made by this program can move HEAD.  Git is only queried
        # inside a usable work tree: otherwise the answer is "no" anyway,
        # and invoking a missing git executable would raise TypeError.
        self._should_run = False
        if self.on_work_tree:
            try:
                self._should_run = (
                    (not self.git_path.has_staged_changes())
                    and self.git_path.in_sync_with_upstream()
                )
            except subprocess.CalledProcessError:
                self._should_run = False

    def should_run(self):
        """Default to running when the checkout was clean and in sync."""
        return self.on_work_tree and self._should_run

    def run_git(self):
        # '--' keeps Git from reading the paths as options.
        self.git_path.operate([
            'add', '--',
            str(self.args.output_file_path),
            str(self.args.output_link_path),
        ])
        commit_message = self.args.commit_message.format(
            filename=self.args.output_link_path.name,
            revision=self.args.revision,
        )
        self.git_path.operate(['commit', '-m', commit_message])


class GitPush(GitCommit):
    """Push the checkout; shares GitCommit's default decision."""

    NAME = 'push'
    VERB = 'pushed'

    def run_git(self):
        self.git_path.operate(['push'])


def write_output(args):
    """Render the Markdown source and write the versioned HTML plus symlink.

    The first line starting with ``'# '`` supplies the page subtitle (with
    'Software Freedom Conservancy' removed).  Output is written to a
    temporary file in the destination directory and then moved into
    place; the temporary file is removed if anything fails.  Finally the
    unversioned ``.html`` symlink is pointed at the new file.
    """
    converter = markdown.Markdown(
        extensions=[
            mdx_tables.TableExtension(),
            mdx_sane_lists.SaneListExtension(),
            mdx_smarty.SmartyExtension(),
            mdx_toc.TocExtension(),
        ],
        output_format='html5',
    )
    header = TEMPLATE_HEADER
    with args.input_path.open(encoding=args.encoding) as src_file:
        for line in src_file:
            if line.startswith('# '):
                subtitle = line[2:].replace('Software Freedom Conservancy', '').strip()
                header = header.replace(
                    '{% block subtitle %}',
                    '{{% block subtitle %}}{} - '.format(subtitle),
                )
                break
        # Rewind: the title scan consumed part of the file.
        src_file.seek(0)
        body = converter.convert(src_file.read())
    with tempfile.NamedTemporaryFile(
            'w',
            encoding=args.encoding,
            dir=args.git_output.dir_path.as_posix(),
            suffix='.html',
            delete=False,
    ) as tmp_out:
        try:
            tmp_out.write(header)
            tmp_out.write(body)
            tmp_out.write(TEMPLATE_FOOTER)
            tmp_out.flush()
            # os.replace overwrites an existing target on every platform.
            os.replace(tmp_out.name, str(args.output_file_path))
        except BaseException:
            os.unlink(tmp_out.name)
            raise
    if args.output_link_path.is_symlink():
        args.output_link_path.unlink()
    args.output_link_path.symlink_to(args.output_file_path.name)


def main(arglist=None, stdout=sys.stdout, stderr=sys.stderr):
    """Publish the policy and return a process exit code.

    Pulls first and returns that exit code if the pull fails; otherwise
    writes the output, then commits and pushes in order, stopping at the
    first step that is skipped or fails.  A one-line summary is printed
    to ``stdout``.
    """
    args = parse_arguments(arglist)
    pull = GitPull(args)
    pull.run()
    if pull.exitcode:
        return pull.exitcode
    write_output(args)
    ops = [GitCommit(args), GitPush(args)]
    for op in ops:
        op.run()
        if op.exitcode != 0:
            # A skipped step leaves exitcode None, which counts as success.
            exitcode = op.exitcode or 0
            break
    else:
        exitcode = 0
    print(args.input_path.name, "converted,",
          ", ".join(op.VERB if op.exitcode == 0 else "not " + op.VERB
                    for op in ops),
          file=stdout)
    return exitcode


if __name__ == '__main__':
    sys.exit(main())