home *** CD-ROM | disk | FTP | other *** search
- """Python Microthread Library, version 0.1
- Microthreads are useful when you want to program many behaviors
- happening simultaneously. Simulations and games often want to model
- the simultaneous and independent behavior of many people, many
- businesses, many monsters, many physical objects, many spaceships, and
- so forth. With microthreads, you can code these behaviors as Python
- functions. Microthreads use Stackless Python. For more details, see
- http://world.std.com/~wware/uthread.html"""
-
- __version__ = "0.1"
-
- __license__ = \
- """Python Microthread Library version 0.1
- Copyright (C)2000 Will Ware, Christian Tismer
-
- Permission to use, copy, modify, and distribute this software and its
- documentation for any purpose and without fee is hereby granted,
- provided that the above copyright notice appear in all copies and that
- both that copyright notice and this permission notice appear in
- supporting documentation, and that the names of the authors not be
- used in advertising or publicity pertaining to distribution of the
- software without specific, written prior permission.
-
- WILL WARE AND CHRISTIAN TISMER DISCLAIM ALL WARRANTIES WITH REGARD TO
- THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
- FITNESS. IN NO EVENT SHALL WILL WARE OR CHRISTIAN TISMER BE LIABLE FOR
- ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
- OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE."""
-
- import stackless
- import sys
- import blue
- import types
- import weakref
- import traceback
- import log
- import copy
-
- SEC = 10000000L
- MIN = 60 * SEC
-
- tasks = [] # bogus breyta
-
- # handled internally
- schedule = stackless.schedule
-
- def new(func, *args, **kw):
- return blue.pyos.CreateTasklet(func, args, kw)
- def newWithoutTheStars(func, args, kw):
- return blue.pyos.CreateTasklet(func, args, kw)
-
- idIndex = 0
-
- def uniqueId():
- """Microthread-safe way to get unique numbers, handy for
- giving things unique ID numbers"""
- global idIndex
- ## CCP is cutting out atomic as we never preemtivly schedule and stackless was crashing there
- #tmp = stackless.atomic()
- z = idIndex
- idIndex += 1
- return z
-
- def irandom(n):
- """Microthread-safe version of random.randrange(0,n)"""
- import random
- ## CCP is cutting out atomic as we never preemtivly schedule and stackless was crashing there
- #tmp = stackless.atomic()
- n = random.randrange(0, n)
- return n
-
- semaphores = weakref.WeakKeyDictionary({})
-
- def GetSemaphores():
- return semaphores
-
- class Semaphore:
- """Semaphores protect globally accessible resources from
- the effects of context switching."""
- __guid__ = 'uthread.Semaphore'
-
- def __init__(self, semaphoreName=None, maxcount=1):
- global semaphores
-
- semaphores[self] = 1
-
- self.semaphoreName = semaphoreName
- self.maxcount = maxcount
- self.count = maxcount
- self.waiting = stackless.channel()
- self.thread = None
- self.lockedWhen = None
-
- def IsCool(self):
- '''
- returns true if and only if nobody has, or is waiting for, this lock
- '''
- return (self.count==self.maxcount) and (not self.waiting.queue)
-
- def __str__(self):
- return "Semaphore("+str(self.semaphoreName)+")"
-
- def __del__(self):
- if not self.IsCool():
- log.general.Log("Semaphore '"+str(self.semaphoreName)+"' is being destroyed in a locked or waiting state",4,0)
-
- def acquire(self):
- if self.count == 0:
- self.waiting.receive()
- else:
- self.count -= 1
-
- self.lockedWhen = blue.os.GetTime()
- self.thread = stackless.getcurrent()
-
- claim = acquire
-
- def release(self):
- self.thread = None
- self.lockedWhen = None
- self.count += 1
- if self.waiting.queue and self.waiting.balance < 0:
- Pool("uthread::Semaphore::delayed_release",self.__delayed_release)
-
- def __delayed_release(self):
- if (self.waiting.queue) and (self.waiting.balance < 0) and (self.count==self.maxcount):
- self.count -= 1
- self.waiting.send(None)
-
- class CriticalSection(Semaphore):
-
- __guid__ = 'uthread.CriticalSection'
-
- def __init__(self, semaphoreName):
- Semaphore.__init__(self, semaphoreName)
- self.__reentrantRefs = 0
-
- def acquire(self):
- if (self.count==0) and (self.thread is stackless.getcurrent()):
- self.__reentrantRefs += 1
- else:
- Semaphore.acquire(self)
-
- def release(self):
- if self.__reentrantRefs and (self.thread is stackless.getcurrent()):
- self.__reentrantRefs -= 1
- return
- Semaphore.release(self)
-
- def FNext(f):
- first = stackless.getcurrent()
- try:
- cursor = first.next
- while cursor != first:
- if cursor.frame.f_back == f:
- return FNext(cursor.frame)
- cursor = cursor.next
- return f
- finally:
- first = None
- cursor = None
-
- class SquidgySemaphore:
- '''
- This is a semaphore which allows exclusive locking
- '''
-
- __guid__ = "uthread.SquidgySemaphore"
-
- def __init__(self, lockName):
- self.__outer__ = Semaphore(lockName)
- self.lockers = {}
- self.__wierdo__ = 0
-
- def IsCool(self):
- '''
- returns true if and only if nobody has, or is waiting for, this lock
- '''
- while 1:
- lockers = []
- try:
- for each in self.lockers:
- return 0
- break
- except:
- StackTrace()
- return self.__outer__.IsCool() and not self.__wierdo__
-
- def acquire_pre_friendly(self):
- '''
- Same as acquire, but with respect for pre_acquire_exclusive
- '''
- while 1:
- if self.__wierdo__:
- blue.pyos.synchro.Sleep(500)
- else:
- self.acquire()
- if self.__wierdo__:
- self.release()
- else:
- break
-
- def pre_acquire_exclusive(self):
- '''
- Prepares the lock for an acquire_exclusive call, so that
- acquire_pre_friendly will block on the dude.
- '''
- self.__wierdo__ += 1
-
- def acquire_exclusive(self):
- i = 0
- while 1:
- self.__outer__.acquire()
- theLocker = None
- try:
- # self.lockers is a dict, and we just want one entry from it.
- # for each in/break is a convenient way to get one entry.
- for each in self.lockers:
- theLocker = each
- break
- except:
- StackTrace()
-
- if theLocker is not None:
- self.__outer__.release() # yielding to the sucker is fine, since we're waiting for somebody anyhow.
- if i and ((i%(3*4*60))==0):
- log.general.Log("Acquire-exclusive is waiting for the inner lock (%d seconds total, lockcount=%d)"%(i/4,len(self.lockers)),4,0)
- log.LogTraceback(channel="general", extraText="This acquire_exclusive is taking a considerable amount of time", toConsole=0, toLogServer=1)
- log.general.Log("This dude has my lock:", 4)
- log.general.Log("tasklet: "+str(theLocker),4)
- for s in log.traceback.format_list(traceback.extract_stack(FNext(theLocker.frame),40)):
- for n in range(0,10120,253): # forty lines max.
- if n==0:
- if len(s)<=255:
- x = s
- else:
- x = s[:(n+253)]
- else:
- x = " - " + s[n:(n+253)]
- log.general.Log(x, 4)
- if (n+253)>=len(s):
- break
- blue.pyos.synchro.Sleep(500)
- else:
- break
- i += 1
-
- def release_exclusive(self):
- self.__outer__.release()
- self.__wierdo__ -= 1
-
- def acquire(self):
- # you don't need the outer lock to re-acquire
- self.__outer__.acquire()
- self.__acquire_inner()
- self.__outer__.release()
-
- def release(self, t=None):
- if t is None:
- t = stackless.getcurrent()
- self.__release_inner(t)
-
- def __acquire_inner(self):
- while 1:
- try:
- if self.lockers.has_key(stackless.getcurrent()):
- self.lockers[stackless.getcurrent()] += 1
- else:
- self.lockers[stackless.getcurrent()] = 1
- break
- except:
- StackTrace()
-
- def __release_inner(self, t):
- while 1:
- try:
- if self.lockers.has_key(t):
- self.lockers[t] -= 1
- if self.lockers[t]==0:
- del self.lockers[t]
- else:
- StackTrace("You can't release a lock you didn't acquire")
- break
- except:
- StackTrace()
-
- channels = weakref.WeakKeyDictionary({})
-
- def GetChannels():
- return channels
-
- class Channel:
- """
- A Channel is a stackless.channel() with administrative spunk
- """
- __guid__ = 'uthread.Channel'
-
- def __init__(self,channelName=None):
- global channels
- self.channelName = channelName
- self.channel = stackless.channel()
- self.send = self.channel.send
- self.send_exception = self.channel.send_exception
- channels[self] = 1
-
- def receive(self):
- return self.channel.receive()
-
- def __getattr__(self,k):
- if k in ('queue', 'balance'):
- return getattr(self.channel,k)
- else:
- return self.__dict__[k]
-
- queues = []
- def QueueCheck():
- blue.pyos.CreateTasklet("uthread::QueueCheck").become(None)
- while 1:
- done = 0
- try:
- while queues:
- done = 1
- queues.pop(0).pump()
- blue.pyos.BeNice()
- except ReferenceError:
- pass
- except:
- StackTrace()
- if done:
- blue.pyos.synchro.Yield()
- else:
- blue.pyos.synchro.Sleep(100)
-
- QueueCheck()
-
- class Queue:
- """A queue is a microthread-safe FIFO."""
- __guid__ = 'uthread.Queue'
-
- def __init__(self):
- self.contents = [ ]
- self.channel = stackless.channel()
-
- def put(self, x):
- ## CCP is cutting out atomic as we never preemtivly schedule and stackless was crashing there
- #tmp = stackless.atomic()
- self.contents.append(x)
- self.pump()
-
- def pump(self):
- while self.channel.queue and self.contents and self.channel.balance < 0:
- self.channel.send(self.contents.pop(0))
-
- def non_blocking_put(self, x):
- self.contents.append(x)
- queues.append(self)
-
- def get(self):
- ## CCP is cutting out atomic as we never preemtivly schedule and stackless was crashing there
- #tmp = stackless.atomic()
- if self.contents:
- return self.contents.pop(0)
- return self.channel.receive()
-
- def unget(self, x):
- ## CCP is cutting out atomic as we never preemtivly schedule and stackless was crashing there
- #tmp = stackless.atomic()
- self.contents.insert(0, x)
-
- def cget(self):
- return self.contents.pop(0)
-
-
- # --------------------------------------------------------------------
- class Event:
-
- __guid__ = 'uthread.Event'
-
- # --------------------------------------------------------------------
- def __init__(self, manual=1, signaled=0):
- self.channel = stackless.channel()
- self.manual = manual
- self.signaled = signaled
-
- # --------------------------------------------------------------------
- def Wait(self, timeout=-1):
- if timeout != -1:
- raise RuntimeError("No timeouts supported in Event")
-
- if not self.signaled:
- self.channel.receive()
-
- # --------------------------------------------------------------------
- def SetEvent(self):
- if self.manual:
- self.signaled = 1
-
- while self.channel.queue:
- self.channel.send(None)
-
- # --------------------------------------------------------------------
- def ResetEvent(self):
- self.signaled = 0
-
-
-
- def LockCheck():
- blue.pyos.CreateTasklet("uthread::LockCheck").become(None)
- global semaphores
- while 1:
- each = None
- blue.pyos.synchro.Sleep(1000 * 60 * 5)
- now = blue.os.GetTime()
- try:
- for each in semaphores.keys():
- blue.pyos.BeNice()
- if (each.count==0) and (each.waiting.balance < 0) and ((now - each.lockedWhen)>=(5*MIN)):
- log.general.Log("These two threads are in a locking conflict: ",4)
- log.general.Log("thread 1:",4)
- for s in log.traceback.format_list(traceback.extract_stack(FNext(each.thread.frame),40)):
- log.general.Log(s,4)
- log.general.Log("thread 2:",4)
- for s in log.traceback.format_list(traceback.extract_stack(FNext(each.waiting.queue.frame),40)):
- log.general.Log(s,4)
- except:
- StackTrace()
- LockCheck()
-
- __uthread__queue__ = None
- def PoolHelper(queue):
- t = stackless.getcurrent()
- t.privateStorage = None
- t.localStorage = None
- try:
- try:
- while 1:
- blue.pyos.BeNice()
- ctx, callingContext, func, loc, args, keywords = queue.get()
- if (queue.channel.balance >= 0):
- new(PoolHelper, queue).context = "uthread::PoolHelper"
- t.privateStorage = None
- SetLocalStorage(loc)
- _tmpctx = t.PushTimer(ctx)
- try:
- apply( func, args, keywords )
- finally:
- ctx = None
- callingContext = None
- func = None
- t.privateStorage = None
- t.localStorage = None
- loc = None
- args = None
- keywords = None
- t.PopTimer(_tmpctx)
- except:
- if callingContext is not None:
- extra = "spawned at %s %s(%s)"%callingContext
- else:
- extra = ""
- StackTrace("general",0,"Unhandled exception in %s%s"%(ctx,extra))
- finally:
- del t
- new(PoolHelper, queue).context = "uthread::PoolHelper"
-
- def PoolWorker(ctx,func,*args,**keywords):
- '''
- Same as uthread.pool, but without copying local storage, thus resetting session, etc.
-
- Should be used for spawning worker threads.
- '''
- return PoolWithoutTheStars(ctx,func,args,keywords,0,1)
-
- def PoolWithoutTheStars(ctx,func,args,keywords,unsafe=0,worker=0):
- #newWithoutTheStars(func, args, keywords )
- #return None
- global __uthread__queue__
- callingContext = None
- if ctx is None:
- if unsafe:
- ctx = "uthread::PoolHelper::UnsafeCrap"
- else:
- c = stackless.getcurrent()
- ctx = getattr(c,"context","???")
- frame = c.frame.f_back
- callingContext = frame.f_code.co_name, frame.f_code.co_filename, traceback.f_lineno(frame)
- frame = None
- c = None
-
- if __uthread__queue__ is None:
- __uthread__queue__ = Queue()
- for i in range(60):
- new(PoolHelper, __uthread__queue__).context = "uthread::PoolHelper"
- if unsafe or worker:
- st = None
- else:
- st = copy.copy(GetLocalStorage())
- __uthread__queue__.non_blocking_put( (str(ctx), callingContext, func, st, args, keywords,) )
- return None
-
- def Pool(ctx,func,*args,**keywords):
- '''
- executes apply(args,keywords) on a new uthread. The uthread in question is taken
- from a thread pool, rather than created one-per-shot call. ctx is used as the
- thread context. This should generally be used for short-lived threads to reduce
- overhead.
- '''
- return PoolWithoutTheStars(ctx,func,args,keywords)
-
- def UnSafePool(ctx,func,*args,**keywords):
- '''
- uthread.pool, but without any dangerous calls to stackless.getcurrent(), which could
- have dramatic and drastic effects in the wrong context.
- '''
- return PoolWithoutTheStars(ctx,func,args,keywords,1)
-
- def ParallelHelper(ch,idx,what):
- ei = None
- try:
- if len(what)==3:
- ret = (idx, apply(what[0], what[1], what[2] ))
- if ch.balance < 0 :
- ch.send( (1, ret) )
- else:
- ret = (idx, apply(what[0], what[1] ))
- if ch.balance < 0:
- ch.send( (1, ret) )
- except:
- ei = sys.exc_info()
-
- if ei:
- if ch.balance < 0:
- ch.send((0,ei))
- del ei
-
- def Parallel(funcs,exceptionHandler=None,maxcount=30):
- '''
- Executes in parallel all the function calls specified in the list/tuple 'funcs', but returns the
- return values in the order of the funcs list/tuple. If an exception occurs, only the first exception
- will reach you. The rest will dissapear in a puff of logic.
-
- Each 'func' entry should be a tuple/list of:
- 1. a function to call
- 2. a tuple of arguments to call it with
- 3. optionally, a dict of keyword args to call it with.
- '''
- if not funcs:
- return
-
- context = "ParallelHelper::"+getattr(stackless.getcurrent(),"context","???")
- ch = stackless.channel()
- ret = [ None ] * len(funcs)
- n = len(funcs)
- if n > maxcount:
- n = maxcount
- for i in range(n):
- if type(funcs[i]) != types.TupleType:
- raise RuntimeError("Parallel requires a list/tuple of (function, args tuple, optional keyword dict,)")
- Pool(context, ParallelHelper, ch, i, funcs[i])
- for i in range(len(funcs)):
- ok, bunch = ch.receive()
- if ok:
- idx,val = bunch
- if len(funcs[i])==4:
- ret[idx] = (funcs[i][3], val,)
- else:
- ret[idx] = val
- else:
- try:
- raise bunch[0],bunch[1],bunch[2]
- except:
- if exceptionHandler:
- exctype, exc, tb = sys.exc_info()
- try:
- try:
- apply( exceptionHandler, (exc,) )
- except:
- raise exc, None, tb
- finally:
- exctype, exc, tb = None, None, None
- else:
- StackTrace()
- raise
-
- if n<len(funcs):
- if type(funcs[n]) != types.TupleType:
- raise RuntimeError("Parallel requires a list/tuple of (function, args tuple, optional keyword dict,)")
- Pool(context, ParallelHelper, ch, n, funcs[n])
- n+=1
- return ret
-
-
- exports = {
- "uthread.parallel": Parallel,
- "uthread.worker": PoolWorker,
- "uthread.new": new,
- "uthread.unsafepool": UnSafePool,
- "uthread.pool": Pool,
- "uthread.poolWithoutTheStars": PoolWithoutTheStars,
- "uthread.newWithoutTheStars": newWithoutTheStars,
- "uthread.irandom": irandom,
- "uthread.uniqueId": uniqueId,
- "uthread.schedule": schedule,
- "uthread.GetChannels": GetChannels,
- "uthread.GetSemaphores": GetSemaphores,
- "uthread.FNext": FNext,
- }
-
-