# (web-scrape navigation residue removed)
# Source Generated with Decompyle++ # File: in.pyo (Python 2.4) """ Contains three groups of classes. Events, listeners and operations. The Operation is used in the OperationsQueue, which is an Operation itself, to process other operations in a sequenced way. They use gobject's main loop. """ import gobject class OperationListener(object): def on_finished(self, event): pass SUCCESSFUL = 0 ABORTED = 1 ERROR = 2 class Event(object): def __init__(self, source): self.source = source class FinishedEvent(Event): def __init__(self, source, id, error = None): Event.__init__(self, source) self.id = id self.error = error class Listenable(object): def __init__(self): self._Listenable__listeners = [] listeners = property(fget = (lambda self: self._Listenable__listeners), doc = 'The list of available listeners.') class Operation(Listenable): """ An operation is run assynchrounously. It is a Listenable and as such classes which extend this one should call the event on_finished of it's listeners when this operation is finished. """ title = None description = None can_start = property((lambda self: not (self.running)), doc = "Checks if the operation can start. By default you can start an operation when it's not running.") can_stop = property((lambda self: self.running), doc = 'Checks if this operation can stop. By default you can stop operations that are running.') running = property(doc = 'Tests if the operation is running.') def start(self): pass def stop(self): pass def _notify(self, method_name, *args, **kw): for l in self.listeners: meth = getattr(l, method_name, None) if meth: meth(*args, **kw) continue def _send_finished_event(self, status, error = None, source = None): ''' Broadcasts to all listeners the finished event. Simplifies the task of creating the event and iterating over listeners. 
''' if source is None: source = self e = FinishedEvent(source, status, error) for l in self.listeners: if hasattr(l, 'on_finished'): l.on_finished(e) continue def _propagate(self, evt, source = None): self._send_finished_event(evt.id, evt.error, source) class FailledOperation(Operation): def __init__(self, error = None, source = None): if source is None: source = self self.error = error self.source = source super(FailledOperation, self).__init__() def start(self): self._send_finished_event(ERROR, self.error, self.source) can_start = property((lambda : True)) can_stop = property((lambda : False)) running = property((lambda : False)) def operation_factory(func): ''' This decorator protects operation factories (functions wich return operators by wrapping a try catch, if the function, by any chance, raises an exception a FailledOperation is returned instead. ''' def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception: err = None return FailledOperation(error = err) wrapper.func_name = func.func_name return wrapper class CallableOperation(Operation): '''Simple operations that takes a callable object (ie: function) and creates an non-measurable Operation.''' can_start = property((lambda : True)) can_stop = property((lambda : False)) running = property((lambda : False)) def __init__(self, callable): self.callable = callable Operation.__init__(self) def start(self): self.callable() self._send_finished_event(SUCCESSFUL) try: import subprocess import os class SubprocessOperation(Operation): def __init__(self, *args, **kwargs): super(SubprocessOperation, self).__init__() self.args = args self.kwargs = kwargs pid = property((lambda self: self._SubprocessOperation__pid)) can_start = property((lambda self: self.pid is not None)) can_stop = property((lambda self: self.pid is not None)) running = property((lambda self: self.pid is not None)) def start(self): try: proc = subprocess.Popen(*self.args, **self.kwargs) self._SubprocessOperation__pid = proc.pid 
self._SubprocessOperation__id = gobject.child_watch_add(self.pid, self._SubprocessOperation__on_finish) except Exception: e = None print 'Error:', e def stop(self): try: os.kill(self.pid, 9) except OSError: pass def __on_finish(self, pid, status): if status == 0: status = SUCCESSFUL else: status = ERROR self._send_finished_event(status) self._SubprocessOperation__pid = None except ImportError: pass class MeasurableOperation(Operation): progress = property(doc = "Returns the operation's progress.") class OperationsQueueListener(OperationListener): def before_operation_starts(self, event, operation): pass class OperationsQueue(MeasurableOperation, OperationListener): ''' Operation Queuees allow a user to enqueue a number of operations and run them sequentially. If one of the operations is aborted or has an error the whole queue is aborted or returns an error too. The error returned is the same returned by the problematic operation. All the elements remaining on the queue are removed. ''' def __init__(self, operations = None): Operation.__init__(self) if operations is None: operations = [] self._OperationsQueue__operations = operations self._OperationsQueue__done = 0 self._OperationsQueue__curr_oper = None self._OperationsQueue__progress = 0.0 self._OperationsQueue__total = 0 self._OperationsQueue__abort_on_failure = True def __is_running(self): return self._OperationsQueue__curr_oper != None running = property(__is_running) def __get_progress(self): if not self.running: if len(self._OperationsQueue__operations) == 0 and self._OperationsQueue__done: return 1.0 else: return 0.0 total = self._OperationsQueue__total partial = 0.0 if isinstance(self._OperationsQueue__curr_oper, MeasurableOperation): partial = self._OperationsQueue__curr_oper.progress return (self._OperationsQueue__done + partial) / total progress = property(__get_progress) def __set_abort_on_failure(self, val): self._OperationsQueue__abort_on_failure = val abort_on_failure = property((lambda self: 
self._OperationsQueue__abort_on_failure), __set_abort_on_failure, doc = 'If one operation stops abort progress and propagate event.') def start(self): ''' Starts all the operations on queue, sequentially. ''' self._OperationsQueue__done = 0 self._OperationsQueue__total = len(self._OperationsQueue__operations) self._OperationsQueue__progress = 0.0 self._OperationsQueue__started = True gobject.idle_add(self._OperationsQueue__start_next) def append(self, oper): self._OperationsQueue__operations.append(oper) def insert(self, index, oper): self._OperationsQueue__operations.insert(index, oper) def __start_next(self): if len(self._OperationsQueue__operations): oper = self._OperationsQueue__operations[0] del self._OperationsQueue__operations[0] e = Event(self) for l in self.listeners: if hasattr(l, 'before_operation_starts'): l.before_operation_starts(e, oper) continue oper.listeners.append(self) self._OperationsQueue__curr_oper = oper oper.start() else: self._OperationsQueue__started = False e = FinishedEvent(self, SUCCESSFUL) for l in self.listeners: if hasattr(l, 'on_finished'): l.on_finished(e) continue def on_finished(self, evt): evt.source.listeners.remove(self) self._OperationsQueue__done += 1 self._OperationsQueue__curr_oper = None if self.abort_on_failure and evt.id != SUCCESSFUL: self._OperationsQueue__operations = [] evt.source = self for l in self.listeners: l.on_finished(evt) else: self._OperationsQueue__start_next() can_stop = property((lambda self: if self.running: passself._OperationsQueue__curr_oper.can_stop)) def stop(self): self._OperationsQueue__curr_oper.stop() __len__ = lambda self: len(self._OperationsQueue__operations) def __repr__(self): return '{%s: %s}' % (super(OperationsQueue, self).__repr__(), self._OperationsQueue__operations.__repr__()) class SyncListener(OperationListener): def __init__(self, mainloop): self.mainloop = mainloop def on_finished(self, event): self.result = event self.mainloop.quit() def syncOperation(oper): ''' This function 
can run an operation synchronously and returns the event object. This only affects GObject related operations. ''' mainloop = gobject.MainLoop() listener = SyncListener(mainloop) oper.listeners.append(listener) oper.start() mainloop.run() return listener.result def syncableMethod(kwarg = 'sync', default_value = False): """ This is a decorator that accepts a keyword argument (defaults to 'sync') with a kwarg default value (defaults to `False`). When you call the method you can use the extra keyword argument to specify if the method call is going to be sync or async. The decorated method should be one that returns an operation. """ def decorator(func): def wrapper(*args, **kwargs): is_sync = kwargs.get(kwarg, default_value) if is_sync: del kwargs[kwarg] return syncOperation(func(*args, **kwargs)) else: return func(*args, **kwargs) return wrapper return decorator sync = syncableMethod(default_value = True) async = syncableMethod() class MapFunctor(object): def __init__(self, funcs): self._MapFunctor__funcs = funcs def __call__(self, *args, **keyws): r = [] for f in self._MapFunctor__funcs: r.append(f(*args, **keyws)) return tuple(r) class MapProxy(object): ''' This class acts as a hub or a proxy for calling methods on multiple objects. The method called from an instance of this class will be transparently called in all elements contained in this instance. The added elements is of a dictionary type and can be accessed by the __getitem__ and __setitem__ of this instance. 
''' def __init__(self, elements): self._MapProxy__elements = elements def __getattr__(self, attr): funcs = [] for key in self._MapProxy__elements: funcs.append(getattr(self._MapProxy__elements[key], attr)) return MapFunctor(funcs) def __getitem__(self, key): return self._MapProxy__elements[key] def __setitem__(self, key, value): self._MapProxy__elements[key] = value def __delitem__(self, key): del self._MapProxy__elements[key] def has_key(self, key): return self._MapProxy__elements.has_key(key) if __name__ == '__main__': import sys import gtk class Listener: def on_finished(self, evt): gtk.main_quit() oper = SubprocessOperation(sys.argv[1:]) oper.listeners.append(Listener()) oper.start() gtk.main()