# Source code for runcommands.runner
import os
from itertools import chain
from .exc import RunAborted, RunCommandsError, RunnerError
from .util import printer
class CommandRunner:
    """A runner for a given collection of commands."""

    def __init__(self, collection, debug=False):
        """Store the command collection and the debug flag.

        Args:
            collection: Container of commands. It is queried with
                ``collection[name]`` and ``name in collection``, and
                iterated to list the command names.
            debug: When true, debug messages are printed while
                running. The value is also assigned to the ``debug``
                attribute of every command selected to run.

        """
        self.collection = collection
        self.debug = debug

    def run(self, argv):
        """Run the commands named in ``argv``, in order.

        If help was requested for one or more of the commands, help
        is shown for those commands only, nothing is run, and an empty
        tuple is returned.

        Otherwise each command is run and its result is processed. If
        a command raises ``RunAborted``, the remaining commands are
        skipped, the callbacks of the commands that already completed
        are called with ``aborted=True``, and then the exception
        returned by ``exc.create_nested()`` is raised. Any other
        ``RunCommandsError`` propagates immediately.

        Args:
            argv: Sequence of command names and their arguments.

        Returns:
            A tuple of the processed results, one per command run.

        """
        debug = self.debug
        commands_to_run = self.get_commands_to_run(self.collection, argv)

        show_help_for = [c for c in commands_to_run if c.help_requested]
        if show_help_for:
            # Only separate the help output with headers when there's
            # more than one command to show help for.
            show_headers = len(show_help_for) > 1
            for cmd in show_help_for:
                if show_headers:
                    printer.hr("Help for", cmd.name, end=os.linesep * 2)
                cmd.show_help()
            return ()

        aborted = False
        aborted_exc = None
        results = []
        commands_with_callbacks = []

        for cmd in commands_to_run:
            try:
                result = cmd.run()
            except RunCommandsError as exc:
                # Only aborts are handled here; every other error is
                # passed straight through to the caller.
                if not isinstance(exc, RunAborted):
                    raise
                aborted = True
                aborted_exc = exc.create_nested()
                if debug:
                    printer.debug("\nExiting command runner due to abort")
                break

            # The second item (the return code) isn't used by the
            # runner.
            result, _ = cmd.process_result(result)
            results.append(result)
            if cmd.callbacks:
                commands_with_callbacks.append((cmd, result))

        for cmd, result in commands_with_callbacks:
            for callback in cmd.callbacks:
                callback(cmd.command, result, aborted)

        # XXX: Defer till here so callbacks are run first.
        if aborted:
            raise aborted_exc

        return tuple(results)

    def get_commands_to_run(self, collection, argv):
        """Split ``argv`` into a list of ``CommandToRun`` objects.

        ``argv`` is consumed from the front: each pass takes one
        command name plus the args that :meth:`partition_args` assigns
        to it, until nothing is left.

        Args:
            collection: Container of commands to look names up in.
            argv: Sequence of command names and their arguments.

        Returns:
            A list of ``CommandToRun`` objects in command line order;
            empty if ``argv`` is empty.

        """
        debug = self.debug
        commands_to_run = []
        while argv:
            command, command_argv = self.partition_args(collection, argv)
            command.debug = debug
            command_to_run = CommandToRun(command, command_argv)
            commands_to_run.append(command_to_run)
            # Skip over the command name plus each of its args.
            argv = argv[len(command_argv) + 1:]
            if debug:
                printer.debug("Command to run:", command_to_run)
        return commands_to_run

    def partition_args(self, collection, args):
        """Find the leading command in ``args`` and the args it owns.

        ``args[0]`` must be the name of a command in ``collection``.
        The args that follow belong to that command up to, but not
        including, the next arg that is itself a command name--unless
        that arg directly follows an option of the current command
        that takes a value, in which case it's kept as an arg.

        A command name can be passed as a plain arg by escaping it
        with a leading colon (``:name``); the colon is stripped.

        Args:
            collection: Container of commands to look names up in.
            args: Non-empty sequence starting with a command name.

        Returns:
            A two-item list: ``[command, command_args]``.

        Raises:
            RunnerError: If ``args[0]`` isn't in ``collection``.

        """
        name = args[0]
        try:
            command = collection[name]
        except KeyError:
            # The KeyError adds nothing to the message, so don't chain
            # it onto the error that's reported.
            raise RunnerError(f"Unknown command: {name}") from None

        command_args = []
        prev_arg = None
        for raw_arg in args[1:]:
            if raw_arg in collection:
                # Found a command name. If the previous arg wasn't an
                # option expecting a value, assume the command name is
                # the next command to run and not an option value or
                # positional.
                option = command.option_map.get(prev_arg)
                if option is None or not option.takes_value:
                    break
            arg = raw_arg
            if len(arg) > 1 and arg[0] == ":" and arg[1:] in collection:
                # Found an escaped command name. Unescape it.
                arg = arg[1:]
            command_args.append(arg)
            # The option lookup above uses the arg as it was passed,
            # before any unescaping.
            prev_arg = raw_arg
        return [command, command_args]

    def print_usage(self):
        """Print the names of the available commands, sorted.

        Prints a warning instead if the collection is empty.

        """
        if not self.collection:
            printer.warning("No commands available")
            return
        print("\nAvailable commands:\n")
        for name in sorted(self.collection):
            print(f" {name}")
        print("\nFor detailed help on a command: runcommands <command> --help")
class CommandToRun:
    """A command paired with the argv it is to be run with.

    The argv is stored after being passed through the command's
    ``expand_short_options()``. ``help_requested`` records whether
    ``-h`` or ``--help`` appears among the expanded args that come
    before any ``--``.

    """

    __slots__ = ("name", "command", "argv", "callbacks", "help_requested")

    def __init__(self, command, argv):
        expanded = command.expand_short_options(argv)

        self.name = command.name
        self.command = command
        self.argv = expanded
        self.callbacks = command.callbacks

        # Args that follow -- aren't command options, so they're left
        # out when checking whether help was requested.
        if "--" in expanded:
            option_args = expanded[: expanded.index("--")]
        else:
            option_args = expanded

        self.help_requested = any(
            flag in option_args for flag in ("-h", "--help")
        )

    def run(self):
        """Run the command with the stored, already-expanded argv."""
        command = self.command
        return command.run(self.argv, _expand_short_options=False)

    def process_result(self, result, **kwargs):
        """Pass ``result`` and the stored argv to the command's
        ``process_result()`` and return what it returns."""
        command = self.command
        return command.process_result(result, self.argv, **kwargs)

    def show_help(self):
        """Print the command's ``help`` attribute."""
        help_text = self.command.help
        print(help_text)

    def __repr__(self):
        return "Command(name={}, argv={})".format(self.name, self.argv)