| [1999] | 1 | """ | 
|---|
|  | 2 | Wrappers around subprocess functionality that simulate an actual shell. | 
|---|
|  | 3 | """ | 
|---|
|  | 4 |  | 
|---|
|  | 5 | import subprocess | 
|---|
|  | 6 | import logging | 
|---|
|  | 7 | import sys | 
|---|
|  | 8 | import os | 
|---|
|  | 9 | import errno | 
|---|
|  | 10 |  | 
|---|
class Shell(object):
    """
    An advanced shell that performs logging.  If ``dry`` is ``True``,
    no commands are actually run.
    """
    def __init__(self, dry = False):
        self.dry = dry
        # Directory passed as ``cwd`` to subprocess.Popen; None means the
        # command inherits this process's working directory (see setcwd).
        self.cwd = None

    def call(self, *args, **kwargs):
        """
        Performs a system call.  The actual executable and options should
        be passed as arguments to this function.  Several keyword arguments
        are also supported:

        :param input: input to feed the subprocess on standard input.  On
            Python 3, passing ``bytes`` keeps the pipes in binary mode (so
            ``stdout``/``stderr`` come back as ``bytes``); any other value
            uses text mode and output is returned as ``str``.
        :param interactive: whether or not directly hook up all pipes
            to the controlling terminal, to allow interaction with subprocess.
        :param strip: if ``True``, instead of returning a tuple,
            return the string stdout output of the command with trailing newlines
            removed.  This emulates the behavior of backticks and ``$()`` in Bash.
            Prefer to use :meth:`eval` instead (you should only need to explicitly
            specify this if you are using another wrapper around this function).
        :param log: if True, we log the call as INFO, if False, we log the call
            as DEBUG, otherwise, we detect based on ``strip`` (DEBUG when
            stripping, INFO otherwise).
        :param python: not used by this class; passed through to the
            :meth:`_async` hook.
        :param stdout:
        :param stderr:
        :param stdin: file-type objects (or :mod:`subprocess` constants) that
            the subprocess writes to / reads from.  Each defaults to
            ``subprocess.PIPE`` and is ignored when ``interactive`` is set.
        :returns: a tuple of strings ``(stdout, stderr)``, or a string ``stdout``
            if ``strip`` is specified.  In dry-run mode nothing is executed
            and ``(None, None)`` (or ``''`` with ``strip``) is returned.  If
            the :meth:`_async` hook claims the process, the
            :class:`subprocess.Popen` object is returned immediately instead.
        :raises CallError: if the command exits with a nonzero status.

        >>> sh = Shell()
        >>> sh.call("echo", "Foobar")
        ('Foobar\\n', '')
        >>> sh.call("cat", input='Foobar')
        ('Foobar', '')
        """
        self._wait()
        kwargs.setdefault("interactive", False)
        kwargs.setdefault("strip", False)
        kwargs.setdefault("python", None)
        kwargs.setdefault("log", None)
        kwargs.setdefault("stdout", subprocess.PIPE)
        kwargs.setdefault("stdin", subprocess.PIPE)
        kwargs.setdefault("stderr", subprocess.PIPE)
        kwargs.setdefault("input", None)
        # "%s" formatting keeps non-string arguments from breaking the
        # log message (and still promotes to unicode on Python 2).
        msg = "Running `" + ' '.join("%s" % (arg,) for arg in args) + "`"
        if kwargs["log"] is True:
            logging.info(msg)
        elif kwargs["log"] is False or kwargs["strip"]:
            # Captured ($()-style) calls are routine, so log them quietly.
            logging.debug(msg)
        else:
            logging.info(msg)
        if self.dry:
            if kwargs["strip"]:
                return ''
            return None, None
        if kwargs["interactive"]:
            stdout, stdin, stderr = sys.stdout, sys.stdin, sys.stderr
        else:
            stdout, stdin, stderr = kwargs["stdout"], kwargs["stdin"], kwargs["stderr"]
        # Python 3 pipes default to bytes; ask for text so the documented
        # str results (and str input) work.  Python 2 already uses str, so
        # its behavior is left untouched.
        popen_kwargs = {}
        if sys.version_info[0] >= 3 and not isinstance(kwargs["input"], bytes):
            popen_kwargs["universal_newlines"] = True
        # XXX: There is a possible problem here where we can fill up
        # the kernel buffer if we have 64KB of data.  This shouldn't
        # be a problem, and the fix for such case would be to write to
        # temporary files instead of a pipe.
        # Another possible way of fixing this is converting from a
        # waitpid() pump to a select() pump, creating a pipe to
        # ourself, and then setting up a
        # SIGCHILD handler to write a single byte to the pipe to get
        # us out of select() when a subprocess exits.
        proc = subprocess.Popen(args, stdout=stdout, stderr=stderr, stdin=stdin,
                                cwd=self.cwd, **popen_kwargs)
        if self._async(proc, args, **kwargs):
            return proc
        stdout, stderr = proc.communicate(kwargs["input"])
        # can occur if we were doing interactive communication; i.e.
        # we didn't pass in PIPE.
        if stdout is None:
            stdout = ""
        if stderr is None:
            stderr = ""
        if not kwargs["interactive"]:
            # Stripped output is the caller's return value; only log stderr.
            self._log(None if kwargs["strip"] else stdout, stderr)
        if proc.returncode:
            raise CallError(proc.returncode, args, stdout, stderr)
        if kwargs["strip"]:
            if isinstance(stdout, bytes) and not isinstance(stdout, str):
                return stdout.rstrip(b"\n")
            return str(stdout).rstrip("\n")
        return (stdout, stderr)

    def _log(self, stdout, stderr):
        """Logs the standard output and standard error from a command at DEBUG level."""
        if stdout:
            logging.debug("STDOUT:\n" + self._as_text(stdout))
        if stderr:
            logging.debug("STDERR:\n" + self._as_text(stderr))

    @staticmethod
    def _as_text(data):
        """Decodes Python 3 ``bytes`` for logging; any other value is returned as-is."""
        if isinstance(data, bytes) and not isinstance(data, str):
            return data.decode("utf-8", "replace")
        return data

    def _wait(self):
        """Hook run at the start of every :meth:`call`; a no-op here.
        Subclasses override it to block until the command may start."""
        pass

    def _async(self, *args, **kwargs):
        """Hook handed the started Popen object by :meth:`call`.  Returning
        True makes :meth:`call` return the Popen immediately instead of
        waiting for it; always False here, so calls are synchronous."""
        return False

    def callAsUser(self, *args, **kwargs):
        """
        Performs a system call as a different user.  This is only possible
        if you are running as root.  Keyword arguments
        are the same as :meth:`call` with the following additions:

        :param user: name of the user to run command as.
        :param uid: uid of the user to run command as (takes precedence
            over ``user``; ``0`` is honored and means root).

        If neither is given, this is a plain :meth:`call`.

        .. note::

            The resulting system call internally uses :command:`sudo`,
            and as such environment variables will get scrubbed.  We
            manually preserve :envvar:`SSH_GSSAPI_NAME`.
        """
        user = kwargs.pop("user", None)
        uid = kwargs.pop("uid", None)
        # Compare uid against None: uid 0 (root) is falsy but still valid.
        if not user and uid is None:
            return self.call(*args, **kwargs)
        gssapi_name = os.getenv("SSH_GSSAPI_NAME")
        if gssapi_name:
            # This might be generalized as "preserve some environment"
            args = ("SSH_GSSAPI_NAME=" + gssapi_name,) + tuple(args)
        if uid is not None:
            return self.call("sudo", "-u", "#" + str(uid), *args, **kwargs)
        return self.call("sudo", "-u", user, *args, **kwargs)

    def safeCall(self, *args, **kwargs):
        """
        Checks if the owner of the directory the command will run in
        (the :meth:`setcwd` directory, or else the current working
        directory) is the same as the current user, and if it isn't,
        attempts to sudo to be that user.  The intended use case is for
        calling Git commands when running as root, but this method should
        be used when interfacing with any moderately complex program that
        depends on working directory context.  Keyword arguments are the
        same as :meth:`call`.
        """
        if os.getuid():
            return self.call(*args, **kwargs)
        # Check the directory the command actually runs in, which is
        # self.cwd when setcwd() has been used.
        uid = os.stat(self.cwd or os.getcwd()).st_uid
        # consider also checking ruid?
        if uid != os.geteuid():
            kwargs['uid'] = uid
            return self.callAsUser(*args, **kwargs)
        return self.call(*args, **kwargs)

    def eval(self, *args, **kwargs):
        """
        Evaluates a command and returns its output, with trailing newlines
        stripped (like backticks in Bash).  This is a convenience method for
        calling :meth:`call` with ``strip``.

        >>> sh = Shell()
        >>> sh.eval("echo", "Foobar")
        'Foobar'
        """
        kwargs["strip"] = True
        return self.call(*args, **kwargs)

    def setcwd(self, cwd):
        """
        Sets the directory processes are executed in. This sets a value
        to be passed as the ``cwd`` argument to ``subprocess.Popen``.
        """
        self.cwd = cwd
|---|
|  | 173 |  | 
|---|
class ParallelShell(Shell):
    """
    Modifies the semantics of :class:`Shell` so that
    commands are queued here, and executed in parallel using waitpid
    with ``max`` subprocesses, and result in callback execution
    when they finish.

    .. method:: call(*args, **kwargs)

        Enqueues a system call for parallel processing.  If there are
        no openings in the queue, this will block.  Keyword arguments
        are the same as :meth:`Shell.call` with the following additions:

        :param on_success: Callback function for success (zero exit status).
            The callback function should accept two arguments,
            ``stdout`` and ``stderr``.  Optional; if omitted, the output
            of successful commands is discarded.
        :param on_error: Callback function for failure (nonzero exit status).
            The callback function should accept one argument, the
            exception that would have been thrown by the synchronous
            version.  Optional; if omitted, that :class:`CallError` is
            raised from whichever call reaps the process (:meth:`join`,
            or a later :meth:`call` waiting for a free slot).
        :return: The :class:`subprocess.Popen` object that was opened.

        ``input`` is written to the subprocess's stdin as soon as it is
        started and the pipe is then closed, as :meth:`Shell.call` does,
        so commands that read stdin see EOF instead of hanging.

    .. method:: callAsUser(*args, **kwargs)

        Enqueues a system call under a different user for parallel
        processing.  Keyword arguments are the same as
        :meth:`Shell.callAsUser` with the additions of keyword
        arguments from :meth:`call`.

    .. method:: safeCall(*args, **kwargs)

        Enqueues a "safe" call for parallel processing.  Keyword
        arguments are the same as :meth:`Shell.safeCall` with the
        additions of keyword arguments from :meth:`call`.

    .. method:: eval(*args, **kwargs)

        No difference from :meth:`call`.  Consider having a
        non-parallel shell if the program you are shelling out
        to is fast.

    """
    def __init__(self, dry = False, max = 10):
        super(ParallelShell, self).__init__(dry=dry)
        # pid -> (proc, args, python, on_success, on_error) for every
        # subprocess that has been started but not yet reaped.
        self.running = {}
        self.max = max # maximum of commands to run in parallel

    @staticmethod
    def make(no_parallelize, max):
        """Convenience method oriented towards command modules.

        Returns a :class:`DummyParallelShell` (one command at a time) when
        ``no_parallelize`` is true, otherwise a :class:`ParallelShell` that
        runs up to ``max`` commands at once.
        """
        if no_parallelize:
            return DummyParallelShell()
        else:
            return ParallelShell(max=max)

    def _async(self, proc, args, python, on_success=None, on_error=None, **kwargs):
        """
        Gets handed a :class:`subprocess.Popen` object from our deferred
        execution.  See :meth:`Shell.call` source code for details.

        Records the process so :meth:`reap` can dispatch its callbacks,
        then feeds it ``input`` (if any) and closes its stdin.
        """
        # Register first so the process is tracked even if feeding stdin fails.
        self.running[proc.pid] = (proc, args, python, on_success, on_error)
        self._feed_stdin(proc, kwargs.get("input"))
        return True # so that the parent function returns

    @staticmethod
    def _feed_stdin(proc, data):
        """Writes ``data`` (if any) to the child's stdin pipe, then closes it."""
        if proc.stdin is None:
            return
        try:
            try:
                if data:
                    # XXX: a large input can block here until the child reads it.
                    proc.stdin.write(data)
            finally:
                proc.stdin.close()
        except (IOError, OSError) as e:
            # The child may exit without reading its input; like
            # Popen.communicate, ignore the resulting broken pipe.
            if e.errno not in (errno.EPIPE, errno.EINVAL):
                raise

    def _wait(self):
        """
        Blocking call that waits for an open subprocess slot.  This is
        automatically called by :meth:`Shell.call`.
        """
        # XXX: This API sucks; the actual call/callAsUser call should
        # probably block automatically (unless I have a good reason not to)
        # Loop because reaping a child we did not start frees no slot.
        while len(self.running) >= self.max:
            if not self._reap_next():
                return

    def join(self):
        """Waits for all of our subprocesses to terminate."""
        while self.running:
            if not self._reap_next():
                return

    def _reap_next(self):
        """Blocks until any child process exits and reaps it.  Returns
        False if this process has no children left, True otherwise."""
        while True:
            try:
                pid, status = os.waitpid(-1, 0)
            except OSError as e:
                if e.errno == errno.EINTR:
                    continue  # interrupted by a signal (Python 2); retry
                if e.errno == errno.ECHILD:
                    return False
                raise
            self.reap(pid, status)
            return True

    def reap(self, pid, status):
        """Reaps a process.

        :param pid: process id returned by :func:`os.waitpid`.
        :param status: raw wait status returned by :func:`os.waitpid`.
        :raises CallError: if the command failed and no ``on_error``
            callback was given for it.
        """
        # ooh, zombie process. reap it
        entry = self.running.pop(pid, None)
        if entry is None:
            # os.waitpid(-1, ...) collects any child of this process, not
            # only the ones this shell started.
            logging.debug("Reaped untracked child process %d", pid)
            return
        proc, args, python, on_success, on_error = entry
        # waitpid() already collected the child, so Popen never sees its
        # status; record it so CallError gets a real exit code and Popen
        # will not try to wait on this (possibly reused) pid again.
        proc.returncode = self._exit_code(status)
        # XXX: output is only read after the child has exited, so a child
        # that fills the OS pipe buffer blocks forever and is never reaped;
        # this should actually use temporary files.
        stdout = self._drain(proc.stdout)
        stderr = self._drain(proc.stderr)
        self._log(stdout, stderr)
        if proc.returncode:
            error = CallError(proc.returncode, args, stdout, stderr)
            if on_error is None:
                raise error
            on_error(error)
            return
        if on_success is not None:
            on_success(stdout, stderr)

    @staticmethod
    def _drain(pipe):
        """Reads everything left in ``pipe`` and closes it; returns "" when
        there is no pipe (e.g. interactive mode), like :meth:`Shell.call`."""
        if pipe is None:
            return ""
        try:
            return pipe.read()
        finally:
            pipe.close()

    @staticmethod
    def _exit_code(status):
        """Converts a raw :func:`os.waitpid` status into a Popen-style
        return code: the exit status, or minus the signal number if the
        child was killed by a signal."""
        if os.WIFSIGNALED(status):
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            return os.WEXITSTATUS(status)
        return status
|---|
|  | 270 |  | 
|---|
# A shared default Shell whose bound methods double as module-level
# functions.  Note: binding ``eval`` here shadows the builtin ``eval``
# within this module's namespace.
shell = Shell()
call, callAsUser, safeCall, eval = (
    shell.call,
    shell.callAsUser,
    shell.safeCall,
    shell.eval,
)
|---|
|  | 277 |  | 
|---|
class DummyParallelShell(ParallelShell):
    """Same API as :class:`ParallelShell`, but never runs more than one
    command at a time: ``max`` is pinned to 1, so each new call first
    waits for the previously started command to be reaped (call
    :meth:`ParallelShell.join` to collect the last one)."""
    def __init__(self, dry = False):
        parent = super(DummyParallelShell, self)
        parent.__init__(max=1, dry=dry)
|---|
|  | 283 |  | 
|---|
class CallError(Exception):
    """Indicates that a subprocess call returned a nonzero exit status."""
    # Deriving from Exception makes this raisable on Python 3 (only
    # BaseException subclasses can be raised there) and catchable by
    # generic ``except Exception`` handlers.

    #: The exit code of the failed subprocess.
    code = None
    #: Sequence of the program and arguments that failed.
    # This class attribute also shadows BaseException.args, so the
    # assignment in __init__ stores the value as given rather than
    # coercing it to a tuple.
    args = None
    #: The stdout of the program.
    stdout = None
    #: The stderr of the program.
    stderr = None
    def __init__(self, code, args, stdout, stderr):
        Exception.__init__(self, code, args, stdout, stderr)
        self.code = code
        self.args = args
        self.stdout = stdout
        self.stderr = stderr
    def __str__(self):
        """Last line of stderr, the exit code, then the full stderr."""
        stderr = self.stderr
        if stderr is None:
            stderr = ""
        elif isinstance(stderr, bytes) and not isinstance(stderr, str):
            # Python 3 binary-mode output; decode so it can be formatted.
            stderr = stderr.decode("utf-8", "replace")
        compact = stderr.rstrip().split("\n")[-1]
        return "%s (exited with %s)\n%s" % (compact, self.code, stderr)
|---|