home *** CD-ROM | disk | FTP | other *** search
Wrap
# Source Generated with Decompyle++ # File: in.pyc (Python 2.6) import os import re import sys import signal import tempfile pjoin = os.path.join from twisted.internet import reactor, defer from twisted.internet.protocol import ProcessProtocol from twisted.internet.error import ProcessDone, ProcessTerminated from twisted.internet.utils import getProcessOutput from twisted.python import failure, log from IPython.external import argparse from IPython.external import Itpl from IPython.genutils import get_ipython_dir, get_log_dir, get_security_dir, num_cpus from IPython.kernel.fcutil import have_crypto from IPython.iplib import user_setup if os.name == 'posix': rc_suffix = '' else: rc_suffix = '.ini' user_setup(get_ipython_dir(), rc_suffix, mode = 'install', interactive = False) get_log_dir() get_security_dir() from IPython.kernel.config import config_manager as kernel_config_manager from IPython.kernel.error import SecurityError, FileTimeoutError from IPython.kernel.fcutil import have_crypto from IPython.kernel.twistedutil import gatherBoth, wait_for_file from IPython.kernel.util import printer class ProcessStateError(Exception): pass class UnknownStatus(Exception): pass class LauncherProcessProtocol(ProcessProtocol): def __init__(self, process_launcher): self.process_launcher = process_launcher def connectionMade(self): self.process_launcher.fire_start_deferred(self.transport.pid) def processEnded(self, status): value = status.value if isinstance(value, ProcessDone): self.process_launcher.fire_stop_deferred(0) elif isinstance(value, ProcessTerminated): self.process_launcher.fire_stop_deferred({ 'exit_code': value.exitCode, 'signal': value.signal, 'status': value.status }) else: raise UnknownStatus('unknown exit status, this is probably a bug in Twisted') return isinstance(value, ProcessDone) def outReceived(self, data): log.msg(data) def errReceived(self, data): log.err(data) class ProcessLauncher(object): def __init__(self, cmd_and_args): self.cmd = cmd_and_args[0] self.args = cmd_and_args self._reset() def _reset(self): self.process_protocol = None self.pid = None self.start_deferred = None self.stop_deferreds = [] self.state = 'before' def running(self): if self.state == 'running': return True return False running = property(running) def fire_start_deferred(self, pid): self.pid = pid self.state = 'running' log.msg('Process %r has started with pid=%i' % (self.args, pid)) self.start_deferred.callback(pid) def start(self): if self.state == 'before': self.process_protocol = LauncherProcessProtocol(self) self.start_deferred = defer.Deferred() self.process_transport = reactor.spawnProcess(self.process_protocol, self.cmd, self.args, env = os.environ) return self.start_deferred s = 'the process has already been started and has state: %r' % self.state return defer.fail(ProcessStateError(s)) def get_stop_deferred(self): if self.state == 'running' or self.state == 'before': d = defer.Deferred() self.stop_deferreds.append(d) return d s = 'this process is already complete' return defer.fail(ProcessStateError(s)) def fire_stop_deferred(self, exit_code): log.msg('Process %r has stopped with %r' % (self.args, exit_code)) self.state = 'after' for d in self.stop_deferreds: d.callback(exit_code) def signal(self, sig): if self.state == 'running': self.process_transport.signalProcess(sig) def interrupt_then_kill(self, delay = 1): self.signal('INT') reactor.callLater(delay, self.signal, 'KILL') class ControllerLauncher(ProcessLauncher): def __init__(self, extra_args = None): if sys.platform == 'win32': ipcontroller = ipcontroller import IPython.kernel.scripts script_location = ipcontroller.__file__.replace('.pyc', '.py') args = [ sys.executable, '-u', script_location] else: args = [ 'ipcontroller'] self.extra_args = extra_args if extra_args is not None: args.extend(extra_args) ProcessLauncher.__init__(self, args) class EngineLauncher(ProcessLauncher): def __init__(self, extra_args = None): if sys.platform == 'win32': ipengine = ipengine import IPython.kernel.scripts script_location = ipengine.__file__.replace('.pyc', '.py') args = [ sys.executable, '-u', script_location] else: args = [ 'ipengine'] self.extra_args = extra_args if extra_args is not None: args.extend(extra_args) ProcessLauncher.__init__(self, args) class LocalEngineSet(object): def __init__(self, extra_args = None): self.extra_args = extra_args self.launchers = [] def start(self, n): dlist = [] for i in range(n): el = EngineLauncher(extra_args = self.extra_args) d = el.start() self.launchers.append(el) dlist.append(d) dfinal = gatherBoth(dlist, consumeErrors = True) dfinal.addCallback(self._handle_start) return dfinal def _handle_start(self, r): log.msg('Engines started with pids: %r' % r) return r def _handle_stop(self, r): log.msg('Engines received signal: %r' % r) return r def signal(self, sig): dlist = [] for el in self.launchers: d = el.get_stop_deferred() dlist.append(d) el.signal(sig) dfinal = gatherBoth(dlist, consumeErrors = True) dfinal.addCallback(self._handle_stop) return dfinal def interrupt_then_kill(self, delay = 1): dlist = [] for el in self.launchers: d = el.get_stop_deferred() dlist.append(d) el.interrupt_then_kill(delay) dfinal = gatherBoth(dlist, consumeErrors = True) dfinal.addCallback(self._handle_stop) return dfinal class BatchEngineSet(object): submit_command = '' delete_command = '' job_id_regexp = '' def __init__(self, template_file, **kwargs): self.template_file = template_file self.context = { } self.context.update(kwargs) self.batch_file = self.template_file + '-run' def parse_job_id(self, output): m = re.match(self.job_id_regexp, output) if m is not None: job_id = m.group() else: raise Exception("job id couldn't be determined: %s" % output) self.job_id = m is not None log.msg('Job started with job id: %r' % job_id) return job_id def write_batch_script(self, n): self.context['n'] = n template = open(self.template_file, 'r').read() log.msg('Using template for batch script: %s' % self.template_file) script_as_string = Itpl.itplns(template, self.context) log.msg('Writing instantiated batch script: %s' % self.batch_file) f = open(self.batch_file, 'w') f.write(script_as_string) f.close() def handle_error(self, f): f.printTraceback() f.raiseException() def start(self, n): self.write_batch_script(n) d = getProcessOutput(self.submit_command, [ self.batch_file], env = os.environ) d.addCallback(self.parse_job_id) d.addErrback(self.handle_error) return d def kill(self): d = getProcessOutput(self.delete_command, [ self.job_id], env = os.environ) return d class PBSEngineSet(BatchEngineSet): submit_command = 'qsub' delete_command = 'qdel' job_id_regexp = '\\d+' def __init__(self, template_file, **kwargs): BatchEngineSet.__init__(self, template_file, **kwargs) sshx_template = '#!/bin/sh\n"$@" &> /dev/null &\necho $!\n' engine_killer_template = "#!/bin/sh\nps -fu `whoami` | grep '[i]pengine' | awk '{print $2}' | xargs kill -TERM\n" class SSHEngineSet(object): sshx_template = sshx_template engine_killer_template = engine_killer_template def __init__(self, engine_hosts, sshx = None, ipengine = 'ipengine'): self.temp_dir = tempfile.gettempdir() if sshx is not None: self.sshx = sshx else: self.sshx = os.path.join(self.temp_dir, '%s-main-sshx.sh' % os.environ['USER']) f = open(self.sshx, 'w') f.writelines(self.sshx_template) f.close() self.engine_command = ipengine self.engine_hosts = engine_hosts self.engine_killer = os.path.join(self.temp_dir, '%s-local-engine_killer.sh' % os.environ['USER']) f = open(self.engine_killer, 'w') f.writelines(self.engine_killer_template) f.close() def start(self, send_furl = False): dlist = [] for host in self.engine_hosts.keys(): count = self.engine_hosts[host] d = self._start(host, count, send_furl) dlist.append(d) return gatherBoth(dlist, consumeErrors = True) def _start(self, hostname, count = 1, send_furl = False): if send_furl: d = self._scp_furl(hostname) else: d = defer.succeed(None) (None, d.addCallback)((lambda r: self._scp_sshx(hostname))) (None, None, d.addCallback)((lambda r: self._ssh_engine(hostname, count))) return d def _scp_furl(self, hostname): scp_cmd = 'scp ~/.ipython/security/ipcontroller-engine.furl %s:.ipython/security/' % hostname cmd_list = scp_cmd.split() cmd_list[1] = os.path.expanduser(cmd_list[1]) log.msg('Copying furl file: %s' % scp_cmd) d = getProcessOutput(cmd_list[0], cmd_list[1:], env = os.environ) return d def _scp_sshx(self, hostname): scp_cmd = 'scp %s %s:%s/%s-sshx.sh' % (self.sshx, hostname, self.temp_dir, os.environ['USER']) print log.msg('Copying sshx: %s' % scp_cmd) sshx_scp = scp_cmd.split() d = getProcessOutput(sshx_scp[0], sshx_scp[1:], env = os.environ) return d def _ssh_engine(self, hostname, count): exec_engine = 'ssh %s sh %s/%s-sshx.sh %s' % (hostname, self.temp_dir, os.environ['USER'], self.engine_command) cmds = exec_engine.split() dlist = [] log.msg('about to start engines...') for i in range(count): log.msg('Starting engines: %s' % exec_engine) d = getProcessOutput(cmds[0], cmds[1:], env = os.environ) dlist.append(d) return gatherBoth(dlist, consumeErrors = True) def kill(self): dlist = [] for host in self.engine_hosts.keys(): d = self._killall(host) dlist.append(d) return gatherBoth(dlist, consumeErrors = True) def _killall(self, hostname): d = self._scp_engine_killer(hostname) (None, d.addCallback)((lambda r: self._ssh_kill(hostname))) return d def _scp_engine_killer(self, hostname): scp_cmd = 'scp %s %s:%s/%s-engine_killer.sh' % (self.engine_killer, hostname, self.temp_dir, os.environ['USER']) cmds = scp_cmd.split() log.msg('Copying engine_killer: %s' % scp_cmd) d = getProcessOutput(cmds[0], cmds[1:], env = os.environ) return d def _ssh_kill(self, hostname): kill_cmd = 'ssh %s sh %s/%s-engine_killer.sh' % (hostname, self.temp_dir, os.environ['USER']) log.msg('Killing engine: %s' % kill_cmd) kill_cmd = kill_cmd.split() d = getProcessOutput(kill_cmd[0], kill_cmd[1:], env = os.environ) return d def _exec_err(self, r): log.msg(r) def check_security(args, cont_args): if (not (args.x) or not (args.y)) and not have_crypto: log.err("\nOpenSSL/pyOpenSSL is not available, so we can't run in secure mode.\nTry running ipcluster with the -xy flags: ipcluster local -xy -n 4") reactor.stop() return False if args.x: cont_args.append('-x') if args.y: cont_args.append('-y') return True def check_reuse(args, cont_args): if args.r: cont_args.append('-r') if args.client_port == 0 or args.engine_port == 0: log.err('\nTo reuse FURL files, you must also set the client and engine ports using\nthe --client-port and --engine-port options.') reactor.stop() return False cont_args.append('--client-port=%i' % args.client_port) cont_args.append('--engine-port=%i' % args.engine_port) return True def _err_and_stop(f): log.err(f) reactor.stop() def _delay_start(cont_pid, start_engines, furl_file, reuse): if not reuse: if os.path.isfile(furl_file): os.unlink(furl_file) log.msg('Waiting for controller to finish starting...') d = wait_for_file(furl_file, delay = 0.2, max_tries = 50) d.addCallback((lambda _: log.msg('Controller started'))) (None, d.addCallback)((lambda _: start_engines(cont_pid))) return d def main_local(args): cont_args = [] cont_args.append('--logfile=%s' % pjoin(args.logdir, 'ipcontroller')) if not check_security(args, cont_args): return None if not check_reuse(args, cont_args): return None cl = ControllerLauncher(extra_args = cont_args) dstart = cl.start() def start_engines(cont_pid): engine_args = [] engine_args.append('--logfile=%s' % pjoin(args.logdir, 'ipengine%s-' % cont_pid)) eset = LocalEngineSet(extra_args = engine_args) def shutdown(signum, frame): log.msg('Stopping local cluster') eset.interrupt_then_kill(0.5) cl.interrupt_then_kill(0.5) reactor.callLater(1, reactor.stop) signal.signal(signal.SIGINT, shutdown) d = eset.start(args.n) return d config = kernel_config_manager.get_config_obj() furl_file = config['controller']['engine_furl_file'] dstart.addCallback(_delay_start, start_engines, furl_file, args.r) dstart.addErrback(_err_and_stop) def main_mpi(args): cont_args = [] cont_args.append('--logfile=%s' % pjoin(args.logdir, 'ipcontroller')) if not check_security(args, cont_args): return None if not check_reuse(args, cont_args): return None cl = ControllerLauncher(extra_args = cont_args) dstart = cl.start() def start_engines(cont_pid): raw_args = [ args.cmd] raw_args.extend([ '-n', str(args.n)]) raw_args.append('ipengine') raw_args.append('-l') raw_args.append(pjoin(args.logdir, 'ipengine%s-' % cont_pid)) if args.mpi: raw_args.append('--mpi=%s' % args.mpi) eset = ProcessLauncher(raw_args) def shutdown(signum, frame): log.msg('Stopping local cluster') eset.interrupt_then_kill(1) cl.interrupt_then_kill(1) reactor.callLater(2, reactor.stop) signal.signal(signal.SIGINT, shutdown) d = eset.start() return d config = kernel_config_manager.get_config_obj() furl_file = config['controller']['engine_furl_file'] dstart.addCallback(_delay_start, start_engines, furl_file, args.r) dstart.addErrback(_err_and_stop) def main_pbs(args): cont_args = [] cont_args.append('--logfile=%s' % pjoin(args.logdir, 'ipcontroller')) if not check_security(args, cont_args): return None if not check_reuse(args, cont_args): return None cl = ControllerLauncher(extra_args = cont_args) dstart = cl.start() def start_engines(r): pbs_set = PBSEngineSet(args.pbsscript) def shutdown(signum, frame): log.msg('Stopping pbs cluster') d = pbs_set.kill() (d.addBoth,)((lambda _: cl.interrupt_then_kill(1))) d.addBoth((lambda _: reactor.callLater(2, reactor.stop))) signal.signal(signal.SIGINT, shutdown) d = pbs_set.start(args.n) return d config = kernel_config_manager.get_config_obj() furl_file = config['controller']['engine_furl_file'] dstart.addCallback(_delay_start, start_engines, furl_file, args.r) dstart.addErrback(_err_and_stop) def main_ssh(args): clusterfile = { } execfile(args.clusterfile, clusterfile) if not clusterfile.has_key('send_furl'): clusterfile['send_furl'] = False cont_args = [] cont_args.append('--logfile=%s' % pjoin(args.logdir, 'ipcontroller')) if not check_security(args, cont_args): return None if not check_reuse(args, cont_args): return None cl = ControllerLauncher(extra_args = cont_args) dstart = cl.start() def start_engines(cont_pid): ssh_set = SSHEngineSet(clusterfile['engines'], sshx = args.sshx) def shutdown(signum, frame): d = ssh_set.kill() cl.interrupt_then_kill(1) reactor.callLater(2, reactor.stop) signal.signal(signal.SIGINT, shutdown) d = ssh_set.start(clusterfile['send_furl']) return d config = kernel_config_manager.get_config_obj() furl_file = config['controller']['engine_furl_file'] dstart.addCallback(_delay_start, start_engines, furl_file, args.r) dstart.addErrback(_err_and_stop) def get_args(): base_parser = argparse.ArgumentParser(add_help = False) base_parser.add_argument('-r', action = 'store_true', dest = 'r', help = 'try to reuse FURL files. Use with --client-port and --engine-port') base_parser.add_argument('--client-port', type = int, dest = 'client_port', help = 'the port the controller will listen on for client connections', default = 0) base_parser.add_argument('--engine-port', type = int, dest = 'engine_port', help = 'the port the controller will listen on for engine connections', default = 0) base_parser.add_argument('-x', action = 'store_true', dest = 'x', help = 'turn off client security') base_parser.add_argument('-y', action = 'store_true', dest = 'y', help = 'turn off engine security') base_parser.add_argument('--logdir', type = str, dest = 'logdir', help = 'directory to put log files (default=$IPYTHONDIR/log)', default = pjoin(get_ipython_dir(), 'log')) base_parser.add_argument('-n', '--num', type = int, dest = 'n', default = 2, help = 'the number of engines to start') parser = argparse.ArgumentParser(description = 'IPython cluster startup. This starts a controller and engines using various approaches. Use the IPYTHONDIR environment variable to change your IPython directory from the default of .ipython or _ipython. The log and security subdirectories of your IPython directory will be used by this script for log files and security files.') subparsers = parser.add_subparsers(help = 'available cluster types. For help, do "ipcluster TYPE --help"') parser_local = subparsers.add_parser('local', help = 'run a local cluster', parents = [ base_parser]) parser_local.set_defaults(func = main_local) parser_mpirun = subparsers.add_parser('mpirun', help = 'run a cluster using mpirun (mpiexec also works)', parents = [ base_parser]) parser_mpirun.add_argument('--mpi', type = str, dest = 'mpi', help = 'how to call MPI_Init (default=mpi4py)') parser_mpirun.set_defaults(func = main_mpi, cmd = 'mpirun') parser_mpiexec = subparsers.add_parser('mpiexec', help = 'run a cluster using mpiexec (mpirun also works)', parents = [ base_parser]) parser_mpiexec.add_argument('--mpi', type = str, dest = 'mpi', help = 'how to call MPI_Init (default=mpi4py)') parser_mpiexec.set_defaults(func = main_mpi, cmd = 'mpiexec') parser_pbs = subparsers.add_parser('pbs', help = 'run a pbs cluster', parents = [ base_parser]) parser_pbs.add_argument('--pbs-script', type = str, dest = 'pbsscript', help = 'PBS script template', default = 'pbs.template') parser_pbs.set_defaults(func = main_pbs) parser_ssh = subparsers.add_parser('ssh', help = 'run a cluster using ssh, should have ssh-keys setup', parents = [ base_parser]) parser_ssh.add_argument('--clusterfile', type = str, dest = 'clusterfile', help = 'python file describing the cluster', default = 'clusterfile.py') parser_ssh.add_argument('--sshx', type = str, dest = 'sshx', help = 'sshx launcher helper') parser_ssh.set_defaults(func = main_ssh) args = parser.parse_args() return args def main(): args = get_args() reactor.callWhenRunning(args.func, args) log.startLogging(sys.stdout) reactor.run() if __name__ == '__main__': main()