home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
OS/2 Shareware BBS: 10 Tools
/
10-Tools.zip
/
fnb101.zip
/
Lib
/
site-packages
/
Fnorb
/
orb
/
GIOPClientWorkerThreaded.py
< prev
next >
Wrap
Text File
|
1999-06-28
|
11KB
|
393 lines
#!/usr/bin/env python
#############################################################################
# Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999
# All Rights Reserved.
#
# The software contained on this media is the property of the DSTC Pty
# Ltd. Use of this software is strictly in accordance with the
# license agreement in the accompanying LICENSE.HTML file. If your
# distribution of this software does not contain a LICENSE.HTML file
# then you have no rights to use this software in any manner and
# should contact DSTC at the address below to determine an appropriate
# licensing arrangement.
#
# DSTC Pty Ltd
# Level 7, GP South
# Staff House Road
# University of Queensland
# St Lucia, 4072
# Australia
# Tel: +61 7 3365 4310
# Fax: +61 7 3365 4311
# Email: enquiries@dstc.edu.au
#
# This software is being provided "AS IS" without warranty of any
# kind. In no event shall DSTC Pty Ltd be liable for damage of any
# kind arising out of or in connection with the use or performance of
# this software.
#
# Project: Fnorb
# File: $Source: /units/arch/src/Fnorb/orb/RCS/GIOPClientWorkerThreaded.py,v $
# Version: @(#)$RCSfile: GIOPClientWorkerThreaded.py,v $ $Revision: 1.7 $
#
#############################################################################
""" GIOPClientWorkerThreaded class. """
# Standard/built-in modules.
import thread
# Fnorb modules.
import CORBA, GIOPClientWorker, GIOPConnectionHandler
# Fnorb threading modules.
import condvar
class GIOPClientWorkerThreaded(GIOPClientWorker.GIOPClientWorker):
""" GIOPClientWorkerThreaded class. """
def __init__(self, protocol, address):
""" Provide an interface to a remote object!
'protocol' is the protocol used by the client.
'address' is the address of the remote object.
"""
# fixme: We don't need to keep these as instance variables... (maybe
# for unpickling?),
self.__protocol = protocol
self.__address = address
# This flag is set when we get a CloseConnection message, or an
# exception event.
self.__closed = 0
# Each request has a unique id. associated with it.
self.__request_id = 0
# Mutex to make access to the request id and the closed flag
# thread-safe.
self.__lk = thread.allocate_lock()
# Condition variable for outgoing messages.
self.__outgoing_cv = condvar.condvar()
# Dictionary of condition variables for threads that are blocked
# waiting for replies.
self.__blocked_pending = {} # {RequestId: condvar}
# Dictionary of complete replies.
self.__replies = {} # {RequestId: Message}
# Mutex for incoming messages.
self.__incoming_lk = thread.allocate_lock()
# Ask the protocol to create a connection and actually connect to the
# remote object.
self.__connection = protocol.create_connection()
self.__connection.connect(self.__address)
# Set the connection to BLOCKING mode.
self.__connection.blocking(1)
# Create a handler to look after the connection.
self.__handler = GIOPConnectionHandler.GIOPConnectionHandler \
(self, self.__connection)
# Start the read thread.
thread.start_new_thread(self.__read_thread, ())
return
def pseudo__del__(self):
""" Pseudo destructor to remove circular references. """
# Close down the worker.
self.__close()
# The handler holds a (circular) reference to this instance so we
# have to explicitly clean it up.
self.__handler.pseudo__del__()
# Clean up *our* reference to *it*!
del self.__handler
return
#########################################################################
# GIOPClientWorker interface.
#########################################################################
def send(self, request_id, message):
""" Send an operation request to the remote object.
This method BLOCKS until the message has been sent.
"""
# Block until the connection is available.
self.__outgoing_cv.acquire()
try:
# Send the entire message.
n = 0
while n < len(message):
n = n + self.__handler.send(message)
except CORBA.COMM_FAILURE, ex:
# Let somebody else have a go ;^)
self.__outgoing_cv.release()
# If the connection has been closed then we can try again.
if self.is_closed():
raise CORBA.TRANSIENT()
else:
raise ex
# Let somebody else have a go ;^)
self.__outgoing_cv.release()
return
def recv(self, request_id):
""" Wait for a specific reply. """
# Make sure that we aren't interfering with the read thread.
self.__incoming_lk.acquire()
# See if the reply has already arrived.
reply = self.__replies.get(request_id)
if reply is None:
# Create a new condition variable.
cv = condvar.condvar()
# Add the condition variable to the dictionary of blocked threads.
# The read thread will 'signal' the condition variable (if and)
# when the corresponding reply is received.
self.__blocked_pending[request_id] = cv
# Acquire the condition variable so that if the read thread
# receives the reply before we have called 'wait', it will be
# blocked.
cv.acquire()
# Let the read thread do its thang!
self.__incoming_lk.release()
# Wait on the condition variable ('wait' releases the lock on the
# condition variable first).
cv.wait()
cv.release()
self.__incoming_lk.acquire()
reply = self.__replies[request_id]
del self.__replies[request_id]
self.__incoming_lk.release()
# If the condition variable was signalled because we received a
# 'CloseConnection' message, then the reply will be a
# 'CORBA.TRANSIENT' exception. If the condition variable was
# signalled because of some other failure then the reply will be
# an instance of the appropriate CORBA system exception.
if isinstance(reply, CORBA.SystemException):
raise reply
# The reply has already arrived - now that's what I call service ;^)
else:
del self.__replies[request_id]
self.__incoming_lk.release()
return reply
def poll(self, request_id):
""" Poll for a reply to a specific request. """
# Have we got our reply yet
self.__incoming_lk.acquire()
result = self.__replies.has_key(request_id)
self.__incoming_lk.release()
return result
def peek(self, request_id):
""" Peek at the reply for the specified request.
This method does *not* delete the reply from the client's queue.
"""
# Get the reply.
self.__incoming_lk.acquire()
reply = self.__replies[request_id]
self.__incoming_lk.release()
return reply
def delete_reply(self, request_id):
""" Delete the reply with the specified request id. """
# Delete the reply.
self.__incoming_lk.acquire()
del self.__replies[request_id]
self.__incoming_lk.release()
return
def next_request_id(self):
""" Return the next request id. """
# fixme: This will get stupidly big oneday!!!!
self.__lk.acquire()
request_id = self.__request_id = self.__request_id + 1
self.__lk.release()
return request_id
def is_closed(self):
""" Has the worker received a close event? """
self.__lk.acquire()
closed = self.__closed
self.__lk.release()
return closed
def close_connection(self):
""" Close down the connection.
This method is only ever called from the read thread.
"""
# Unblock all threads that are waiting for replies.
self.__unblock_all(CORBA.TRANSIENT())
# Close down the worker.
self.__close()
return
#########################################################################
# GIOPConnectionHandlerListener interface.
#########################################################################
def message_received(self, message):
""" Called when a complete GIOP message has been received. """
# Code re-use! The protected method '_message_received' is implemented
# in the 'GIOPClientWorker' class.
#
# If the message is a GIOP Reply message, then this method will call
# our '_reply_received' method.
self._message_received(message)
return
def message_sent(self):
""" Called when a complete GIOP message has been sent.
In the threaded worker, we block until the message has been sent
anyway, so we don't have anything to do here!
"""
pass
#########################################################################
# Protected interface.
#########################################################################
def _reply_received(self, reply_header, cursor):
""" Called by the '_message_received' method of GIOPClientWorker. """
self.__incoming_lk.acquire()
# Add the reply to our dictionary of complete replies.
self.__replies[reply_header.request_id] = (reply_header, cursor)
# See if a thread is blocked waiting for this reply.
cv = self.__blocked_pending.get(reply_header.request_id)
if cv is not None:
# Signal the blocked thread.
cv.acquire()
cv.signal()
# Clean up.
del self.__blocked_pending[reply_header.request_id]
self.__incoming_lk.release()
return
#########################################################################
# Private interface.
#########################################################################
def __read_thread(self):
""" Read messages from the server. """
try:
while not self.is_closed():
# Blocking receive.
self.__handler.recv()
# If we have received a 'CloseConnection' message, then we notify
# the GIOPClient by raising a 'TRANSIENT' system exception.
ex = CORBA.TRANSIENT()
except CORBA.SystemException, ex:
# If we have received a 'CloseConnection' message, then we notify
# the GIOPClient by raising a 'TRANSIENT' system exception.
if self.is_closed():
ex = CORBA.TRANSIENT()
# Otherwise, it was some other system exception, so close the
# connection down.
else:
self.__close()
# Unblock all threads that are waiting for replies.
self.__unblock_all(ex)
# Explicitly exit the thread!
thread.exit()
def __close(self):
""" Close down the worker. """
self.__lk.acquire()
if not self.__closed:
# All done!
self.__closed = 1
self.__lk.release()
# Close our connection.
self.__connection.disconnect()
else:
self.__lk.release()
return
def __unblock_all(self, exception):
""" Unblock all waiting threads with the specified exception. """
self.__incoming_lk.acquire()
# Set the reply for each blocked thread to be the specified exception.
for request_id in self.__blocked_pending.keys():
self.__replies[request_id] = exception
# Get a list of condition variables to signal.
blocked_pending = self.__blocked_pending.values()
# Clean them up!
self.__blocked_pending = {}
self.__incoming_lk.release()
# Unblock each thread.
for cv in blocked_pending:
cv.acquire()
cv.signal()
return
#############################################################################