home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.6)
-
- import weakref
- import sys
- import heapq
- import UserList
- import UserDict
- from peak.util.extremes import Max
- from peak.util import decorators
-
- try:
- import threading
- except ImportError:
- import dummy_threading as threading
-
- __all__ = [
- 'STMHistory',
- 'AbstractSubject',
- 'Link',
- 'AbstractListener',
- 'Controller',
- 'CircularityError',
- 'LocalController']
-
- class CircularityError(Exception):
- pass
-
-
- class AbstractSubject(object):
- __slots__ = ()
- manager = None
- layer = 0
-
- def __init__(self):
- self.next_listener = None
-
-
- def iter_listeners(self):
- link = self.next_listener
- while link is not None:
- nxt = link.next_listener
- ob = link()
- if ob is not None:
- yield ob
-
- link = nxt
-
-
-
- class AbstractListener(object):
- __slots__ = ()
- layer = 0
-
- def __init__(self):
- self.next_subject = None
-
-
- def iter_subjects(self):
- link = self.next_subject
- while link is not None:
- nxt = link.next_subject
- if link.subject is not None:
- yield link.subject
-
- link = nxt
-
-
- def dirty(self):
- return True
-
-
- def run(self):
- raise NotImplementedError
-
-
-
- try:
-
- class Link(weakref.ref):
- pass
-
- except TypeError:
-
- class link_base(object):
- __slots__ = 'weakref'
-
- def __new__(cls, ob, callback):
- self = object.__new__(cls)
- self.weakref = None((weakref.ref, ob), (lambda r: callback(self)))
- return self
-
-
- def __call__(self):
- return self.weakref()
-
-
-
- link_base = weakref.ref
-
- try:
- from threading import local
- except ImportError:
- from _threading_local import local
- threading.local = local
-
-
- try:
- set
- except NameError:
- from sets import Set as set
-
-
- class Link(link_base):
- __slots__ = [
- 'subject',
- 'next_subject',
- 'prev_subject',
- 'next_listener',
- 'prev_listener']
-
- def __new__(cls, subject, listener):
- self = link_base.__new__(Link, listener, _unlink_fn)
- self.subject = self.prev_listener = subject
- self.prev_subject = None
- nxt = self.next_subject = listener.next_subject
- if nxt is not None:
- nxt.prev_subject = self
-
- nxt = self.next_listener = subject.next_listener
- if nxt is not None:
- nxt.prev_listener = self
-
- listener.next_subject = self
- subject.next_listener = self
- return self
-
-
- def unlink(self):
- nxt = self.next_listener
- prev = self.prev_listener
- if nxt is not None:
- nxt.prev_listener = prev
-
- if prev is not None and prev.next_listener is self:
- prev.next_listener = nxt
-
- prev = self.prev_subject
- nxt = self.next_subject
- if nxt is not None:
- nxt.prev_subject = prev
-
- if prev is None:
- prev = self()
-
- if prev is not None and prev.next_subject is self:
- prev.next_subject = nxt
-
- self.subject = None
- self.next_subject = None
- self.prev_subject = None
- self.next_listener = None
- self.prev_listener = None
-
-
- _unlink_fn = Link.unlink
-
- class STMHistory(object):
- active = in_cleanup = undoing = False
-
- def __init__(self):
- self.undo = []
- self.at_commit = []
- self.managers = { }
-
-
- def atomically(self, func = (lambda : pass), *args, **kw):
- if self.active:
- return func(*args, **kw)
- self.active = True
-
- try:
- retval = func(*args, **kw)
- self.cleanup()
- return retval
- except:
- self.active
- self.cleanup(*sys.exc_info())
- finally:
- self.active = False
-
-
-
- def manage(self, mgr):
- if mgr not in self.managers:
- mgr.__enter__()
- self.managers[mgr] = len(self.managers)
-
-
-
- def on_undo(self, func, *args):
- if not self.undoing:
- self.undo.append((func, args))
-
-
-
- def savepoint(self):
- return len(self.undo)
-
-
- def cleanup(self, typ = None, val = None, tb = None):
- self.in_cleanup = True
- if typ is None:
-
- try:
- self.checkpoint()
- (typ, val, tb) = sys.exc_info()
-
-
- if typ is not None:
-
- try:
- self.rollback_to(0)
- (typ, val, tb) = sys.exc_info()
-
-
- managers = [ (posn, mgr) for mgr, posn in self.managers.items() ]
- managers.sort()
- self.managers.clear()
-
- try:
- while managers:
-
- try:
- managers.pop()[1].__exit__(typ, val, tb)
- continue
- []
- (typ, val, tb) = sys.exc_info()
- continue
-
- if typ is not None:
- raise typ, val, tb
- typ is not None
- finally:
- del self.at_commit[:]
- del self.undo[:]
- self.in_cleanup = False
- typ = None
- val = None
- tb = None
-
-
-
- def change_attr(self, ob, attr, val):
- self.on_undo(setattr, ob, attr, getattr(ob, attr))
- setattr(ob, attr, val)
-
-
- def rollback_to(self, sp = 0):
- undo = self.undo
- self.undoing = True
- rb = self.rollback_to
-
- try:
- while len(undo) > sp:
- (f, a) = undo.pop()
- if f == rb and a:
- sp = min(sp, a[0])
- continue
- f(*a)
- finally:
- self.undoing = False
-
-
-
- def on_commit(self, func, *args):
- s = slice(len(self.at_commit), None)
- self.undo.append((self.at_commit.__delitem__, (s,)))
- self.at_commit.append((func, args))
-
-
- def checkpoint(self):
- for f, a in self.at_commit:
- f(*a)
-
- del self.at_commit[:]
-
-
-
- class Controller(STMHistory):
- current_listener = None
- destinations = None
- routes = None
- newcells = None
- readonly = False
-
- def __init__(self):
- super(Controller, self).__init__()
- self.reads = { }
- self.writes = { }
- self.has_run = { }
- self.layers = []
- self.queues = { }
- self.to_retry = { }
-
-
- def checkpoint(self):
- self.has_run.clear()
- return super(Controller, self).checkpoint()
-
-
- def _retry(self):
-
- try:
- self.destinations = set(self.to_retry)
- self.routes = { }
- []([]([ self.has_run[r] for r in self.to_retry ]))
- for item in self.to_retry:
- if item in self.routes:
- path = check_circularity(item, self.routes)
- if path:
- raise CircularityError(self.routes, path)
- path
- continue
- min
- finally:
- self.to_retry.clear()
- self.destinations = None
- self.routes = None
-
-
-
- def __getattr__(self, name):
- if name == 'pulse':
- Value = Value
- import peak.events.trellis
- self.pulse = Value(0)
- return self.pulse
- raise AttributeError(name)
-
-
- def _unrun(self, listener, notified):
- destinations = self.destinations
- if destinations is not None:
- via = destinations.intersection(notified)
- if via:
- self.routes[listener] = via
- destinations.add(listener)
-
-
-
-
- def run_rule(self, listener, initialized = True):
- if listener.layer is Max and not (self.readonly):
- return self.with_readonly(self.run_rule, listener, initialized)
- old = self.current_listener
- self.current_listener = listener
-
- try:
- if old is not None:
- old_reads = self.reads
- self.reads = { }
-
- try:
- listener.run()
- self._process_reads(listener)
- finally:
- self.reads = old_reads
-
- elif initialized:
- self.has_run[listener] = self.savepoint()
- self.on_undo(self.has_run.pop, listener, None)
-
-
- try:
- listener.run()
- self._process_writes(listener)
- self._process_reads(listener)
- except:
- self.reads.clear()
- self.writes.clear()
- raise
-
- finally:
- self.current_listener = old
-
-
-
- def _process_writes(self, listener):
- notified = { }
- writes = self.writes
- layer = listener.layer
- while writes:
- (subject, writer) = writes.popitem()
- for dependent in subject.iter_listeners():
- if dependent is not listener:
- if dependent.dirty():
- self.schedule(dependent, layer)
- notified[dependent] = 1
-
- dependent.dirty()
-
- if notified:
- self.on_undo(self._unrun, listener, notified)
-
-
-
- def _process_reads(self, listener):
- subjects = self.reads
- link = listener.next_subject
- while link is not None:
- nxt = link.next_subject
- if link.subject in subjects:
- del subjects[link.subject]
- else:
- self.undo.append((Link, (link.subject, listener)))
- link.unlink()
- link = nxt
- while subjects:
- link = Link(subjects.popitem()[0], listener)
- self.undo.append((link.unlink, ()))
-
-
- def schedule(self, listener, source_layer = None):
- new = old = listener.layer
- get = self.queues.get
- if source_layer is not None and source_layer >= listener.layer:
- new = source_layer + 1
-
- if listener in self.has_run:
- self.to_retry[listener] = 1
-
- q = get(old)
- if q and listener in q:
- if new is not old:
- self.cancel(listener)
-
- elif self.active and not (self.undoing):
- self.on_undo(self.cancel, listener)
-
- if new is not old:
- listener.layer = new
- q = get(new)
-
- if q is None:
- q = self.queues[new] = {
- listener: 1 }
- heapq.heappush(self.layers, new)
- else:
- q[listener] = 1
-
-
- def cancel(self, listener):
- q = self.queues.get(listener.layer)
- if q and listener in q:
- del q[listener]
- if not q:
- del self.queues[listener.layer]
- self.layers.remove(listener.layer)
- self.layers.sort()
-
-
-
-
- def atomically(self, func = (lambda : pass), *args, **kw):
- if self.active:
- return func(*args, **kw)
- return super(Controller, self).atomically(self._process, func, args, kw)
-
-
- def _process(self, func, args, kw):
-
- try:
- retval = func(*args, **kw)
- layers = self.layers
- queues = self.queues
- while layers or self.at_commit:
- self.pulse.value += 1
- while layers:
- if self.to_retry:
- self._retry()
-
- q = queues[layers[0]]
- if q:
- listener = q.popitem()[0]
- self.on_undo(self.schedule, listener)
- self.run_rule(listener)
- continue
- del queues[layers[0]]
- heapq.heappop(layers)
- self.checkpoint()
- return retval
- except:
- del self.layers[:]
- self.queues.clear()
- raise
-
-
-
- def lock(self, subject):
- manager = subject.manager
- if manager is not None and manager not in self.managers:
- self.manage(manager)
-
-
-
- def used(self, subject):
- self.lock(subject)
- cl = self.current_listener
- if cl is not None and subject not in self.reads:
- self.reads[subject] = 1
- if cl is not subject and subject.layer >= cl.layer:
- cl.layer = subject.layer + 1
-
-
-
-
- def changed(self, subject):
- self.lock(subject)
- cl = self.current_listener
- if self.readonly and subject is not cl:
- if self.newcells is None or subject not in self.newcells:
- raise RuntimeError("Can't change objects during @perform or @compute")
- subject not in self.newcells
- if cl is not None:
- self.writes[subject] = cl
- else:
- for listener in subject.iter_listeners():
- if listener.dirty():
- self.schedule(listener)
- continue
-
-
-
- def initialize(self, listener):
- self.run_rule(listener, False)
-
-
- def with_readonly(self, func, *args):
- if self.readonly:
- return func(*args)
- self.readonly = True
-
- try:
- return func(*args)
- finally:
- self.readonly = False
-
-
-
- def new_cell(self, cell):
- if self.newcells is not None:
- self.newcells[cell] = 1
-
-
-
- def with_new(self, func, *args, **kw):
- old = self.newcells
- if old is None:
- self.newcells = { }
-
-
- try:
- return func(*args, **kw)
- finally:
- self.newcells = old
-
-
-
-
- def check_circularity(item, routes, start = None, seen = None):
- if seen is None:
- seen = { }
-
- if start is None:
- start = item
-
- for via in routes.get(item, ()):
- if via is start:
- return (via,)
- if via not in seen:
- seen[via] = 1
- path = check_circularity(via, routes, start, seen)
- if path:
- return (via,) + path
- continue
- path
-
- return ()
-
-
- class LocalController(Controller, threading.local):
- pass
-
-