home *** CD-ROM | disk | FTP | other *** search
/ Enter 2004 April / enter-2004-04.iso / files / EVE_1424_100181.exe / uthread.py < prev    next >
Encoding:
Python Source  |  2004-04-20  |  20.4 KB  |  611 lines

  1. """Python Microthread Library, version 0.1
  2. Microthreads are useful when you want to program many behaviors
  3. happening simultaneously. Simulations and games often want to model
  4. the simultaneous and independent behavior of many people, many
  5. businesses, many monsters, many physical objects, many spaceships, and
  6. so forth. With microthreads, you can code these behaviors as Python
  7. functions. Microthreads use Stackless Python. For more details, see
  8. http://world.std.com/~wware/uthread.html"""
  9.  
  10. __version__ = "0.1"
  11.  
  12. __license__ = \
  13. """Python Microthread Library version 0.1
  14. Copyright (C)2000  Will Ware, Christian Tismer
  15.  
  16. Permission to use, copy, modify, and distribute this software and its
  17. documentation for any purpose and without fee is hereby granted,
  18. provided that the above copyright notice appear in all copies and that
  19. both that copyright notice and this permission notice appear in
  20. supporting documentation, and that the names of the authors not be
  21. used in advertising or publicity pertaining to distribution of the
  22. software without specific, written prior permission.
  23.  
  24. WILL WARE AND CHRISTIAN TISMER DISCLAIM ALL WARRANTIES WITH REGARD TO
  25. THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
  26. FITNESS. IN NO EVENT SHALL WILL WARE OR CHRISTIAN TISMER BE LIABLE FOR
  27. ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
  28. WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
  29. ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
  30. OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE."""
  31.  
  32. import stackless
  33. import sys
  34. import blue
  35. import types
  36. import weakref
  37. import traceback
  38. import log
  39. import copy
  40.  
# Time constants. LockCheck compares differences of blue.os.GetTime() values
# against 5*MIN.
# NOTE(review): SEC presumably is the number of blue.os.GetTime() ticks per
# second (10,000,000 would mean 100 ns ticks) -- confirm against the blue API.
SEC  = 10000000L
MIN  = 60 * SEC

tasks = [] # bogus variable (not referenced anywhere else in this file)

# handled internally
schedule = stackless.schedule
  48.  
  49. def new(func, *args, **kw):
  50.     return blue.pyos.CreateTasklet(func, args, kw)
  51. def newWithoutTheStars(func, args, kw):
  52.     return blue.pyos.CreateTasklet(func, args, kw)
  53.  
  54. idIndex = 0
  55.  
  56. def uniqueId():
  57.     """Microthread-safe way to get unique numbers, handy for
  58.     giving things unique ID numbers"""
  59.     global idIndex
  60.     ## CCP is cutting out atomic as we never preemtivly schedule and stackless was crashing there
  61.     #tmp = stackless.atomic()
  62.     z = idIndex
  63.     idIndex += 1
  64.     return z
  65.  
  66. def irandom(n):
  67.     """Microthread-safe version of random.randrange(0,n)"""
  68.     import random
  69.     ## CCP is cutting out atomic as we never preemtivly schedule and stackless was crashing there
  70.     #tmp = stackless.atomic()
  71.     n = random.randrange(0, n)
  72.     return n
  73.  
  74. semaphores               = weakref.WeakKeyDictionary({})
  75.  
  76. def GetSemaphores():
  77.     return semaphores
  78.  
  79. class Semaphore:
  80.     """Semaphores protect globally accessible resources from
  81.     the effects of context switching."""
  82.     __guid__ = 'uthread.Semaphore'
  83.  
  84.     def __init__(self, semaphoreName=None, maxcount=1):
  85.         global semaphores
  86.  
  87.         semaphores[self] = 1
  88.  
  89.         self.semaphoreName  = semaphoreName
  90.         self.maxcount       = maxcount
  91.         self.count          = maxcount
  92.         self.waiting        = stackless.channel()
  93.         self.thread         = None
  94.         self.lockedWhen     = None
  95.  
  96.     def IsCool(self):
  97.         '''
  98.             returns true if and only if nobody has, or is waiting for, this lock
  99.         '''
  100.         return (self.count==self.maxcount) and (not self.waiting.queue)
  101.  
  102.     def __str__(self):
  103.         return "Semaphore("+str(self.semaphoreName)+")"
  104.  
  105.     def __del__(self):
  106.         if not self.IsCool():
  107.             log.general.Log("Semaphore '"+str(self.semaphoreName)+"' is being destroyed in a locked or waiting state",4,0)
  108.  
  109.     def acquire(self):
  110.         if self.count == 0:
  111.             self.waiting.receive()
  112.         else:
  113.             self.count -= 1
  114.  
  115.         self.lockedWhen = blue.os.GetTime()
  116.         self.thread = stackless.getcurrent()
  117.  
  118.     claim = acquire
  119.  
  120.     def release(self):
  121.         self.thread     =   None
  122.         self.lockedWhen =   None
  123.         self.count      +=  1
  124.         if self.waiting.queue and self.waiting.balance < 0:
  125.             Pool("uthread::Semaphore::delayed_release",self.__delayed_release)
  126.  
  127.     def __delayed_release(self):
  128.         if (self.waiting.queue) and (self.waiting.balance < 0) and (self.count==self.maxcount):
  129.             self.count -= 1
  130.             self.waiting.send(None)
  131.  
  132. class CriticalSection(Semaphore):
  133.  
  134.     __guid__ = 'uthread.CriticalSection'
  135.  
  136.     def __init__(self, semaphoreName):
  137.         Semaphore.__init__(self, semaphoreName)
  138.         self.__reentrantRefs = 0
  139.  
  140.     def acquire(self):
  141.         if (self.count==0) and (self.thread is stackless.getcurrent()):
  142.             self.__reentrantRefs += 1
  143.         else:
  144.             Semaphore.acquire(self)
  145.  
  146.     def release(self):
  147.         if self.__reentrantRefs and (self.thread is stackless.getcurrent()):
  148.             self.__reentrantRefs -= 1
  149.             return
  150.         Semaphore.release(self)
  151.  
  152. def FNext(f):
  153.     first  = stackless.getcurrent()
  154.     try:
  155.         cursor = first.next
  156.         while cursor != first:
  157.             if cursor.frame.f_back == f:
  158.                 return FNext(cursor.frame)
  159.             cursor = cursor.next
  160.         return f
  161.     finally:
  162.         first  = None
  163.         cursor = None
  164.  
class SquidgySemaphore:
    '''
        This is a semaphore which allows exclusive locking.

        acquire()/release() may be held by any number of tasklets at once;
        each holder is counted in self.lockers (tasklet -> nesting count).
        acquire_exclusive() waits until self.lockers is empty and then keeps
        the outer Semaphore, which makes every new acquire() block until
        release_exclusive() is called.
    '''

    __guid__ = "uthread.SquidgySemaphore"

    def __init__(self, lockName):
        self.__outer__  = Semaphore(lockName)   # gate taken briefly by acquire(), kept by acquire_exclusive()
        self.lockers    = {}                    # tasklet -> number of acquire() calls not yet released
        self.__wierdo__ = 0                     # count of announced (pre_acquire_exclusive) exclusive lockers

    def IsCool(self):
        '''
            returns true if and only if nobody has, or is waiting for, this lock
        '''
        while 1:
            lockers = [] # unused local
            try:
                # returns 0 as soon as self.lockers yields a first key,
                # i.e. whenever it is non-empty
                for each in self.lockers:
                    return 0
                break
            except:
                # log and retry
                # NOTE(review): StackTrace is neither defined nor imported in
                # this file -- presumably a builtin injected by the host.
                StackTrace()
        return self.__outer__.IsCool() and not self.__wierdo__

    def acquire_pre_friendly(self):
        '''
            Same as acquire, but with respect for pre_acquire_exclusive
        '''
        while 1:
            if self.__wierdo__:
                # an exclusive locker has announced itself: stay out of its way
                blue.pyos.synchro.Sleep(500)
            else:
                self.acquire()
                if self.__wierdo__:
                    # an exclusive locker announced itself while we were
                    # acquiring: back off and try again
                    self.release()
                else:
                    break

    def pre_acquire_exclusive(self):
        '''
            Prepares the lock for an acquire_exclusive call, so that
            acquire_pre_friendly will block on the dude.
        '''
        self.__wierdo__ += 1

    def acquire_exclusive(self):
        '''
            Takes the outer semaphore and keeps it once self.lockers is
            empty. While any locker remains, the outer semaphore is given
            back, the call sleeps and then tries again; every 720th attempt
            the wait and the stack of one remaining locker are logged.
        '''
        i = 0 # number of failed attempts so far
        while 1:
            self.__outer__.acquire()
            theLocker = None
            try:
                # self.lockers is a dict, and we just want one entry from it.
                # for each in/break is a convenient way to get one entry.
                for each in self.lockers:
                    theLocker = each
                    break
            except:
                StackTrace()

            if theLocker is not None:
                self.__outer__.release() # yielding to the sucker is fine, since we're waiting for somebody anyhow.
                # NOTE(review): the 3*4*60 period and the i/4 "seconds" figure
                # look written for four attempts per second, but each attempt
                # sleeps 500 (milliseconds, if Sleep takes ms) -- the reported
                # total may be half the real wait. Confirm.
                if i and ((i%(3*4*60))==0):
                    log.general.Log("Acquire-exclusive is waiting for the inner lock (%d seconds total, lockcount=%d)"%(i/4,len(self.lockers)),4,0)
                    log.LogTraceback(channel="general", extraText="This acquire_exclusive is taking a considerable amount of time", toConsole=0, toLogServer=1)
                    log.general.Log("This dude has my lock:", 4)
                    log.general.Log("tasklet: "+str(theLocker),4)
                    for s in log.traceback.format_list(traceback.extract_stack(FNext(theLocker.frame),40)):
                        # log s in pieces of at most 253 characters;
                        # continuation pieces are prefixed with " - "
                        for n in range(0,10120,253): # forty lines max.
                            if n==0:
                                if len(s)<=255:
                                    x = s
                                else:
                                    x = s[:(n+253)]
                            else:
                                x = " - " + s[n:(n+253)]
                            log.general.Log(x, 4)
                            # NOTE(review): for len(s) of 254 or 255 the whole
                            # string is logged above, yet this test lets the
                            # loop run once more and log the tail again.
                            if (n+253)>=len(s):
                                break
                blue.pyos.synchro.Sleep(500)
            else:
                # nobody holds the inner lock: leave with the outer semaphore held
                break
            i += 1

    def release_exclusive(self):
        '''
            Gives the outer semaphore back and takes back one
            pre_acquire_exclusive announcement.
        '''
        # NOTE(review): __wierdo__ is decremented unconditionally, so calling
        # this without a matching pre_acquire_exclusive leaves it negative,
        # which acquire_pre_friendly and IsCool treat as "announced". Verify
        # that callers always pair the two.
        self.__outer__.release()
        self.__wierdo__ -= 1

    def acquire(self):
        '''
            Registers the current tasklet as a (shared) holder. Blocks while
            the outer semaphore is held, e.g. by an exclusive locker.
        '''
        # NOTE(review): the original comment here said "you don't need the
        # outer lock to re-acquire", but the outer semaphore is taken
        # unconditionally, also for a tasklet that already is a holder.
        self.__outer__.acquire()
        self.__acquire_inner()
        self.__outer__.release()

    def release(self, t=None):
        '''
            Undoes one acquire() made by tasklet t (default: the current
            tasklet).
        '''
        if t is None:
            t = stackless.getcurrent()
        self.__release_inner(t)

    def __acquire_inner(self):
        # Add one to the current tasklet's entry in self.lockers, creating
        # the entry if needed. On any exception: log it and retry.
        while 1:
            try:
                if self.lockers.has_key(stackless.getcurrent()):
                    self.lockers[stackless.getcurrent()] += 1
                else:
                    self.lockers[stackless.getcurrent()] = 1
                break
            except:
                StackTrace()

    def __release_inner(self, t):
        # Take one off t's entry in self.lockers and drop the entry when it
        # reaches zero. A release without a matching acquire is only logged.
        # On any exception: log it and retry.
        while 1:
            try:
                if self.lockers.has_key(t):
                    self.lockers[t] -= 1
                    if self.lockers[t]==0:
                        del self.lockers[t]
                else:
                    StackTrace("You can't release a lock you didn't acquire")
                break
            except:
                StackTrace()
  288.  
  289. channels            = weakref.WeakKeyDictionary({})
  290.  
  291. def GetChannels():
  292.     return channels
  293.  
  294. class Channel:
  295.     """
  296.         A Channel is a stackless.channel() with administrative spunk
  297.     """
  298.     __guid__ = 'uthread.Channel'
  299.  
  300.     def __init__(self,channelName=None):
  301.         global channels
  302.         self.channelName = channelName
  303.         self.channel = stackless.channel()
  304.         self.send = self.channel.send
  305.         self.send_exception = self.channel.send_exception
  306.         channels[self] = 1
  307.  
  308.     def receive(self):
  309.         return self.channel.receive()
  310.  
  311.     def __getattr__(self,k):
  312.         if k in ('queue', 'balance'):
  313.             return getattr(self.channel,k)
  314.         else:
  315.             return self.__dict__[k]
  316.  
  317. queues = []
  318. def QueueCheck():
  319.     blue.pyos.CreateTasklet("uthread::QueueCheck").become(None)
  320.     while 1:
  321.         done = 0
  322.         try:
  323.             while queues:
  324.                 done = 1
  325.                 queues.pop(0).pump()
  326.                 blue.pyos.BeNice()
  327.         except ReferenceError:
  328.             pass
  329.         except:
  330.             StackTrace()
  331.         if done:
  332.             blue.pyos.synchro.Yield()
  333.         else:
  334.             blue.pyos.synchro.Sleep(100)
  335.  
  336. QueueCheck()
  337.  
  338. class Queue:
  339.     """A queue is a microthread-safe FIFO."""
  340.     __guid__ = 'uthread.Queue'
  341.  
  342.     def __init__(self):
  343.         self.contents = [ ]
  344.         self.channel  = stackless.channel()
  345.  
  346.     def put(self, x):
  347.         ## CCP is cutting out atomic as we never preemtivly schedule and stackless was crashing there
  348.         #tmp = stackless.atomic()
  349.         self.contents.append(x)
  350.         self.pump()
  351.  
  352.     def pump(self):
  353.         while self.channel.queue and self.contents and self.channel.balance < 0:
  354.             self.channel.send(self.contents.pop(0))
  355.  
  356.     def non_blocking_put(self, x):
  357.         self.contents.append(x)
  358.         queues.append(self)
  359.  
  360.     def get(self):
  361.         ## CCP is cutting out atomic as we never preemtivly schedule and stackless was crashing there
  362.         #tmp = stackless.atomic()
  363.         if self.contents:
  364.             return self.contents.pop(0)
  365.         return self.channel.receive()
  366.  
  367.     def unget(self, x):
  368.         ## CCP is cutting out atomic as we never preemtivly schedule and stackless was crashing there
  369.         #tmp = stackless.atomic()
  370.         self.contents.insert(0, x)
  371.  
  372.     def cget(self):
  373.         return self.contents.pop(0)
  374.  
  375.  
  376. # --------------------------------------------------------------------
  377. class Event:
  378.  
  379.     __guid__ = 'uthread.Event'
  380.  
  381.     # --------------------------------------------------------------------
  382.     def __init__(self, manual=1, signaled=0):
  383.         self.channel = stackless.channel()
  384.         self.manual = manual
  385.         self.signaled = signaled
  386.  
  387.     # --------------------------------------------------------------------
  388.     def Wait(self, timeout=-1):
  389.         if timeout != -1:
  390.             raise RuntimeError("No timeouts supported in Event")
  391.  
  392.         if not self.signaled:
  393.             self.channel.receive()
  394.  
  395.     # --------------------------------------------------------------------
  396.     def SetEvent(self):
  397.         if self.manual:
  398.             self.signaled = 1
  399.  
  400.         while self.channel.queue:
  401.             self.channel.send(None)
  402.  
  403.     # --------------------------------------------------------------------
  404.     def ResetEvent(self):
  405.         self.signaled = 0
  406.  
  407.  
  408.  
  409. def LockCheck():
  410.     blue.pyos.CreateTasklet("uthread::LockCheck").become(None)
  411.     global semaphores
  412.     while 1:
  413.         each = None
  414.         blue.pyos.synchro.Sleep(1000 * 60 * 5)
  415.         now = blue.os.GetTime()
  416.         try:
  417.             for each in semaphores.keys():
  418.                 blue.pyos.BeNice()
  419.                 if (each.count==0) and (each.waiting.balance < 0) and ((now - each.lockedWhen)>=(5*MIN)):
  420.                     log.general.Log("These two threads are in a locking conflict: ",4)
  421.                     log.general.Log("thread 1:",4)
  422.                     for s in log.traceback.format_list(traceback.extract_stack(FNext(each.thread.frame),40)):
  423.                         log.general.Log(s,4)
  424.                     log.general.Log("thread 2:",4)
  425.                     for s in log.traceback.format_list(traceback.extract_stack(FNext(each.waiting.queue.frame),40)):
  426.                         log.general.Log(s,4)
  427.         except:
  428.             StackTrace()
  429. LockCheck()
  430.  
  431. __uthread__queue__          = None
  432. def PoolHelper(queue):
  433.     t = stackless.getcurrent()
  434.     t.privateStorage = None
  435.     t.localStorage   = None
  436.     try:
  437.         try:
  438.             while 1:
  439.                 blue.pyos.BeNice()
  440.                 ctx, callingContext, func, loc, args, keywords = queue.get()
  441.                 if (queue.channel.balance >= 0):
  442.                     new(PoolHelper, queue).context = "uthread::PoolHelper"
  443.                 t.privateStorage = None
  444.                 SetLocalStorage(loc)
  445.                 _tmpctx = t.PushTimer(ctx)
  446.                 try:
  447.                     apply( func, args, keywords )
  448.                 finally:
  449.                     ctx                 = None
  450.                     callingContext      = None
  451.                     func                = None
  452.                     t.privateStorage    = None
  453.                     t.localStorage      = None
  454.                     loc                 = None
  455.                     args                = None
  456.                     keywords            = None
  457.                     t.PopTimer(_tmpctx)
  458.         except:
  459.             if callingContext is not None:
  460.                 extra = "spawned at %s %s(%s)"%callingContext
  461.             else:
  462.                 extra = ""
  463.             StackTrace("general",0,"Unhandled exception in %s%s"%(ctx,extra))
  464.     finally:
  465.         del t
  466.         new(PoolHelper, queue).context = "uthread::PoolHelper"
  467.  
  468. def PoolWorker(ctx,func,*args,**keywords):
  469.     '''
  470.         Same as uthread.pool, but without copying local storage, thus resetting session, etc.
  471.  
  472.         Should be used for spawning worker threads.
  473.     '''
  474.     return PoolWithoutTheStars(ctx,func,args,keywords,0,1)
  475.  
  476. def PoolWithoutTheStars(ctx,func,args,keywords,unsafe=0,worker=0):
  477.     #newWithoutTheStars(func, args, keywords )
  478.     #return None
  479.     global __uthread__queue__
  480.     callingContext = None
  481.     if ctx is None:
  482.         if unsafe:
  483.             ctx = "uthread::PoolHelper::UnsafeCrap"
  484.         else:
  485.             c = stackless.getcurrent()
  486.             ctx = getattr(c,"context","???")
  487.             frame = c.frame.f_back
  488.             callingContext = frame.f_code.co_name, frame.f_code.co_filename, traceback.f_lineno(frame)
  489.             frame = None
  490.             c = None
  491.  
  492.     if __uthread__queue__ is None:
  493.         __uthread__queue__ = Queue()
  494.         for i in range(60):
  495.             new(PoolHelper, __uthread__queue__).context = "uthread::PoolHelper"
  496.     if unsafe or worker:
  497.         st = None
  498.     else:
  499.         st = copy.copy(GetLocalStorage())
  500.     __uthread__queue__.non_blocking_put( (str(ctx), callingContext, func, st, args, keywords,) )
  501.     return None
  502.  
  503. def Pool(ctx,func,*args,**keywords):
  504.     '''
  505.         executes apply(args,keywords) on a new uthread.  The uthread in question is taken
  506.         from a thread pool, rather than created one-per-shot call.  ctx is used as the
  507.         thread context.  This should generally be used for short-lived threads to reduce
  508.         overhead.
  509.     '''
  510.     return PoolWithoutTheStars(ctx,func,args,keywords)
  511.  
  512. def UnSafePool(ctx,func,*args,**keywords):
  513.     '''
  514.         uthread.pool, but without any dangerous calls to stackless.getcurrent(), which could
  515.         have dramatic and drastic effects in the wrong context.
  516.     '''
  517.     return PoolWithoutTheStars(ctx,func,args,keywords,1)
  518.  
  519. def ParallelHelper(ch,idx,what):
  520.     ei = None
  521.     try:
  522.         if len(what)==3:
  523.             ret = (idx, apply(what[0], what[1], what[2] ))
  524.             if ch.balance < 0 :
  525.                 ch.send( (1, ret) )
  526.         else:
  527.             ret = (idx, apply(what[0], what[1] ))
  528.             if ch.balance < 0:
  529.                 ch.send( (1, ret) )
  530.     except:
  531.         ei = sys.exc_info()
  532.  
  533.     if ei:
  534.         if ch.balance < 0:
  535.             ch.send((0,ei))
  536.     del ei
  537.  
  538. def Parallel(funcs,exceptionHandler=None,maxcount=30):
  539.     '''
  540.         Executes in parallel all the function calls specified in the list/tuple 'funcs', but returns the
  541.         return values in the order of the funcs list/tuple.  If an exception occurs, only the first exception
  542.         will reach you.  The rest will dissapear in a puff of logic.
  543.  
  544.         Each 'func' entry should be a tuple/list of:
  545.         1.  a function to call
  546.         2.  a tuple of arguments to call it with
  547.         3.  optionally, a dict of keyword args to call it with.
  548.     '''
  549.     if not funcs:
  550.         return
  551.  
  552.     context = "ParallelHelper::"+getattr(stackless.getcurrent(),"context","???")
  553.     ch = stackless.channel()
  554.     ret = [ None ] * len(funcs)
  555.     n = len(funcs)
  556.     if n > maxcount:
  557.         n = maxcount
  558.     for i in range(n):
  559.         if type(funcs[i]) != types.TupleType:
  560.             raise RuntimeError("Parallel requires a list/tuple of (function, args tuple, optional keyword dict,)")
  561.         Pool(context, ParallelHelper, ch, i, funcs[i])
  562.     for i in range(len(funcs)):
  563.         ok, bunch = ch.receive()
  564.         if ok:
  565.             idx,val = bunch
  566.             if len(funcs[i])==4:
  567.                 ret[idx] = (funcs[i][3], val,)
  568.             else:
  569.                 ret[idx] = val
  570.         else:
  571.             try:
  572.                 raise bunch[0],bunch[1],bunch[2]
  573.             except:
  574.                 if exceptionHandler:
  575.                     exctype, exc, tb = sys.exc_info()
  576.                     try:
  577.                         try:
  578.                             apply( exceptionHandler, (exc,) )
  579.                         except:
  580.                             raise exc, None, tb
  581.                     finally:
  582.                         exctype, exc, tb = None, None, None
  583.                 else:
  584.                     StackTrace()
  585.                     raise
  586.  
  587.         if n<len(funcs):
  588.             if type(funcs[n]) != types.TupleType:
  589.                 raise RuntimeError("Parallel requires a list/tuple of (function, args tuple, optional keyword dict,)")
  590.             Pool(context, ParallelHelper, ch, n, funcs[n])
  591.             n+=1
  592.     return ret
  593.  
  594.  
# Dotted public names mapped to the callables defined in this module.
# NOTE(review): nothing in this file reads `exports`; presumably the host's
# module loader publishes these names -- confirm.
exports = {
    "uthread.parallel":                 Parallel,
    "uthread.worker":                   PoolWorker,
    "uthread.new":                      new,
    "uthread.unsafepool":               UnSafePool,
    "uthread.pool":                     Pool,
    "uthread.poolWithoutTheStars":      PoolWithoutTheStars,
    "uthread.newWithoutTheStars":       newWithoutTheStars,
    "uthread.irandom":                  irandom,
    "uthread.uniqueId":                 uniqueId,
    "uthread.schedule":                 schedule,
    "uthread.GetChannels":              GetChannels,
    "uthread.GetSemaphores":            GetSemaphores,
    "uthread.FNext":                    FNext,
    }
  610.  
  611.