Source code for runcommands.run

import os
import sys
import toml
from importlib import import_module
from pathlib import Path

from . import __version__
from .args import arg, json_value
from .command import Command
from .collection import Collection
from .exc import RunAborted, RunnerError
from .runner import CommandRunner
from .util import (
    abs_path,
    is_mapping,
    is_project_root,
    is_sequence,
    merge_dicts,
    module_from_path,
    printer,
)


class Run(Command):
    # Top-level ``runcommands`` command: locates the commands module and
    # config file, assembles globals / default args / environment
    # variables, then hands the remaining argv to a CommandRunner.
    #
    # NOTE(review): documented with comments rather than a class
    # docstring in case the Command base class reads ``__doc__`` -- confirm.

    name = "runcommands"

    # The only top-level keys a config file may contain (besides
    # "extends", which _read_config_file() pops before validating).
    allowed_config_file_args = (
        "globals",
        "envs",
        "args",
        "environ",
    )

    def implementation(
        self,
        commands_module: arg(
            short_option="-m",
        ) = None,
        config_file: arg(
            short_option="-f",
        ) = None,
        # Globals
        globals_: arg(
            container=dict,
            type=json_value,
            help="Global variables & default args for *all* commands; will be "
            "injected into itself, default args, and environment variables "
            "(higher precedence than keyword args)",
        ) = None,
        # Special globals (for command line convenience)
        env: arg(
            help="env will be added to globals if specified",
        ) = None,
        version: arg(
            help="version will be added to globals if specified",
        ) = None,
        echo: arg(
            type=bool,
            help="echo=True will be added to globals",
            inverse_help="echo=False will be added to globals",
        ) = None,
        # Environment variables
        environ: arg(
            container=dict,
            help="Additional environment variables; "
            "added just before commands are run",
        ) = None,
        # Meta
        info: arg(
            no_inverse=True,
            help="Show info and exit",
            mutual_exclusion_group="meta-show",
        ) = False,
        list_commands: arg(
            no_inverse=True,
            help="Show info & commands and exit",
            mutual_exclusion_group="meta-show",
        ) = False,
        debug: arg(
            type=bool,
            help="Print debugging info & re-raise exceptions; also added to globals",
        ) = None,
        *,
        all_argv=(),
        run_argv=(),
        command_argv=(),
    ):
        # NOTE(review): the docstring text below is kept verbatim because
        # it may be surfaced as the command's help text -- confirm against
        # Command before rewording it.
        """Run one or more commands in succession.

        For example, assume the commands ``local`` and ``remote`` have been
        defined; the following will run ``ls`` first on the local host and
        then on the remote host::

            runcommands local ls remote <host> ls

        When a command name is encountered in ``argv``, it will be
        considered the starting point of the next command *unless* the
        previous item in ``argv`` was an option like ``--xyz`` that expects
        a value (i.e., it's not a flag).

        To avoid ambiguity when an option value matches a command name, the
        value can be prepended with a colon to force it to be considered a
        value and not a command name.

        """
        commands_module = self.find_commands_module(commands_module)
        if commands_module is None:
            raise RunnerError("Could not find commands module")

        config_file = self.find_config_file(config_file)
        collection = Collection.load_from_module(commands_module)

        # Work on a copy so the dict passed in by the caller is not mutated.
        cli_globals = dict(globals_) if globals_ else {}

        if env:
            cli_globals["env"] = env
        if version:
            cli_globals["version"] = version
        if echo is not None:
            cli_globals["echo"] = echo
        if debug is not None:
            cli_globals["debug"] = debug

        # Always defined so the debug dump below works even when there is
        # no config file (it was previously unbound in that case).
        env_default_args = {}

        if config_file:
            args_from_file = self.read_config_file(config_file, collection)
            args = merge_dicts(args_from_file, {"environ": environ or {}})
            config_file_globals = args["globals"]

            # An env given on the command line wins over one set in the
            # config file's globals.
            env = env or config_file_globals.get("env")
            if env:
                envs = args["envs"]
                try:
                    env_globals = envs[env]
                except KeyError:
                    raise RunnerError(f"Unknown env: {env}")
                # Don't add the selected env's default args dict to
                # globals, but save it so it can be added back to the
                # the global envs dict (for inspection purposes).
                env_default_args = env_globals.pop("args")
                globals_ = merge_dicts(config_file_globals, env_globals, cli_globals)
                globals_["envs"] = envs
                env_globals["args"] = env_default_args
            else:
                globals_ = merge_dicts(config_file_globals, cli_globals)

            base_default_args = merge_dicts(args["args"], env_default_args)
            default_args = {name: {} for name in collection}
            default_args = merge_dicts(default_args, base_default_args)

            for command_name, command_default_args in default_args.items():
                command = collection[command_name]

                # Add globals that correspond to this command (that
                # aren't present in default args section).
                for name, value in globals_.items():
                    param = command.find_parameter(name)
                    if param is not None:
                        if param.name not in command_default_args:
                            command_default_args[param.name] = value
                    elif command.has_kwargs:
                        name = Command.normalize_name(name)
                        command_default_args[name] = value

                # NOTE(review): this tests the per-command mapping
                # ``default_args`` (keyed by command name), not
                # ``command_default_args`` -- confirm which was intended.
                if "default_args" not in default_args:
                    # This gives commands access to both their own and
                    # other commands' default args.
                    if command.find_parameter("default_args"):
                        command_default_args["default_args"] = base_default_args

                # Convert lists to tuples for the command's args that are
                # specified as being tuples.
                for name, value in command_default_args.items():
                    command_arg = command.find_arg(name)
                    if (
                        command_arg
                        and command_arg.container
                        and isinstance(value, list)
                    ):
                        command_default_args[name] = command_arg.container(value)

            # Drop commands that ended up with no default args at all.
            default_args = {
                name: cmd_args for name, cmd_args in default_args.items() if cmd_args
            }
            environ = args["environ"]
        else:
            globals_ = cli_globals
            default_args = {}
            environ = environ or {}

        debug = globals_.get("debug", False)
        show_info = info or list_commands or not command_argv or debug
        print_and_exit = info or list_commands

        globals_, default_args, environ = self.interpolate(
            globals_, default_args, environ
        )

        if show_info:
            print("RunCommands", __version__)

        if debug:
            printer.debug("Commands module:", commands_module)
            printer.debug("Config file:", config_file)
            printer.debug("All args:", all_argv)
            printer.debug("Run args:", run_argv)
            printer.debug("Command args:", command_argv)

            items = (
                ("Globals:", globals_),
                ("Default args:", default_args),
                ("Env default args:", env_default_args),
                ("Environment variables:", environ),
            )
            for label, data in items:
                if data:
                    printer.debug(label)
                    for k in sorted(data):
                        v = data[k]
                        printer.debug(f" - {k} = {v!r}")
                else:
                    printer.debug(label, data)

        if environ:
            os.environ.update(environ)

        collection.set_attrs(debug=debug)
        collection.set_default_args(default_args)
        runner = CommandRunner(collection, debug)

        if print_and_exit:
            if list_commands:
                runner.print_usage()
        elif not command_argv:
            printer.warning("\nNo command(s) specified")
            runner.print_usage()
        else:
            runner.run(command_argv)

    def run(self, argv, **kwargs):
        """Split ``argv`` into run args and command args, then run.

        Only the run args are passed to the base class ``run``; the three
        partitions are forwarded as the ``all_argv``, ``run_argv`` and
        ``command_argv`` keyword args.
        """
        all_argv, run_argv, command_argv = self.partition_argv(argv)
        # Flip debug on early, before the run args are actually parsed.
        if "-d" in run_argv or "--debug" in run_argv:
            self.debug = True
        kwargs.update(
            {
                "all_argv": all_argv,
                "run_argv": run_argv,
                "command_argv": command_argv,
            }
        )
        return super().run(run_argv, **kwargs)

    def console_script(self, argv=None, **overrides):
        """Console entry point; sets ``self.debug`` if a debug flag is present.

        ``argv`` is passed through to the base class unchanged (``None``
        stays ``None``); ``sys.argv[1:]`` is only inspected for the flag.
        """
        _argv = sys.argv[1:] if argv is None else argv
        if "-d" in _argv or "--debug" in _argv:
            self.debug = True
        return super().console_script(argv, **overrides)

    def partition_argv(self, argv=None):
        """Split ``argv`` into ``(all_argv, run_argv, command_argv)``.

        ``run_argv`` holds the leading options meant for this command and
        ``command_argv`` holds everything from the first command name on.
        ``argv`` defaults to ``sys.argv[1:]``. A literal ``--`` ends the
        run args explicitly and is not included in either partition.
        """
        if argv is None:
            argv = sys.argv[1:]

        if not argv:
            return argv, [], []

        # Consume all args that appear to be options (and their values,
        # if applicable), even those that aren't know run options, up
        # until the first non-option word is reached. That word is
        # assumed to be the start of commands.

        def looks_like_option(s):
            # One or two leading dashes followed by something other than
            # dashes; "-", "--" and "---x" do not qualify.
            return bool(
                s.startswith("-") and not s.startswith("---") and s.strip("-")
            )

        i = 0
        argc = len(argv)
        run_argv = []
        parse_optional = self.parse_optional
        parse_multi_short_option = self.parse_multi_short_option

        while i < argc:
            a = argv[i]
            if a == "--":
                # Explicit end of run args.
                i += 1
                break
            option_data = parse_optional(a)
            if option_data is not None:
                # Arg is a known run option.
                name, option, value = option_data
                run_argv.append(a)
                if a in ("-d", "--debug"):
                    self.debug = True
                elif value is None and option.takes_value:
                    # Collect the option's value if it takes one and one
                    # wasn't provided via --opt=<value>.
                    j = i + 1
                    if j < argc:
                        run_argv.append(argv[j])
                        i = j
            else:
                if not looks_like_option(a):
                    # Non-option word; assumed to be start of commands.
                    break
                short_options, value = parse_multi_short_option(a)
                if short_options is None:
                    run_argv.append(a)
                else:
                    run_argv.extend(short_options)
                    if "-d" in short_options:
                        self.debug = True
                    if value is not None:
                        run_argv.append(value)
                    else:
                        # Collect the last short option's value if it
                        # takes one and one wasn't provided via
                        # -abc<value>.
                        option_data = parse_optional(short_options[-1])
                        if option_data is not None:
                            name, option, value = option_data
                            if value is None and option.takes_value:
                                j = i + 1
                                if j < argc:
                                    run_argv.append(argv[j])
                                    i = j
            i += 1

        command_argv = argv[i:]
        return argv, run_argv, command_argv

    @staticmethod
    def _find_upward(start_dir, candidates):
        """Return the first candidate file found at or above ``start_dir``.

        Each directory is checked for the ``candidates`` file names in
        order. The search stops, returning ``None``, after checking a
        directory that is a project root or that has no parent.
        """
        current_dir = Path(start_dir).resolve()
        while True:
            for candidate in candidates:
                path = current_dir / candidate
                if path.is_file():
                    return path
            parent = current_dir.parent
            # ``parent == current_dir`` only at the top of the file system;
            # unlike comparing with ``Path.root`` this also holds for
            # Windows drive roots such as ``C:\``.
            if parent == current_dir or is_project_root(current_dir):
                return None
            current_dir = parent

    def find_commands_module(self, commands_module, start_dir="."):
        """Load and return the commands module, or ``None`` if not found.

        If ``commands_module`` ends with ``.py`` it is loaded from that
        path; any other non-empty value is imported as a module name.
        Otherwise ``runcommands.py`` / ``commands.py`` is searched for
        upward from ``start_dir``.

        Raises:
            RunnerError: if an explicitly given file does not exist or an
                explicitly given module cannot be imported.
        """
        if commands_module:
            if commands_module.endswith(".py"):
                commands_module = abs_path(commands_module)
                if not os.path.isfile(commands_module):
                    raise RunnerError(
                        f"Commands file does not exist: {commands_module}"
                    )
                return module_from_path("commands", commands_module)
            try:
                return import_module(commands_module)
            except ImportError as exc:
                raise RunnerError(
                    f"Commands module could not be imported: {commands_module}"
                ) from exc

        found = self._find_upward(start_dir, ("runcommands.py", "commands.py"))
        if found is None:
            return None
        return module_from_path("commands", found)

    def find_config_file(self, config_file, start_dir="."):
        """Return the config file path to use, or ``None`` if there is none.

        An explicit ``config_file`` is made absolute and must exist.
        Otherwise ``runcommands.toml``, ``commands.toml`` and
        ``pyproject.toml`` are searched for upward from ``start_dir``; a
        file found that way is returned as a ``Path``.

        Raises:
            RunnerError: if an explicitly given config file does not exist.
        """
        if config_file:
            config_file = abs_path(config_file)
            if not os.path.exists(config_file):
                # NOTE(review): "exists" is a typo in this message; left
                # as-is because callers or tests may match on the text.
                raise RunnerError(f"Config file does not exists: {config_file}")
            return config_file

        candidates = ("runcommands.toml", "commands.toml", "pyproject.toml")
        return self._find_upward(start_dir, candidates)

    def read_config_file(self, config_file, collection):
        """Read ``config_file`` and return its normalized args dict."""
        return self._read_config_file(config_file, collection)

    def _read_config_file(self, config_file, collection):
        """Read one config file, following ``extends`` recursively.

        For ``pyproject.toml`` only the ``[tool.runcommands]`` table is
        used. Every key in ``allowed_config_file_args`` is present in the
        result (defaulting to ``{}``), and command / arg names in the
        default args sections are normalized against ``collection``.

        Raises:
            RunnerError: if the file contains a top-level key that is not
                allowed, or names an unknown command or arg.
        """
        # TOML documents are UTF-8; don't depend on the locale's encoding.
        with open(config_file, encoding="utf-8") as fp:
            args = toml.load(fp)

        if os.path.basename(config_file) == "pyproject.toml":
            tool = args.get("tool") or {}
            args = tool.get("runcommands") or {}

        for name in self.allowed_config_file_args:
            # Not present or present but not set
            if args.get(name) is None:
                args[name] = {}

        extends = args.pop("extends", None)

        for name in tuple(args):
            if name not in self.allowed_config_file_args:
                raise RunnerError(f"Arg cannot be specified in config file: {name}")

        self.normalize_command_and_arg_names(args["args"], collection)

        envs = args["envs"]
        for env, data in envs.items():
            if data is None:
                # Replacing the value of an existing key is safe while
                # iterating over the dict.
                data = {}
                envs[env] = data
            data.setdefault("args", {})
            self.normalize_command_and_arg_names(data["args"], collection)

        if extends:
            # The path is resolved against the extending file's directory;
            # values from this file override the extended one.
            extends = abs_path(extends, relative_to=os.path.dirname(config_file))
            extended_args = self._read_config_file(extends, collection)
            args = merge_dicts(extended_args, args)

        return args

    def normalize_command_and_arg_names(self, config, collection):
        """Rewrite ``config`` in place to use canonical command & arg names.

        ``config`` maps command names to dicts of default args. Keys that
        differ from ``command.name`` / ``param.name`` are replaced by them.

        Raises:
            RunnerError: if a command is not in ``collection`` or an arg
                does not match any parameter of its command.
        """
        for command_name in tuple(config):
            try:
                command = collection[command_name]
            except KeyError:
                raise RunnerError(
                    f"Unknown command in default args section of "
                    f"config file: {command_name}"
                )
            if command.name != command_name:
                config[command.name] = config.pop(command_name)
            command_default_args = config[command.name]
            for name in tuple(command_default_args):
                param = command.find_parameter(name)
                if param is None:
                    raise RunnerError(
                        f"Unknown arg for command {command_name} in "
                        f"default args section of config file: {name}"
                    )
                if name != param.name:
                    command_default_args[param.name] = command_default_args.pop(name)

    def interpolate(self, globals_, default_args, environ):
        """Expand ``{{ key }}`` groups in globals, default args and environ.

        Globals are interpolated against themselves, default args against
        globals merged with the default args, and environ against the
        interpolated globals. Empty inputs are returned unchanged.
        """
        if globals_:
            globals_ = self._interpolate(globals_, globals_)
        if default_args:
            context = merge_dicts(globals_, default_args)
            default_args = self._interpolate(default_args, context)
        if environ:
            environ = self._interpolate(environ, globals_)
        return globals_, default_args, environ

    def _interpolate(self, obj, context):
        """Return a copy of ``obj`` with every contained value injected.

        Mappings and sequences are rebuilt with their own class; any other
        value is passed to ``_inject``.
        """
        if is_mapping(obj):
            items = ((k, self._interpolate(v, context)) for k, v in obj.items())
            obj = obj.__class__(items)
        elif is_sequence(obj):
            items = (self._interpolate(v, context) for v in obj)
            obj = obj.__class__(items)
        else:
            obj = self._inject(obj, context)
        return obj

    def _inject(self, value, context, start=0):
        """Replace ``{{ key }}`` groups in ``value`` with context values.

        Non-string values are returned unchanged. A group preceded by a
        backslash is skipped. When the whole string is a single group, the
        context value is returned as-is (it need not be a string);
        otherwise it is formatted into the surrounding text. Unclosed or
        empty groups leave ``value`` unchanged (with a warning when
        ``self.debug`` is set).

        Raises:
            KeyError: if a key is not found in ``context``.
        """
        if not isinstance(value, str):
            return value

        i = value.find("{{", start)
        if i == -1:
            return value

        h = i - 1
        if h >= 0 and value[h] == "\\":
            # Escaped opener: resume scanning just past its first brace.
            return self._inject(value, context, h + 2)

        # Pair the opener with the *nearest* closer so a value containing
        # several groups ("{{a}}-{{b}}") is expanded one group at a time;
        # searching from the right would swallow everything in between.
        j = value.find("}}", i + 2)
        if j == -1:
            # String looks like "{{ abc"
            # XXX: Error?
            if self.debug:
                printer.warning(f'Unclosed interpolation group in value: "{value}"')
            return value

        k = j + 2
        key = value[i + 2 : j].strip()
        if not key:
            # String looks like "{{}} xyz"
            # XXX: Error?
            if self.debug:
                printer.warning(f'Empty interpolation group in value: "{value}"')
            return value

        context_value = self._find_in_context(context, key)
        # The looked-up value may itself contain groups.
        context_value = self._inject(context_value, context)

        if i == 0 and k == len(value):
            value = context_value
        else:
            value = f"{value[:i]}{context_value}{value[k:]}"

        # Re-scan to expand any remaining groups.
        value = self._inject(value, context)
        return value

    def _find_in_context(self, context, key):
        """Look up a dotted ``key`` (e.g. ``a.b.c``) in nested ``context``.

        Raises whatever ``context[...]`` raises for a missing segment.
        """
        value = context
        parts = key.split(".")
        for segment in parts:
            value = value[segment]
        return value

    def sigint_handler(self, _sig_num, _frame):
        """Signal handler that aborts the run by raising ``RunAborted``."""
        raise RunAborted(0, message="\nAborted by Ctrl-C (SIGINT)")
# Module-level instance of the Run command, exposed under the name ``run``.
run = Run()