home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
OS/2 Shareware BBS: 10 Tools
/
10-Tools.zip
/
fnb101.zip
/
Lib
/
site-packages
/
Fnorb
/
orb
/
condvar.py
< prev
next >
Wrap
Text File
|
1999-06-28
|
10KB
|
360 lines
#############################################################################
# Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1995, 1996, 1997, 1998, 1999
# Unpublished work. All Rights Reserved.
#
# The software contained on this media is the property of the
# DSTC Pty Ltd. Use of this software is strictly in accordance
# with the license agreement in the accompanying LICENSE.DOC file.
# If your distribution of this software does not contain a
# LICENSE.DOC file then you have no rights to use this software
# in any manner and should contact DSTC at the address below
# to determine an appropriate licensing arrangement.
#
# DSTC Pty Ltd
# Level 7, GP South
# Staff House Road
# University of Queensland
# St Lucia, 4072
# Australia
# Tel: +61 7 3365 4310
# Fax: +61 7 3365 4311
# Email: enquiries@dstc.edu.au
#
# This software is being provided "AS IS" without warranty of
# any kind. In no event shall DSTC Pty Ltd be liable for
# damage of any kind arising out of or in connection with
# the use or performance of this software.
#
# Project: Hector
# File: $Source: /units/arch/src/Fnorb/orb/RCS/condvar.py,v $
#
#############################################################################
"""
Condition variables
"""
#############################################################################
import signal, thread, time
#############################################################################
class condvar:
    """Condition variables.

    A condition variable class with a built-in mutex.  To use,
    create and associate a variable with it.  Others, interested in
    the state of that variable, can "wait()" on the condvar and
    will be suspended, to be resumed when another thread "signal()"s
    or "broadcast()"s.

    You must have acquired the condvar prior to any of "release()",
    "signal()" or "broadcast()".  All of these operations release
    the acquired mutex.

    You must also have acquired the condvar prior to "wait()"ing or
    "timedwait()"ing, which release and then reacquire the
    mutex."""

    #-- timedwait timer table: maps id(blocking lock) -> blocking lock.
    #-- An entry is added by "timedwait()", removed by "signal()" or
    #-- "broadcast()" when they wake that waiter, and otherwise removed
    #-- by "timedwait()" itself once it sees that the timer woke it.
    #-- The table is shared by all instances of the class.
    d_timer = {}

    def __init__(self):
        """Create a new condition variable."""
        #-- the mutex guarding the condition and the waiting queue
        self.lk = thread.allocate_lock()
        #-- one private, pre-acquired lock per blocked thread, FIFO order
        self.q_waiting = []

    def __del__(self):
        """Destroy a condition variable.

        Any blocking locks still queued are released."""
        #-- non-blocking attempt: a finaliser must never hang just
        #-- because the mutex was left held when the object was dropped
        self.lk.acquire(0)
        for thr in self.q_waiting:
            thr.release()

    def acquire(self):
        """Acquire the lock on a condition variable.

        Return value    none
        Pre-Locking     caller must NOT hold the CV lock
        Post-Locking    caller holds the CV lock

        Condition variables must be locked before calling "signal()",
        "broadcast()", "wait()", "timedwait()" or "release()"."""
        self.lk.acquire()

    def release(self):
        """Release the currently held lock on a condition variable.

        Return value    none
        Pre-Locking     caller must hold the CV lock
        Post-Locking    callers lock is released

        The condition variable lock must be currently held by the
        caller."""
        self.lk.release()

    def _forget_timer(self, lk):
        """Drop the timer table entry for blocking lock "lk", if any.

        Only "timedwait()" callers have an entry; plain "wait()"
        callers do not, so a missing entry is not an error."""
        try:
            del self.__class__.d_timer[id(lk)]
        except KeyError:
            pass

    def signal(self):
        """Inform the first waiting thread that the condition is altered.

        Return value    none
        Pre-Locking     caller MUST hold the CV lock
        Post-Locking    callers lock is released

        The first thread queued awaiting changes in the condition is
        unblocked.  Note that the condition variable lock must be held
        by the caller."""
        try:
            #-- check for waiting threads
            if self.q_waiting:
                #-- remove head of waiting queue
                thr = self.q_waiting.pop(0)
                #-- remove timeout record (if present) so that a
                #-- "timedwait()" caller reports a notification
                self._forget_timer(thr)
                #-- unblock thread
                thr.release()
        finally:
            #-- unlock CV, even if waking the waiter failed
            self.lk.release()

    def broadcast(self):
        """Inform all waiting threads that the condition is altered.

        Return value    none
        Pre-Locking     caller MUST hold the CV lock
        Post-Locking    callers lock is released

        All threads queued awaiting a change in the condition are
        unblocked.  Note that the condition variable lock must be held
        by the caller."""
        try:
            #-- detach the whole queue first so it is reset even if
            #-- waking one of the waiters fails
            waiters = self.q_waiting
            self.q_waiting = []
            for thr in waiters:
                #-- remove timeout record (if present)
                self._forget_timer(thr)
                #-- unblock thread
                thr.release()
        finally:
            #-- unlock CV
            self.lk.release()

    def wait(self):
        """Wait for notification of a change in the condition.

        Return value    none
        Pre-Locking     caller MUST hold the CV lock
        Post-Locking    caller holds the CV lock (see note)

        The caller is blocked until notified of a change in the
        condition (from either "signal()" or "broadcast()").

        Note: the caller must hold the condition variable lock,
        which will be released and reacquired before returning."""
        #-- create the blocking lock and add to the CV queue
        lk = thread.allocate_lock()
        lk.acquire()
        self.q_waiting.append(lk)
        #-- release the CV lock
        self.lk.release()
        #-- block the thread: the second acquire only succeeds once
        #-- "signal()" or "broadcast()" releases the lock
        lk.acquire()
        #-- reacquire the CV lock
        self.lk.acquire()

    def timedwait(self, timeout = 0):
        """Wait for notification of a change in the condition with a timeout.

        "timeout"       period to wait, in seconds.  0 (or less) means
                        the timer expires immediately.

        Return value     0 - a change has been notified
                        -1 - the timeout period expired
        Exceptions      whatever "thread.start_new_thread()" raises if
                        the timer thread cannot be started; the CV lock
                        is still held by the caller in that case
        Pre-Locking     caller MUST hold the CV lock
        Post-Locking    caller holds the CV lock (see note)

        The caller is blocked until notified of a change in the
        condition (from "signal()" or "broadcast()") or the expiry of
        the timeout period.

        Note: the caller must hold the condition variable lock,
        which will be released and reacquired before returning."""
        timers = self.__class__.d_timer
        #-- create the blocking lock and add to the CV queue
        lk = thread.allocate_lock()
        lk.acquire()
        self.q_waiting.append(lk)
        #-- add this thread to the set of timers
        lk_id = id(lk)
        timers[lk_id] = lk
        try:
            #-- the lock itself is handed to the timer thread so that it
            #-- stays alive (and its id cannot be reused by another
            #-- lock) for as long as the timer is pending
            thread.start_new_thread(self._wakeup, (lk_id, timeout, lk))
        except:
            #-- no timer could be started: undo the registration so no
            #-- stale waiter is left behind, then report the failure
            self.q_waiting.remove(lk)
            del timers[lk_id]
            raise
        #-- release the CV lock
        self.lk.release()
        #-- block the thread
        lk.acquire()
        #-- reacquire the CV lock
        self.lk.acquire()
        #-- check to see if we timed out: "signal()" and "broadcast()"
        #-- delete the table entry, the timer leaves it in place
        if timers.get(lk_id) is lk:
            del timers[lk_id]
            return -1
        return 0

    def _wakeup(self, lk_id, timeout, lk = None):
        """Wakeup a timedwait on a condition variable.

        "lk_id"         key of the waiter's entry in the timer table
        "timeout"       seconds to sleep before waking the waiter
        "lk"            the waiter's blocking lock; when omitted it is
                        looked up in the timer table by "lk_id"

        The workings of "timedwait()" are a little obscure: when a
        thread calls "timedwait()" an entry is placed into the timer
        table "d_timer" which is shared by all instances of the class.
        This entry is keyed by the ID of the lock used to suspend the
        thread, and contains the lock instance.

        When a thread is woken by "signal()" or "broadcast()" this
        entry is deleted.  Note that it will only be there if
        "timedwait()" has been called.

        "timedwait()" checks for the presence of the table entry when
        it is unblocked.  If it is not there, then the thread was
        woken by a "signal()" or "broadcast()" (which will have
        deleted it).

        "_wakeup()" unblocks the thread using the lock instance, but
        does not delete the table entry.  When "timedwait()" is
        unblocked then, it notices that the timer table entry is still
        there, and can determine that it was unblocked by the timer
        expiry, not a "signal()" or "broadcast()".

        The check and the wake-up are done while holding the CV lock,
        so they cannot interleave with "signal()" or "broadcast()",
        and the waiter is taken off the waiting queue so that a later
        "signal()" is not used up by a thread that has already gone."""
        #-- a zero or negative period expires at once
        if timeout > 0:
            time.sleep(timeout)
        self.lk.acquire()
        try:
            timers = self.__class__.d_timer
            if lk is None:
                lk = timers.get(lk_id)
            #-- check if thread is still blocked
            if (lk is not None and timers.get(lk_id) is lk
                    and lk in self.q_waiting):
                #-- dequeue, then unblock thread
                self.q_waiting.remove(lk)
                lk.release()
        finally:
            self.lk.release()
        thread.exit()
#############################################################################
def cvtest():
    """Test function.

    Interactive demonstration of "condvar": starts five "tester"
    threads, wakes one with "signal()" and then the remainder with
    "broadcast()", then starts four "time_tester" threads with timeouts
    of 0, 3, 10 and 15 seconds and broadcasts after five seconds.
    Finally blocks until interrupted from the keyboard.

    Return value    none (does not return normally)"""
    cv = condvar()

    print("master: starting threads.")
    for ident in [1, 2, 3, 4, 5]:
        thread.start_new_thread(tester, (cv, ident))
        #-- zero-length sleep between thread starts
        time.sleep(0)
    print("")

    time.sleep(5)
    print("")
    print("master: about to signal")
    #-- "signal()" and "broadcast()" release the CV lock themselves
    cv.acquire()
    cv.signal()

    time.sleep(5)
    print("")
    print("master: about to broadcast")
    cv.acquire()
    cv.broadcast()

    time.sleep(5)
    print("")
    print("master: starting timedwait threads.")
    print("")
    thread.start_new_thread(time_tester, (cv, 6, 0))
    thread.start_new_thread(time_tester, (cv, 7, 3))
    thread.start_new_thread(time_tester, (cv, 8, 10))
    thread.start_new_thread(time_tester, (cv, 9, 15))

    time.sleep(5)
    cv.acquire()
    print("master: about to broadcast.")
    cv.broadcast()

    time.sleep(5)
    print("")
    print("press C-c to exit.")
    print("")
    if hasattr(signal, "pause"):
        signal.pause()
    else:
        #-- "signal.pause()" is not provided on every platform; idle
        #-- in a sleep loop there until the keyboard interrupt arrives
        while 1:
            time.sleep(1)
def tester(cv, id):
print "thread %d: Started tester." % (id)
print "thread %d: About to wait." % (id)
cv.acquire()
cv.wait()
cv.release()
print "thread %d: Released." % (id)
thread.exit()
def time_tester(cv, id, timeout):
    """Thread body for the "timedwait()" test.

    "cv"            condition variable to wait on
    "id"            number used to label this thread's output
    "timeout"       period handed to "timedwait()", in seconds

    Announces itself, waits on "cv" with the given timeout, reports
    whether it was notified or timed out and then ends the calling
    thread."""
    print("thread %d: Started time tester - waiting %d secs." % (id, timeout))
    print("thread %d: About to wait." % id)
    cv.acquire()
    #-- "timedwait()" returns 0 when notified, anything else is a timeout
    notified = cv.timedwait(timeout) == 0
    if notified:
        outcome = "thread %d: Signalled."
    else:
        outcome = "thread %d: Timeout expired."
    print(outcome % id)
    cv.release()
    thread.exit()
#############################################################################
#-- run the interactive self-test only when executed as a script
if __name__ == "__main__":
    cvtest()
#############################################################################