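# NOTE: the next comment line appears to be navigation text from the page this
# listing was copied from; it is not part of the module.
#   home *** CD-ROM | disk | FTP | other *** search
# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)
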
__docformat__ = 'restructuredtext en'

from types import FunctionType
import __builtin__
import codeop
import compiler
import sys
import traceback

from IPython import ultraTB
from IPython.kernel.core.display_trap import DisplayTrap
from IPython.kernel.core.macro import Macro
from IPython.kernel.core.prompts import CachedOutput
from IPython.kernel.core.traceback_trap import TracebackTrap
from IPython.kernel.core.util import Bunch, system_shell
from IPython.external.Itpl import ItplNS

# Status strings returned as the first element of the tuple produced by
# ``Interpreter.feed_block``.
COMPILER_ERROR = 'error'
INCOMPLETE_INPUT = 'incomplete'
COMPLETE_INPUT = 'complete'

# Module-wide settings; ``Interpreter.__init__`` forwards every one of them to
# ``CachedOutput``.  ``Bunch`` exposes its keyword arguments as attributes
# (the same construction is used for the default ``config`` of Interpreter).
rc = Bunch(
    cache_size=100,
    pprint=True,
    separate_in='\n',
    separate_out='\n',
    separate_out2='',
    prompt_in1='In [\\#]: ',
    prompt_in2=' .\\\\D.: ',
    prompt_out='',
    prompts_pad_left=False,
)

def default_display_formatters():
    """Return the display formatters used when the caller supplies none.

    :returns: a new list holding a ``PPrintDisplayFormatter`` instance
        followed by a ``ReprDisplayFormatter`` instance.
    """
    # The decompiler rendered this from-import as ``X = X`` followed by a
    # plain ``import display_formatter``, which raises UnboundLocalError on
    # the first line.  The import stays inside the function, as in the
    # decompiled body.
    from display_formatter import PPrintDisplayFormatter, ReprDisplayFormatter
    return [PPrintDisplayFormatter(), ReprDisplayFormatter()]

def default_traceback_formatters():
    """Return the traceback formatters used when the caller supplies none.

    :returns: a new one-element list holding a ``PlainTracebackFormatter``
        instance.
    """
    # Restored from the decompiler's ``X = X`` / ``import traceback_formatter``
    # rendering, which raises UnboundLocalError before the import runs.
    from traceback_formatter import PlainTracebackFormatter
    return [PlainTracebackFormatter()]

class NotDefined(object):
    """Marker type for "name not present in the namespace".

    ``Interpreter.pull`` passes a fresh instance as the ``dict.get`` default
    and then uses ``isinstance`` to tell a missing name apart from any value
    the user could have stored, including ``None``.
    """

- class Interpreter(object):
-
- def __init__(self, user_ns = None, global_ns = None, translator = None, magic = None, display_formatters = None, traceback_formatters = None, output_trap = None, history = None, message_cache = None, filename = '<string>', config = None):
- local_ns = user_ns
- if local_ns is None:
- local_ns = { }
-
- self.user_ns = local_ns
- if global_ns is None:
- global_ns = { }
-
- self.user_global_ns = global_ns
- self.translator = translator
- if magic is None:
- Magic = Magic
- import IPython.kernel.core.magic
- magic = Magic(self)
-
- self.magic = magic
- if display_formatters is None:
- display_formatters = default_display_formatters()
-
- self.display_formatters = display_formatters
- if traceback_formatters is None:
- traceback_formatters = default_traceback_formatters()
-
- self.traceback_formatters = traceback_formatters
- if output_trap is None:
- OutputTrap = OutputTrap
- import IPython.kernel.core.output_trap
- output_trap = OutputTrap()
-
- self.output_trap = output_trap
- if history is None:
- InterpreterHistory = InterpreterHistory
- import IPython.kernel.core.history
- history = InterpreterHistory()
-
- self.history = history
- self.get_history_item = history.get_history_item
- self.get_history_input_cache = history.get_input_cache
- self.get_history_input_after = history.get_input_after
- if message_cache is None:
- SimpleMessageCache = SimpleMessageCache
- import IPython.kernel.core.message_cache
- message_cache = SimpleMessageCache()
-
- self.message_cache = message_cache
- self.filename = filename
- if config is None:
- config = Bunch(ESC_MAGIC = '%')
-
- self.config = config
- self.display_trap = DisplayTrap(formatters = self.display_formatters, callbacks = [
- self._possible_macro])
- self.traceback_trap = TracebackTrap(formatters = self.traceback_formatters)
- self.tbHandler = ultraTB.FormattedTB(color_scheme = 'NoColor', mode = 'Context', tb_offset = 2)
- self.command_compiler = codeop.CommandCompiler()
- self.raw_input_builtin = raw_input
- self.input_builtin = input
- self.current_cell_number = 1
- self.outputcache = CachedOutput(self, rc.cache_size, rc.pprint, input_sep = rc.separate_in, output_sep = rc.separate_out, output_sep2 = rc.separate_out2, ps1 = rc.prompt_in1, ps2 = rc.prompt_in2, ps_out = rc.prompt_out, pad_left = rc.prompts_pad_left)
- sys.ps1 = self.outputcache.prompt1.p_str
- sys.ps2 = self.outputcache.prompt2.p_str
- self.message = None
- self.setup_namespace()
-
-
- def formatTraceback(self, et, ev, tb, message = ''):
- tbinfo = self.tbHandler.text(et, ev, tb)
- ev._ipython_traceback_text = tbinfo
- return (et, ev, tb)
-
-
- def execute(self, commands, raiseException = True):
- message = self.setup_message()
- user_input = dict(raw = commands)
- if self.translator is not None:
- python = self.translator(commands, message)
- if python is None:
- return message
- else:
- python = commands
- user_input['translated'] = python
- message['input'] = user_input
- self.message = message
- self.set_traps()
- status = self.execute_python(python)
- self.unset_traps()
- self.message = None
- if self.history is not None:
- self.history.update_history(self, python)
-
- self.output_trap.add_to_message(message)
- self.output_trap.clear()
- self.display_trap.add_to_message(message)
- self.display_trap.clear()
- self.traceback_trap.add_to_message(message)
- einfo = self.traceback_trap.args
- self.traceback_trap.clear()
- self.message_cache.add_message(self.current_cell_number, message)
- self.current_cell_number += 1
- if raiseException and einfo:
- raise einfo[0], einfo[1], einfo[2]
- einfo
- return message
-
-
- def generate_prompt(self, is_continuation):
- if is_continuation:
- return str(self.outputcache.prompt2)
- return str(self.outputcache.prompt1)
-
-
- def execute_python(self, python):
-
- try:
- commands = self.split_commands(python)
- except (SyntaxError, IndentationError):
- e = None
- self.traceback_trap.args = sys.exc_info()
- self.pack_exception(self.message, e)
- return None
-
- for cmd in commands:
-
- try:
- code = self.command_compiler(cmd, self.filename, 'single')
- except (SyntaxError, OverflowError, ValueError):
- e = None
- self.traceback_trap.args = sys.exc_info()
- self.pack_exception(self.message, e)
- return None
-
- self.execute_block(code)
-
-
-
- def execute_block(self, code):
- outflag = 1
-
- try:
- exec code in self.user_ns
- outflag = 0
- except SystemExit:
- self.resetbuffer()
- self.traceback_trap.args = sys.exc_info()
- except:
- self.traceback_trap.args = sys.exc_info()
-
- return outflag
-
-
- def execute_macro(self, macro):
- python = macro.value
- if self.translator is not None:
- python = self.translator(python)
-
- self.execute_python(python)
-
-
- def getCommand(self, i = None):
- return self.message_cache.get_message(i)
-
-
- def reset(self):
- self.user_ns.clear()
- self.setup_namespace()
-
-
- def complete(self, line, text = None, pos = None):
- raise NotImplementedError
-
-
- def push(self, ns):
- self.user_ns.update(ns)
-
-
- def push_function(self, ns):
- new_kwds = { }
- for k, v in ns.iteritems():
- if not isinstance(v, FunctionType):
- raise TypeError('function object expected')
- isinstance(v, FunctionType)
- new_kwds[k] = FunctionType(v.func_code, self.user_ns)
-
- self.user_ns.update(new_kwds)
-
-
- def pack_exception(self, message, exc):
- message['exception'] = exc.__class__
- message['exception_value'] = traceback.format_exception_only(exc.__class__, exc)
-
-
- def feed_block(self, source, filename = '<input>', symbol = 'single'):
- self.message = self.setup_message()
-
- try:
- code = self.command_compiler(source, filename, symbol)
- except (OverflowError, SyntaxError, IndentationError, ValueError):
- e = None
- self.traceback_trap.args = sys.exc_info()
- self.pack_exception(self.message, e)
- return (COMPILER_ERROR, False)
-
- if code is None:
- last_two = source.rsplit('\n', 2)[-2:]
- print 'last two:', last_two
- if len(last_two) == 2 and all((lambda .0: for s in .0:
- s.isspace())(last_two)):
- return (COMPLETE_INPUT, False)
- return (INCOMPLETE_INPUT, True)
- code is None
- return (COMPLETE_INPUT, False)
-
-
- def pull(self, keys):
- if isinstance(keys, str):
- result = self.user_ns.get(keys, NotDefined())
- if isinstance(result, NotDefined):
- raise NameError('name %s is not defined' % keys)
- isinstance(result, NotDefined)
- elif isinstance(keys, (list, tuple)):
- result = []
- for key in keys:
- if not isinstance(key, str):
- raise TypeError('objects must be keyed by strings.')
- isinstance(key, str)
- r = self.user_ns.get(key, NotDefined())
- if isinstance(r, NotDefined):
- raise NameError('name %s is not defined' % key)
- isinstance(r, NotDefined)
- result.append(r)
- if len(keys) == 1:
- result = result[0]
- continue
-
- else:
- raise TypeError('keys must be a strong or a list/tuple of strings')
- return isinstance(keys, str)
-
-
- def pull_function(self, keys):
- return self.pull(keys)
-
-
- def ipsystem(self, command):
- command = self.var_expand(command)
- system_shell(command, header = 'IPython system call: ', verbose = self.rc.system_verbose)
-
-
- def ipmagic(self, arg_string):
- raise NotImplementedError('Not ported yet')
- args = arg_string.split(' ', 1)
- magic_name = args[0]
- magic_name = magic_name.lstrip(self.config.ESC_MAGIC)
-
- try:
- magic_args = args[1]
- except IndexError:
- magic_args = ''
-
- fn = getattr(self.magic, 'magic_' + magic_name, None)
- if fn is None:
- self.error('Magic function `%s` not found.' % magic_name)
- else:
- magic_args = self.var_expand(magic_args)
- return fn(magic_args)
- return fn is None
-
-
- def setup_message(self):
- return dict(number = self.current_cell_number)
-
-
- def setup_namespace(self):
- self.user_ns.setdefault('__name__', '__main__')
- self.user_ns.setdefault('__builtins__', __builtin__)
- self.user_ns['__IP'] = self
- if self.raw_input_builtin is not None:
- self.user_ns['raw_input'] = self.raw_input_builtin
-
- if self.input_builtin is not None:
- self.user_ns['input'] = self.input_builtin
-
- builtin_additions = dict(ipmagic = self.ipmagic)
- __builtin__.__dict__.update(builtin_additions)
- if self.history is not None:
- self.history.setup_namespace(self.user_ns)
-
-
-
- def set_traps(self):
- self.output_trap.set()
- self.display_trap.set()
- self.traceback_trap.set()
-
-
- def unset_traps(self):
- self.output_trap.unset()
- self.display_trap.unset()
- self.traceback_trap.unset()
-
-
- def split_commands(self, python):
- python = python.strip()
- if isinstance(python, unicode):
- python = '\xef\xbb\xbf' + python.encode('utf-8')
-
- ast = compiler.parse(python)
- linenos = _[1]
- linenos.append(None)
- linenos[0] = 0
- lines = python.splitlines()
- cmds = []
- for i, j in zip(linenos[:-1], linenos[1:]):
- cmd = lines[i:j]
- if cmd:
- cmds.append('\n'.join(cmd) + '\n')
- continue
- []
-
- return cmds
-
-
- def error(self, text):
- errors = self.message.get('IPYTHON_ERROR', [])
- errors.append(text)
-
-
- def var_expand(self, template):
- return str(ItplNS(template, self.user_ns))
-
-
- def _possible_macro(self, obj):
- if isinstance(obj, Macro):
- self.execute_macro(obj)
-
-
-
-