| 735 | 1 #!/usr/bin/env python | 
|  | 2 | 
|  | 3 """ | 
|  | 4 multiprocessing/subprocess front-end | 
|  | 5 """ | 
|  | 6 | 
|  | 7 # imports | 
|  | 8 import argparse | 
|  | 9 import atexit | 
| 178 | 10 import os | 
| 735 | 11 import signal | 
|  | 12 import subprocess # http://bugs.python.org/issue1731717 | 
| 178 | 13 import sys | 
| 735 | 14 import time | 
|  | 15 import tempfile | 
|  | 16 | 
|  | 17 | 
# globals
__all__ = ['Process']

# String types that mark a command as a shell command line (as opposed to a
# list of arguments).  ``unicode`` only exists on Python 2; where the name is
# undefined, ``str`` alone covers text strings.
try:
    string = (str, unicode)
except NameError:
    string = (str,)

# PIDs of subprocesses started via ``Process`` that have not been finalized
PIDS = set()
|  | 22 | 
# ensure subprocesses get killed on exit
def killall():
    """Send SIGKILL to every PID recorded in ``PIDS``.

    A PID that was signalled successfully is removed from ``PIDS``; a PID
    that could not be signalled stays in the set and is reported on stderr.
    """
    # iterate over a copy: the set is mutated inside the loop
    for pid in PIDS.copy():
        try:
            os.kill(pid, signal.SIGKILL)
        except OSError:
            # only OS-level failures (e.g. no such process, no permission)
            # are treated as "unable to kill"; anything else propagates
            sys.stderr.write("Unable to kill PID {}\n".format(pid))
        else:
            PIDS.discard(pid)
atexit.register(killall)
|  | 34 | 
|  | 35 | 
# signals to handle
signals = (signal.SIGHUP, signal.SIGINT, signal.SIGQUIT, signal.SIGSEGV, signal.SIGTERM)
# signals after which the handler exits the interpreter
# NOTE: SIGKILL cannot be caught, so a handler never actually sees it
fatal = set([signal.SIGINT, signal.SIGSEGV, signal.SIGKILL, signal.SIGTERM])

# ensure subprocesses get killed on signals
def sighandler(signum, frame):
    """Signal handler: kill all tracked subprocesses; exit on fatal signals.

    signum -- number of the received signal
    frame -- current stack frame (unused)

    See https://docs.python.org/2/library/signal.html
    """
    pids = ', '.join([str(pid) for pid in sorted(PIDS)])
    sys.stderr.write('Signal handler called with signal {}; terminating subprocesses: {}\n'.format(signum, pids))
    killall()
    if signum in fatal:
        print ("Caught signal {}; exiting".format(signum))
        sys.exit()

# install the handler at import time
for signum in signals:
    try:
        signal.signal(signum, sighandler)
    except (RuntimeError, ValueError, OSError) as e:
        # report which signal could not be hooked, then fail loudly;
        # ``signal.signal`` raises ValueError outside the main thread and
        # OSError (Python 3) for signals that cannot be handled
        print ('[{}] {}'.format(signum, e))
        raise
|  | 53 | 
class Process(subprocess.Popen):
    """``subprocess.Popen`` front-end that records output and runtime.

    (why would you name a subprocess object Popen?)

    The child's stdout is redirected to a temporary file which can be read
    incrementally with ``.read()``.  The PID of each started process is kept
    in the module-level ``PIDS`` set until the process is finalized.
    """

    # http://docs.python.org/2/library/subprocess.html#popen-constructor
    defaults = {'bufsize': 1, # line buffered
                'store_output': True, # store stdout
                }

    def __init__(self, command, **kwargs):
        """
        command -- command to run; a string or a list of arguments
        verbose -- (keyword) print command line, start time and PID
        store_output -- (keyword) accumulate what is read in ``self.output``

        Remaining keyword arguments are passed to ``subprocess.Popen``.
        ``stdout`` is always overridden; on unix, so are ``shell`` and
        ``preexec_fn``.
        """

        # get verbosity
        self.verbose = kwargs.pop('verbose', False)

        # setup arguments
        self.command = command
        _kwargs = self.defaults.copy()
        _kwargs.update(kwargs)

        # ``subprocess.mswindows`` is not available on newer Python
        # versions, so test the platform directly
        on_windows = sys.platform == 'win32'

        if not on_windows:
            # on unix, ``shell={True|False}`` should always come from the
            # type of command (string or list)
            _kwargs['shell'] = isinstance(command, string)

            # ensure child in process group
            # (``preexec_fn`` is POSIX-only, hence inside this branch)
            # see :
            # - http://pymotw.com/2/subprocess/#process-groups-sessions
            # - http://ptspts.blogspot.com/2012/11/how-to-start-and-kill-unix-process-tree.html
            _kwargs['preexec_fn'] = os.setpgrp

        # output buffer
        self.location = 0 # offset of the first unread byte in the buffer
        self.output_buffer = tempfile.SpooledTemporaryFile()
        # the buffer is opened in binary mode, so accumulate bytes
        # (``b''`` is the same object type as ``''`` on Python 2)
        self.output = b'' if _kwargs.pop('store_output') else None
        _kwargs['stdout'] = self.output_buffer

        # runtime
        self.start = time.time()
        self.end = None

        if self.verbose:
            # print useful info
            print ("Running `{}`; started: {}".format(str(self), self.start))

        # launch subprocess
        try:
            subprocess.Popen.__init__(self, command, **_kwargs)
        except Exception:
            # print the command
            print ("Failure to run:")
            print (self.command)

            # nothing will ever read the buffer: release it
            self.output_buffer.close()

            # re-raise with the original traceback
            raise

        PIDS.add(self.pid)
        if self.verbose:
            # print the PID
            print ("PID: {}".format(self.pid))

    def _finalize(self, process_output):
        """internal function to finalize

        process_output -- callback passed the final chunk of output, or None
        """

        # read final output; done even without a callback so that
        # ``self.output`` is complete once the process has finished
        self.read(process_output)

        # reset output buffer location
        self.output_buffer.seek(0)

        # set end time; keep the first value so that repeated calls
        # do not inflate ``.runtime()``
        if self.end is None:
            self.end = time.time()

        # remove PID from list
        PIDS.discard(self.pid)

    def poll(self, process_output=None):
        """
        check if the process has finished

        process_output -- callback passed any output not yet read

        Returns the return code, or None if the process is still running.
        """

        if process_output is not None:
            self.read(process_output) # read from output buffer
        returncode = subprocess.Popen.poll(self)
        if returncode is not None:
            self._finalize(process_output)
        return returncode

    def wait(self, maxtime=None, sleep=1., process_output=None):
        """
        maxtime -- timeout in seconds
        sleep -- number of seconds to sleep between polling
        process_output -- callback passed new output on each poll

        Returns the return code of the process, or None if the process
        was killed because ``maxtime`` was exceeded.
        """
        while self.poll(process_output) is None:

            # check for timeout
            if maxtime is not None and self.runtime() > maxtime:
                self.kill()
                # collect the killed child so that it does not linger
                # as a zombie and ``returncode`` gets set
                subprocess.Popen.wait(self)
                self._finalize(process_output)
                return None

            # naptime
            if sleep:
                time.sleep(sleep)

        # ``.poll()`` has already finalized and set the return code
        return self.returncode

    def read(self, process_output=None):
        """
        read from the output buffer

        process_output -- callback passed the newly read chunk

        Returns whatever was written to the buffer since the last read.
        """

        self.output_buffer.seek(self.location)
        read = self.output_buffer.read()
        if self.output is not None:
            self.output += read
        if process_output:
            process_output(read)
        self.location += len(read)
        return read

    def commandline(self):
        """returns string of command line"""

        if isinstance(self.command, string):
            return self.command
        return subprocess.list2cmdline(self.command)

    __str__ = commandline

    def runtime(self):
        """returns time spent running or total runtime if completed"""

        if self.end is None:
            return time.time() - self.start
        return self.end - self.start
|  | 192 | 
|  | 193 | 
def main(args=None):
    """CLI

    args -- list of command line arguments; when None, ``sys.argv[1:]``
            is used (looked up at call time by ``argparse``)
    """

    description = """demonstration of how to do things with subprocess"""

    # available programs
    progs = {'yes': ["yes"],
             'ping': ['ping', 'google.com']}

    # parse command line
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument("-t", "--time", dest="time",
                        type=float, default=4.,
                        help="seconds to run for (or <= 0 for forever)")
    parser.add_argument("-s", "--sleep", dest="sleep",
                        type=float, default=1.,
                        help="sleep this number of seconds between polling")
    parser.add_argument("-p", "--prog", dest='program',
                        choices=sorted(progs), default='ping',
                        help="subprocess to run")
    parser.add_argument("--list-programs", dest='list_programs',
                        action='store_true', default=False,
                        help="list available programs")
    parser.add_argument("--wait", dest='wait',
                        action='store_true', default=False,
                        help="run with ``.wait()`` and a callback")
    parser.add_argument("--callback", dest='callback',
                        action='store_true', default=False,
                        help="run with polling and a callback")
    options = parser.parse_args(args)

    # list programs
    if options.list_programs:
        for key in sorted(progs):
            print ('{}: {}'.format(key, subprocess.list2cmdline(progs[key])))
        sys.exit(0)

    # select program
    prog = progs[options.program]

    # start process
    proc = Process(prog)

    # demo function for processing output
    def output_processor(output):
        print ('[{}]:\n{}\n{}'.format(proc.runtime(),
                                      output.upper(),
                                      '-==-'*10))
    if options.callback:
        process_output = output_processor
    else:
        process_output = None

    if options.wait:
        # wait for being done; ``--time <= 0`` means no timeout at all
        maxtime = options.time if options.time > 0 else None
        proc.wait(maxtime=maxtime, sleep=options.sleep, process_output=output_processor)
    else:
        # start the main subprocess loop
        while proc.poll(process_output) is None:

            if options.time > 0 and proc.runtime() > options.time:
                proc.kill()

            if options.sleep:
                time.sleep(options.sleep)

            if process_output is None:
                # process the output with ``.read()`` call
                read = proc.read()
                output_processor(read)

        if process_output is None:
            # pick up anything written between the last ``.read()``
            # and the end of the process
            remaining = proc.read()
            if remaining:
                output_processor(remaining)

    # correctness tests
    assert proc.end is not None

    # print summary
    output = proc.output
    n_lines = len(output.splitlines())
    print ("{}: {} lines, ran for {} seconds".format(subprocess.list2cmdline(prog), n_lines, proc.runtime()))

if __name__ == '__main__':
    main()