home *** CD-ROM | disk | FTP | other *** search
Wrap
# Source Generated with Decompyle++ # File: in.pyc (Python 2.6) import __builtin__ import __main__ import Queue import inspect import os import sys import thread import threading import time from signal import signal, SIGINT try: import ctypes HAS_CTYPES = True except ImportError: HAS_CTYPES = False import IPython from IPython import ultraTB, ipapi from IPython.Magic import Magic from IPython.genutils import Term, warn, error, flag_calls, ask_yes_no from IPython.iplib import InteractiveShell from IPython.ipmaker import make_IPython from IPython.ipstruct import Struct from IPython.testing import decorators as testdec KBINT = False USE_TK = False MAIN_THREAD_ID = thread.get_ident() CODE_RUN = None GUI_TIMEOUT = 10 class IPShell: def __init__(self, argv = None, user_ns = None, user_global_ns = None, debug = 1, shell_class = InteractiveShell): self.IP = make_IPython(argv, user_ns = user_ns, user_global_ns = user_global_ns, debug = debug, shell_class = shell_class) def mainloop(self, sys_exit = 0, banner = None): self.IP.mainloop(banner) if sys_exit: sys.exit() def kill_embedded(self, parameter_s = ''): kill = ask_yes_no('Are you sure you want to kill this embedded instance (y/n)? [y/N] ', 'n') if kill: self.shell.embedded_active = False print 'This embedded IPython will not reactivate anymore once you exit.' class IPShellEmbed: def __init__(self, argv = None, banner = '', exit_msg = None, rc_override = None, user_ns = None): self.set_banner(banner) self.set_exit_msg(exit_msg) self.set_dummy_mode(0) self.sys_displayhook_ori = sys.displayhook try: self.sys_ipcompleter_ori = sys.ipcompleter except: pass self.IP = make_IPython(argv, rc_override = rc_override, embedded = True, user_ns = user_ns) ip = ipapi.IPApi(self.IP) ip.expose_magic('kill_embedded', kill_embedded) self.sys_displayhook_embed = sys.displayhook sys.displayhook = self.sys_displayhook_ori sys.excepthook = ultraTB.FormattedTB(color_scheme = self.IP.rc.colors, mode = self.IP.rc.xmode, call_pdb = self.IP.rc.pdb) self.restore_system_completer() def restore_system_completer(self): try: self.IP.readline.set_completer(self.sys_ipcompleter_ori) sys.ipcompleter = self.sys_ipcompleter_ori except: pass def __call__(self, header = '', local_ns = None, global_ns = None, dummy = None): if not self.IP.embedded_active: return None self.IP.exit_now = False if (dummy or dummy != 0) and self._IPShellEmbed__dummy_mode: return None sys.displayhook = self.sys_displayhook_embed if self.banner and header: format = '%s\n%s\n' else: format = '%s%s\n' banner = format % (self.banner, header) self.IP.embed_mainloop(banner, local_ns, global_ns, stack_depth = 1) if self.exit_msg: print self.exit_msg sys.displayhook = self.sys_displayhook_ori self.restore_system_completer() def set_dummy_mode(self, dummy): if dummy not in [ 0, 1, False, True]: raise ValueError, 'dummy parameter must be boolean' dummy not in [ 0, 1, False, True] self._IPShellEmbed__dummy_mode = dummy def get_dummy_mode(self): return self._IPShellEmbed__dummy_mode def set_banner(self, banner): self.banner = banner def set_exit_msg(self, exit_msg): self.exit_msg = exit_msg if HAS_CTYPES: def _async_raise(tid, exctype): if not inspect.isclass(exctype): raise TypeError('Only types can be raised (not instances)') inspect.isclass(exctype) res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), ctypes.py_object(exctype)) if res == 0: raise ValueError('invalid thread id') res == 0 if res != 1: ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0) raise SystemError('PyThreadState_SetAsyncExc failed') res != 1 def sigint_handler(signum, stack_frame): global KBINT if CODE_RUN: _async_raise(MAIN_THREAD_ID, KeyboardInterrupt) else: KBINT = True print '\nKeyboardInterrupt - Press <Enter> to continue.', Term.cout.flush() else: def sigint_handler(signum, stack_frame): global KBINT print '\nKeyboardInterrupt - Press <Enter> to continue.', Term.cout.flush() KBINT = True class MTInteractiveShell(InteractiveShell): isthreaded = True def __init__(self, name, usage = None, rc = Struct(opts = None, args = None), user_ns = None, user_global_ns = None, banner2 = '', gui_timeout = GUI_TIMEOUT, **kw): InteractiveShell.__init__(self, name, usage, rc, user_ns, user_global_ns, banner2) self.gui_timeout = gui_timeout self.code_queue = Queue.Queue() self._kill = None on_kill = kw.get('on_kill', []) for t in on_kill: if not callable(t): raise TypeError, 'on_kill must be a list of callables' callable(t) self.on_kill = on_kill self.worker_ident = None def runsource(self, source, filename = '<input>', symbol = 'single'): global KBINT if KBINT: KBINT = False return False if self._kill: return True try: code = self.compile(source, filename, symbol) except (OverflowError, SyntaxError, ValueError): self._kill self._kill KBINT self.showsyntaxerror(filename) return False if code is None: return True if self.worker_ident is None or self.worker_ident == thread.get_ident(): InteractiveShell.runcode(self, code) return False completed_ev = threading.Event() received_ev = threading.Event() self.code_queue.put((code, completed_ev, received_ev)) received_ev.wait(self.gui_timeout) return False def runcode(self): global CODE_RUN, CODE_RUN self.worker_ident = thread.get_ident() if self._kill: print >>Term.cout, 'Closing threads...', Term.cout.flush() for tokill in self.on_kill: tokill() print >>Term.cout, 'Done.' self._kill.set() return True try: signal(SIGINT, sigint_handler) except SystemError: self._kill self._kill except: self._kill code_to_run = None while None: try: (code_to_run, completed_ev, received_ev) = self.code_queue.get_nowait() except Queue.Empty: self._kill self._kill break except: self._kill try: CODE_RUN = True InteractiveShell.runcode(self, code_to_run) except KeyboardInterrupt: self._kill self._kill print 'Keyboard interrupted in mainloop' while not self.code_queue.empty(): (code, ev1, ev2) = self.code_queue.get_nowait() ev1.set() ev2.set() break except: self._kill finally: CODE_RUN = False completed_ev.set() continue return True def kill(self): self._kill = threading.Event() self._kill.wait() class MatplotlibShellBase: def _matplotlib_config(self, name, user_ns, user_global_ns = None): import matplotlib backends = backends import matplotlib matplotlib.interactive(True) def use(arg): if arg in backends.interactive_bk and arg != self.mpl_backend: m = 'invalid matplotlib backend switch.\nThis script attempted to switch to the interactive backend: `%s`\nYour current choice of interactive backend is: `%s`\n\nSwitching interactive matplotlib backends at runtime\nwould crash the python interpreter, and IPython has blocked it.\n\nYou need to either change your choice of matplotlib backend\nby editing your .matplotlibrc file, or run this script as a \nstandalone file from the command line, not using IPython.\n' % (arg, self.mpl_backend) raise RuntimeError, m arg != self.mpl_backend self.mpl_use(arg) self.mpl_use._called = True self.matplotlib = matplotlib self.mpl_backend = matplotlib.rcParams['backend'] self.mpl_use = matplotlib.use self.mpl_use._called = False matplotlib.use = use import matplotlib.pylab as pylab self.pylab = pylab self.pylab.show._needmain = False self.pylab.draw_if_interactive = flag_calls(self.pylab.draw_if_interactive) (user_ns, user_global_ns) = IPython.ipapi.make_user_namespaces(user_ns, user_global_ns) exec 'import numpy\nimport numpy as np\nimport matplotlib\nimport matplotlib.pylab as pylab\ntry:\n import matplotlib.pyplot as plt\nexcept ImportError:\n pass\n' in user_ns b = "\n Welcome to pylab, a matplotlib-based Python environment.\n For more information, type 'help(pylab)'.\n" return (user_ns, user_global_ns, b) def mplot_exec(self, fname, *where, **kw): isInteractive = self.matplotlib.rcParams['interactive'] self.matplotlib.interactive(False) self.safe_execfile(fname, *where, **kw) self.matplotlib.interactive(isInteractive) if self.pylab.draw_if_interactive.called: self.pylab.draw() self.pylab.draw_if_interactive.called = False if self.mpl_use._called: self.matplotlib.rcParams['backend'] = self.mpl_backend def magic_run(self, parameter_s = ''): Magic.magic_run(self, parameter_s, runner = self.mplot_exec) magic_run = testdec.skip_doctest(magic_run) magic_run.__doc__ = '%s\n%s' % (Magic.magic_run.__doc__, '\n *** Modified %run for Matplotlib, with proper interactive handling ***') class MatplotlibShell(MatplotlibShellBase, InteractiveShell): def __init__(self, name, usage = None, rc = Struct(opts = None, args = None), user_ns = None, user_global_ns = None, **kw): (user_ns, user_global_ns, b2) = self._matplotlib_config(name, user_ns, user_global_ns) InteractiveShell.__init__(self, name, usage, rc, user_ns, user_global_ns, banner2 = b2, **kw) class MatplotlibMTShell(MatplotlibShellBase, MTInteractiveShell): def __init__(self, name, usage = None, rc = Struct(opts = None, args = None), user_ns = None, user_global_ns = None, **kw): (user_ns, user_global_ns, b2) = self._matplotlib_config(name, user_ns, user_global_ns) MTInteractiveShell.__init__(self, name, usage, rc, user_ns, user_global_ns, banner2 = b2, **kw) def get_tk(): if not USE_TK or sys.modules.has_key('Tkinter'): return None try: import Tkinter except ImportError: sys.modules.has_key('Tkinter') sys.modules.has_key('Tkinter') return None hijack_tk() r = Tkinter.Tk() r.withdraw() return r def hijack_tk(): def misc_mainloop(self, n = 0): pass def tkinter_mainloop(n = 0): pass import Tkinter Tkinter.Misc.mainloop = misc_mainloop Tkinter.mainloop = tkinter_mainloop def update_tk(tk): if tk: tk.update() def hijack_wx(): def dummy_mainloop(*args, **kw): pass try: import wx except ImportError: import wxPython as wx ver = wx.__version__ orig_mainloop = None if ver[:3] >= '2.5': import wx if hasattr(wx, '_core_'): core = getattr(wx, '_core_') elif hasattr(wx, '_core'): core = getattr(wx, '_core') else: raise AttributeError('Could not find wx core module') orig_mainloop = hasattr(wx, '_core_').PyApp_MainLoop core.PyApp_MainLoop = dummy_mainloop elif ver[:3] == '2.4': orig_mainloop = wx.wxc.wxPyApp_MainLoop wx.wxc.wxPyApp_MainLoop = dummy_mainloop else: warn('Unable to find either wxPython version 2.4 or >= 2.5.') return orig_mainloop def hijack_gtk(): def dummy_mainloop(*args, **kw): pass import gtk if gtk.pygtk_version >= (2, 4, 0): orig_mainloop = gtk.main else: orig_mainloop = gtk.mainloop gtk.mainloop = dummy_mainloop gtk.main = dummy_mainloop return orig_mainloop def hijack_qt(): def dummy_mainloop(*args, **kw): pass import qt orig_mainloop = qt.qApp.exec_loop qt.qApp.exec_loop = dummy_mainloop qt.QApplication.exec_loop = dummy_mainloop return orig_mainloop def hijack_qt4(): def dummy_mainloop(*args, **kw): pass QtGui = QtGui QtCore = QtCore import PyQt4 orig_mainloop = QtGui.qApp.exec_ QtGui.qApp.exec_ = dummy_mainloop QtGui.QApplication.exec_ = dummy_mainloop QtCore.QCoreApplication.exec_ = dummy_mainloop return orig_mainloop class IPThread(threading.Thread): def run(self): self.IP.mainloop(self._banner) self.IP.kill() class IPShellGTK(IPThread): TIMEOUT = 100 def __init__(self, argv = None, user_ns = None, user_global_ns = None, debug = 1, shell_class = MTInteractiveShell): import gtk try: gtk.set_interactive(False) print 'Your PyGtk has set_interactive(), so you can use the' print 'more stable single-threaded Gtk mode.' print 'See https://bugs.launchpad.net/ipython/+bug/270856' except AttributeError: pass self.gtk = gtk self.gtk_mainloop = hijack_gtk() self.tk = get_tk() if gtk.pygtk_version >= (2, 4, 0): mainquit = self.gtk.main_quit else: mainquit = self.gtk.mainquit self.IP = make_IPython(argv, user_ns = user_ns, user_global_ns = user_global_ns, debug = debug, shell_class = shell_class, on_kill = [ mainquit]) self._banner = None threading.Thread.__init__(self) def mainloop(self, sys_exit = 0, banner = None): self._banner = banner if self.gtk.pygtk_version >= (2, 4, 0): import gobject gobject.idle_add(self.on_timer) else: self.gtk.idle_add(self.on_timer) if sys.platform != 'win32': try: if self.gtk.gtk_version[0] >= 2: self.gtk.gdk.threads_init() except AttributeError: pass except RuntimeError: error('Your pyGTK likely has not been compiled with threading support.\nThe exception printout is below.\nYou can either rebuild pyGTK with threads, or try using \nmatplotlib with a different backend (like Tk or WX).\nNote that matplotlib will most likely not work in its current state!') self.IP.InteractiveTB() except: None<EXCEPTION MATCH>AttributeError None<EXCEPTION MATCH>AttributeError self.start() self.gtk.gdk.threads_enter() self.gtk_mainloop() self.gtk.gdk.threads_leave() self.join() def on_timer(self): update_tk(self.tk) self.IP.runcode() time.sleep(0.01) return True class IPShellWX(IPThread): TIMEOUT = 100 def __init__(self, argv = None, user_ns = None, user_global_ns = None, debug = 1, shell_class = MTInteractiveShell): self.IP = make_IPython(argv, user_ns = user_ns, user_global_ns = user_global_ns, debug = debug, shell_class = shell_class, on_kill = [ self.wxexit]) wantedwxversion = self.IP.rc.wxversion if wantedwxversion != '0': try: import wxversion except ImportError: error('The wxversion module is needed for WX version selection') try: wxversion.select(wantedwxversion) self.IP.InteractiveTB() error('Requested wxPython version %s could not be loaded' % wantedwxversion) import wx threading.Thread.__init__(self) self.wx = wx self.wx_mainloop = hijack_wx() self.tk = get_tk() self._banner = None self.app = None def wxexit(self, *args): if self.app is not None: self.app.agent.timer.Stop() self.app.ExitMainLoop() def mainloop(self, sys_exit = 0, banner = None): self._banner = banner self.start() class TimerAgent((self.wx.MiniFrame,)): wx = self.wx IP = self.IP tk = self.tk def __init__(self, parent, interval): style = self.wx.DEFAULT_FRAME_STYLE | self.wx.TINY_CAPTION_HORIZ self.wx.MiniFrame.__init__(self, parent, -1, ' ', pos = (200, 200), size = (100, 100), style = style) self.Show(False) self.interval = interval self.timerId = self.wx.NewId() def StartWork(self): self.timer = self.wx.Timer(self, self.timerId) self.wx.EVT_TIMER(self, self.timerId, self.OnTimer) self.timer.Start(self.interval) def OnTimer(self, event): update_tk(self.tk) self.IP.runcode() class App('App', (self.wx.App,)): wx = self.wx TIMEOUT = self.TIMEOUT def OnInit(self): self.agent = TimerAgent(None, self.TIMEOUT) self.agent.Show(False) self.agent.StartWork() return True self.app = App(redirect = False) self.wx_mainloop(self.app) self.join() class IPShellQt(IPThread): TIMEOUT = 100 def __init__(self, argv = None, user_ns = None, user_global_ns = None, debug = 0, shell_class = MTInteractiveShell): import qt self.exec_loop = hijack_qt() self.tk = get_tk() self.IP = make_IPython(argv, user_ns = user_ns, user_global_ns = user_global_ns, debug = debug, shell_class = shell_class, on_kill = [ qt.qApp.exit]) self._banner = None threading.Thread.__init__(self) def mainloop(self, sys_exit = 0, banner = None): import qt self._banner = banner if qt.QApplication.startingUp(): a = qt.QApplication(sys.argv) self.timer = qt.QTimer() qt.QObject.connect(self.timer, qt.SIGNAL('timeout()'), self.on_timer) self.start() self.timer.start(self.TIMEOUT, True) while True: if self.IP._kill: break self.exec_loop() self.join() def on_timer(self): update_tk(self.tk) result = self.IP.runcode() self.timer.start(self.TIMEOUT, True) return result class IPShellQt4(IPThread): TIMEOUT = 100 def __init__(self, argv = None, user_ns = None, user_global_ns = None, debug = 0, shell_class = MTInteractiveShell): QtCore = QtCore QtGui = QtGui import PyQt4 try: QtCore.pyqtRemoveInputHook() except AttributeError: pass if QtCore.PYQT_VERSION_STR == '4.3': warn('PyQt4 version 4.3 detected.\nIf you experience repeated threading warnings, please update PyQt4.\n') self.exec_ = hijack_qt4() self.tk = get_tk() self.IP = make_IPython(argv, user_ns = user_ns, user_global_ns = user_global_ns, debug = debug, shell_class = shell_class, on_kill = [ QtGui.qApp.exit]) self._banner = None threading.Thread.__init__(self) def mainloop(self, sys_exit = 0, banner = None): QtCore = QtCore QtGui = QtGui import PyQt4 self._banner = banner if QtGui.QApplication.startingUp(): a = QtGui.QApplication(sys.argv) self.timer = QtCore.QTimer() QtCore.QObject.connect(self.timer, QtCore.SIGNAL('timeout()'), self.on_timer) self.start() self.timer.start(self.TIMEOUT) while True: if self.IP._kill: break self.exec_() self.join() def on_timer(self): update_tk(self.tk) result = self.IP.runcode() self.timer.start(self.TIMEOUT) return result def _load_pylab(user_ns): ip = IPython.ipapi.get() if ip.options.pylab_import_all: ip.ex('from matplotlib.pylab import *') ip.IP.user_config_ns.update(ip.user_ns) class IPShellMatplotlib(IPShell): def __init__(self, argv = None, user_ns = None, user_global_ns = None, debug = 1): IPShell.__init__(self, argv, user_ns, user_global_ns, debug, shell_class = MatplotlibShell) _load_pylab(self.IP.user_ns) class IPShellMatplotlibGTK(IPShellGTK): def __init__(self, argv = None, user_ns = None, user_global_ns = None, debug = 1): IPShellGTK.__init__(self, argv, user_ns, user_global_ns, debug, shell_class = MatplotlibMTShell) _load_pylab(self.IP.user_ns) class IPShellMatplotlibWX(IPShellWX): def __init__(self, argv = None, user_ns = None, user_global_ns = None, debug = 1): IPShellWX.__init__(self, argv, user_ns, user_global_ns, debug, shell_class = MatplotlibMTShell) _load_pylab(self.IP.user_ns) class IPShellMatplotlibQt(IPShellQt): def __init__(self, argv = None, user_ns = None, user_global_ns = None, debug = 1): IPShellQt.__init__(self, argv, user_ns, user_global_ns, debug, shell_class = MatplotlibMTShell) _load_pylab(self.IP.user_ns) class IPShellMatplotlibQt4(IPShellQt4): def __init__(self, argv = None, user_ns = None, user_global_ns = None, debug = 1): IPShellQt4.__init__(self, argv, user_ns, user_global_ns, debug, shell_class = MatplotlibMTShell) _load_pylab(self.IP.user_ns) def _select_shell(argv): global USE_TK mpl_shell = { 'gthread': IPShellMatplotlibGTK, 'wthread': IPShellMatplotlibWX, 'qthread': IPShellMatplotlibQt, 'q4thread': IPShellMatplotlibQt4, 'tkthread': IPShellMatplotlib } th_shell = { 'gthread': IPShellGTK, 'wthread': IPShellWX, 'qthread': IPShellQt, 'q4thread': IPShellQt4, 'tkthread': IPShell } backends = { 'gthread': 'GTKAgg', 'wthread': 'WXAgg', 'qthread': 'QtAgg', 'q4thread': 'Qt4Agg', 'tkthread': 'TkAgg' } all_opts = set([ 'tk', 'pylab', 'gthread', 'qthread', 'q4thread', 'wthread', 'tkthread']) user_opts = []([ s.replace('-', '') for s in argv[:3] ]) special_opts = user_opts & all_opts if 'pylab' in special_opts: try: import matplotlib except ImportError: None if 'tk' in special_opts else set None if 'tk' in special_opts else set error('matplotlib could NOT be imported! Starting normal IPython.') return IPShell special_opts.remove('pylab') if special_opts: th_mode = special_opts.pop() matplotlib.rcParams['backend'] = backends[th_mode] else: backend = matplotlib.rcParams['backend'] if backend.startswith('GTK'): th_mode = 'gthread' elif backend.startswith('WX'): th_mode = 'wthread' elif backend.startswith('Qt4'): th_mode = 'q4thread' elif backend.startswith('Qt'): th_mode = 'qthread' else: th_mode = 'tkthread' return mpl_shell[th_mode] try: th_mode = special_opts.pop() except KeyError: 'pylab' in special_opts 'pylab' in special_opts None if 'tk' in special_opts else set th_mode = 'tkthread' except: 'pylab' in special_opts return th_shell[th_mode] def start(user_ns = None): shell = _select_shell(sys.argv) return shell(user_ns = user_ns) IPythonShell = IPShell IPythonShellEmbed = IPShellEmbed