Files @ 3e76b47ee35b
Branch filter:

Location: website/www/conservancy/static/projects/policies/publish-policy.py

brett
projects: PyPy is transitioning out.
#!/usr/bin/env python3

import argparse
import contextlib
import functools
import locale
import os
import pathlib
import shutil
import subprocess
import sys
import tempfile

try:
    import markdown
    from markdown.extensions import tables as mdx_tables
    from markdown.extensions import sane_lists as mdx_sane_lists
    from markdown.extensions import smarty as mdx_smarty
    from markdown.extensions import toc as mdx_toc
    markdown_import_success = True
except ImportError:
    if __name__ != '__main__':
        raise
    markdown_import_success = False

TEMPLATE_HEADER = """{% extends "base_projects.html" %}
{% block subtitle %}{% endblock %}
{% block submenuselection %}Policies{% endblock %}
{% block content %}

"""

TEMPLATE_FOOTER = """

{% endblock %}
"""

@contextlib.contextmanager
def run(cmd, encoding=None, ok_exitcodes=frozenset([0]), **kwargs):
    kwargs.setdefault('stdout', subprocess.PIPE)
    if encoding is None:
        mode = 'rb'
        no_data = b''
    else:
        mode = 'r'
        no_data = ''
    with contextlib.ExitStack() as exit_stack:
        proc = exit_stack.enter_context(subprocess.Popen(cmd, **kwargs))
        pipes = [exit_stack.enter_context(open(
                   getattr(proc, name).fileno(), mode, encoding=encoding, closefd=False))
                 for name in ['stdout', 'stderr']
                 if kwargs.get(name) is subprocess.PIPE]
        if pipes:
            yield (proc, *pipes)
        else:
            yield proc
        for pipe in pipes:
            for _ in iter(lambda: pipe.read(4096), no_data):
                pass
    if proc.returncode not in ok_exitcodes:
        raise subprocess.CalledProcessError(proc.returncode, cmd)

class GitPath:
    GIT_BIN = shutil.which('git')
    CLEAN_ENV = {k: v for k, v in os.environ.items() if not k.startswith('GIT_')}
    ANY_EXITCODE = range(-256, 257)
    IGNORE_ERRORS = {
        'ok_exitcodes': ANY_EXITCODE,
        'stderr': subprocess.DEVNULL,
    }
    STATUS_CLEAN_OR_UNMANAGED = frozenset(' ?')

    def __init__(self, path, encoding, env=None):
        self.path = path
        self.dir_path = path if path.is_dir() else path.parent
        self.encoding = encoding
        self.run_defaults = {
            'cwd': str(self.dir_path),
            'env': env,
        }

    @classmethod
    def can_run(cls):
        return cls.GIT_BIN is not None

    def _run(self, cmd, encoding=None, ok_exitcodes=frozenset([0]), **kwargs):
        return run(cmd, encoding, ok_exitcodes, **self.run_defaults, **kwargs)

    def _cache(orig_func):
        attr_name = '_cached_' + orig_func.__name__
        @functools.wraps(orig_func)
        def cache_wrapper(self):
            try:
                return getattr(self, attr_name)
            except AttributeError:
                setattr(self, attr_name, orig_func(self))
                return getattr(self, attr_name)
        return cache_wrapper

    @_cache
    def is_work_tree(self):
        with self._run([self.GIT_BIN, 'rev-parse', '--is-inside-work-tree'],
                       self.encoding, **self.IGNORE_ERRORS) as (_, stdout):
            return stdout.readline() == 'true\n'

    @_cache
    def status_lines(self):
        with self._run([self.GIT_BIN, 'status', '-z'],
                       self.encoding) as (_, stdout):
            return stdout.read().split('\0')

    @_cache
    def has_managed_modifications(self):
        return any(line and line[1] not in self.STATUS_CLEAN_OR_UNMANAGED
                   for line in self.status_lines())

    @_cache
    def has_staged_changes(self):
        return any(line and line[0] not in self.STATUS_CLEAN_OR_UNMANAGED
                   for line in self.status_lines())

    def commit_at(self, revision):
        with self._run([self.GIT_BIN, 'rev-parse', revision],
                       self.encoding) as (_, stdout):
            return stdout.readline().rstrip('\n') or None

    @_cache
    def upstream_commit(self):
        return self.commit_at('@{upstream}')

    @_cache
    def head_commit(self):
        return self.commit_at('HEAD')

    def in_sync_with_upstream(self):
        return self.upstream_commit() == self.head_commit()

    @_cache
    def last_commit(self):
        with self._run([self.GIT_BIN, 'log', '-n1', '--format=format:%H', self.path.name],
                       self.encoding, **self.IGNORE_ERRORS) as (_, stdout):
            return stdout.readline().rstrip('\n') or None

    def operate(self, subcmd, ok_exitcodes=frozenset([0])):
        with self._run([self.GIT_BIN, *subcmd], None, ok_exitcodes, stdout=None):
            pass


def add_parser_flag(argparser, dest, **kwargs):
    kwargs.update(dest=dest, default=None)
    switch_root = dest.replace('_', '-')
    switch = '--' + switch_root
    argparser.add_argument(switch, **kwargs, action='store_true')
    kwargs['help'] = "Do not do {}".format(switch)
    argparser.add_argument('--no-' + switch_root, **kwargs, action='store_false')

def parse_arguments(arglist):
    parser = argparse.ArgumentParser(
        epilog="""By default, the program will pull from Git if the output path
is a Git checkout with a tracking branch, and will commit and push if
that checkout is in sync with the tracking branch without any staged changes.
Setting any flag will always override the default behavior.
""",
    )

    parser.add_argument(
        '--encoding', '-E',
        default=locale.getpreferredencoding(),
        help="Encoding to use for all I/O. "
        "Default is your locale's encoding.",
    )
    parser.add_argument(
        '--revision', '-r',
        help="Revision string to version the published page. "
        "Default determined from the revision of the source file.",
    )
    add_parser_flag(
        parser, 'pull',
        help="Try to pull the remote tracking branch to make the checkout "
        "up-to-date before making changes"
    )
    add_parser_flag(
        parser, 'commit',
        help="Commit changes to the website repository",
    )
    parser.add_argument(
        '-m', dest='commit_message',
        default="Publish {filename} revision {revision}.",
        help="Message for any commit",
    )
    add_parser_flag(
        parser, 'push',
        help="Push to the remote tracking branch after committing changes",
    )
    parser.add_argument(
        'input_path', type=pathlib.Path,
        help="Path to the Conservancy policy Markdown source",
    )
    parser.add_argument(
        'output_path', type=pathlib.Path,
        nargs='?', default=pathlib.Path(__file__).parent,
        help="Path to the directory to write output files",
    )

    if not markdown_import_success:
        parser.error("""markdown module is not installed.
Try `apt install python3-markdown` or `python3 -m pip install --user Markdown`.""")

    args = parser.parse_args(arglist)
    args.git_output = GitPath(args.output_path, args.encoding)
    if args.pull or args.commit or args.push:
        if not args.git_output.can_run():
            parser.error("Git operation requested but `git` not found in PATH")
        elif not args.git_output.is_work_tree():
            parser.error("Git operation requested but {} is not a working path".format(
                args.output_path.as_posix()))
    if args.revision is None:
        try:
            args.revision = GitPath(args.input_path, args.encoding, GitPath.CLEAN_ENV).last_commit()
        except subprocess.CalledProcessError:
            pass
        if args.revision is None:
            parser.error("no --revision specified and not found from input path")
    args.output_link_path = args.git_output.dir_path / args.input_path.with_suffix('.html').name
    args.output_file_path = args.output_link_path.with_suffix('.{}.html'.format(args.revision))
    return args

class GitOperation:
    def __init__(self, args):
        self.args = args
        self.git_path = args.git_output
        self.exitcode = None
        self.on_work_tree = self.git_path.can_run() and self.git_path.is_work_tree()

    def run(self):
        arg_state = getattr(self.args, self.NAME)
        if arg_state is None:
            arg_state = self.should_run()
        if not arg_state:
            return
        try:
            self.exitcode = self.run_git() or 0
        except subprocess.CalledProcessError as error:
            self.exitcode = error.returncode


class GitPull(GitOperation):
    NAME = 'pull'

    def should_run(self):
        return self.on_work_tree and not self.git_path.has_staged_changes()

    def run_git(self):
        self.git_path.operate(['fetch', '--no-tags'])
        self.git_path.operate(['merge', '--ff-only'])


class GitCommit(GitOperation):
    NAME = 'commit'
    VERB = 'committed'

    def __init__(self, args):
        super().__init__(args)
        try:
            self._should_run = ((not self.git_path.has_staged_changes())
                                and self.git_path.in_sync_with_upstream())
        except subprocess.CalledProcessError:
            self._should_run = False

    def should_run(self):
        return self.on_work_tree and self._should_run

    def run_git(self):
        self.git_path.operate([
            'add', str(self.args.output_file_path), str(self.args.output_link_path),
        ])
        commit_message = self.args.commit_message.format(
            filename=self.args.output_link_path.name,
            revision=self.args.revision,
        )
        self.git_path.operate(['commit', '-m', commit_message])


class GitPush(GitCommit):
    NAME = 'push'
    VERB = 'pushed'

    def run_git(self):
        self.git_path.operate(['push'])


def write_output(args):
    converter = markdown.Markdown(
        extensions=[
            mdx_tables.TableExtension(),
            mdx_sane_lists.SaneListExtension(),
            mdx_smarty.SmartyExtension(),
            mdx_toc.TocExtension(),
        ],
        output_format='html5',
    )
    header = TEMPLATE_HEADER
    with args.input_path.open(encoding=args.encoding) as src_file:
        for line in src_file:
            if line.startswith('# '):
                subtitle = line[2:].replace('Software Freedom Conservancy', '').strip()
                header = header.replace(
                    '{% block subtitle %}',
                    '{{% block subtitle %}}{} - '.format(subtitle),
                )
                break
        src_file.seek(0)
        body = converter.convert(src_file.read())
    with tempfile.NamedTemporaryFile(
            'w',
            encoding=args.encoding,
            dir=args.git_output.dir_path.as_posix(),
            suffix='.html',
            delete=False,
    ) as tmp_out:
        try:
            tmp_out.write(header)
            tmp_out.write(body)
            tmp_out.write(TEMPLATE_FOOTER)
            tmp_out.flush()
            os.rename(tmp_out.name, str(args.output_file_path))
        except BaseException:
            os.unlink(tmp_out.name)
            raise
    if args.output_link_path.is_symlink():
        args.output_link_path.unlink()
    args.output_link_path.symlink_to(args.output_file_path.name)

def main(arglist=None, stdout=sys.stdout, stderr=sys.stderr):
    args = parse_arguments(arglist)
    pull = GitPull(args)
    pull.run()
    if pull.exitcode:
        return pull.exitcode
    write_output(args)
    ops = [GitCommit(args), GitPush(args)]
    for op in ops:
        op.run()
        if op.exitcode != 0:
            exitcode = op.exitcode or 0
            break
    else:
        exitcode = 0
    print(args.input_path.name, "converted,",
          ", ".join(op.VERB if op.exitcode == 0 else "not " + op.VERB for op in ops),
          file=stdout)
    return exitcode

if __name__ == '__main__':
    exit(main())