home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
OS/2 Shareware BBS: 10 Tools
/
10-Tools.zip
/
fnb101.zip
/
Lib
/
site-packages
/
Fnorb
/
orb
/
GIOPClientWorkerReactive.py
< prev
next >
Wrap
Text File
|
1999-06-28
|
10KB
|
328 lines
#!/usr/bin/env python
#############################################################################
# Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999
# All Rights Reserved.
#
# The software contained on this media is the property of the DSTC Pty
# Ltd. Use of this software is strictly in accordance with the
# license agreement in the accompanying LICENSE.HTML file. If your
# distribution of this software does not contain a LICENSE.HTML file
# then you have no rights to use this software in any manner and
# should contact DSTC at the address below to determine an appropriate
# licensing arrangement.
#
# DSTC Pty Ltd
# Level 7, GP South
# Staff House Road
# University of Queensland
# St Lucia, 4072
# Australia
# Tel: +61 7 3365 4310
# Fax: +61 7 3365 4311
# Email: enquiries@dstc.edu.au
#
# This software is being provided "AS IS" without warranty of any
# kind. In no event shall DSTC Pty Ltd be liable for damage of any
# kind arising out of or in connection with the use or performance of
# this software.
#
# Project: Fnorb
# File: $Source: /units/arch/src/Fnorb/orb/RCS/GIOPClientWorkerReactive.py,v $
# Version: @(#)$RCSfile: GIOPClientWorkerReactive.py,v $ $Revision: 1.6 $
#
#############################################################################
""" GIOPClientWorkerReactive class. """
# Fnorb modules.
import CORBA, EventHandler, GIOPClientWorker, GIOPConnectionHandler, Reactor
class GIOPClientWorkerReactive(GIOPClientWorker.GIOPClientWorker,
EventHandler.EventHandler):
""" GIOPClientWorkerReactive class.
A concrete 'EventHandler' in the 'Reactor' pattern.
This class is *not* thread safe, but since the whole point of the 'Reactor'
pattern is to work in a single-threaded environment, I don't see this as
much of a problem ;^)
"""
def __init__(self, protocol, address):
""" Provide an interface to a remote object!
'protocol' is the protocol used by this worker.
'address' is the address of the remote object.
"""
# fixme: We don't need to keep these as instance variables... (maybe
# for unpickling?
self.__protocol = protocol
self.__address = address
# This flag is set when we get a CloseConnection message, or an
# exception event.
self.__closed = 0
# Each request has a unique id. associated with it.
self.__request_id = 0
# Queue of outgoing messages.
self.__outgoing = [] # [(RequestId, Message)]
# Dictionary of complete replies.
self.__replies = {} # {RequestId: (ReplyHeader, Cursor)}
# Ask the protocol to create a connection and actually connect to the
# remote object.
self.__connection = protocol.create_connection()
self.__connection.connect(self.__address)
# Create a handler to look after the connection.
self.__handler = GIOPConnectionHandler.GIOPConnectionHandler \
(self, self.__connection)
# Get a reference to the active Reactor.
#
# fixme: Should the reactor be obtained from the protocol?
self.__reactor = Reactor.Reactor_init()
# Register our interest in read events with the Reactor.
self.__reactor.register_handler(self, Reactor.READ)
return
def pseudo__del__(self):
""" Pseudo destructor to remove circular references.
This method is called from the 'pseudo__del__' of the GIOPClient that
we belong to.
"""
# Close down the worker.
self.__close()
# The handler holds a (circular) reference to this instance so we
# have to explicitly clean it up.
self.__handler.pseudo__del__()
# Clean up *our* reference to *it*!
del self.__handler
return
#########################################################################
# GIOPClientWorker interface.
#########################################################################
def send(self, request_id, message):
""" Send an operation request to the remote object.
This method WAITS for the message to be sent.
"""
# Add the outgoing message to the queue.
self.__outgoing.append((request_id, message))
# Register my interest in write events with the Reactor.
self.__reactor.register_handler(self, Reactor.WRITE)
# Get the reactor to process events until the message has been sent.
while 1:
for (id, message) in self.__outgoing:
if request_id == id:
break
# Else, the request has been sent!
else:
break
# Get the reactor to wait for and process a single event.
self.__reactor.do_one_event()
return
def recv(self, request_id):
""" Wait for a specific reply. """
# Get the reactor to process events until we either get a close event
# or we get a reply.
while 1:
# Have we got our reply yet?
try:
reply = self.__replies[request_id]
del self.__replies[request_id]
break
# Err, nope!
except KeyError:
pass
# Get the reactor to wait for and process a single event.
self.__reactor.do_one_event()
return reply
def poll(self, request_id):
""" Poll for a reply to a specific request. """
# Get the reactor to poll for a single event.
self.__reactor.do_one_event(0)
# Have we got our reply yet?
return self.__replies.has_key(request_id)
def peek(self, request_id):
""" Peek at the reply for the specified request.
This method does *not* delete the reply from the client's queue.
"""
# Get the reply.
return self.__replies[request_id]
def delete_reply(self, request_id):
""" Delete the reply with the specified request id. """
# Delete the reply.
del self.__replies[request_id]
return
def next_request_id(self):
""" Return the next request id. """
# fixme: This will get stupidly big oneday!!!!
self.__request_id = self.__request_id + 1
return self.__request_id
def is_closed(self):
""" Has the worker received a close event? """
return self.__closed
def close_connection(self):
""" Orderly shutdown, in repsonse to a 'CloseConnection' message. """
if not self.__closed:
# Close down the worker.
self.__close()
# We notify our 'GIOPClient' that a 'CloseConnection' message
# was received by raising a 'CORBA.TRANSIENT' system exception.
raise CORBA.TRANSIENT()
return
#########################################################################
# EventHandler interface.
#########################################################################
def handle_event(self, mask):
""" Callback method to handle all events except close events. """
# Read event.
if mask & Reactor.READ:
self.__handler.recv()
# Write event.
if mask & Reactor.WRITE:
# Get the message at the head of the outgoing queue.
(request_id, message) = self.__outgoing[0]
# Send it!
self.__handler.send(message)
# Exception event.
if mask & Reactor.EXCEPTION:
# Close down the worker.
self.__close()
# Barf!
raise CORBA.COMM_FAILURE() # System exception.
return
def handle_close(self):
""" Callback method to handle close events. """
# Close down the worker.
self.__close()
return
def handle(self):
""" Return my underlying I/O handle.
In this case, my I/O handle is the file descriptor of my socket.
"""
return self.__connection.handle()
#########################################################################
# GIOPConnectionHandlerListener interface.
#########################################################################
def message_received(self, message):
""" Called when a complete GIOP message has been received. """
# Code re-use! The protected method '_message_received' is implemented
# in the 'GIOPClientWorker' class.
#
# If the message is a GIOP Reply message, then this method will call
# our '_reply_received' method.
self._message_received(message)
return
def message_sent(self):
""" Called when a complete GIOP message has been sent. """
# Get the details of the message that has just been sent.
(request_id, message) = self.__outgoing[0]
# Remove the message from the outgoing queue.
del self.__outgoing[0]
# If there are no other messages left to send then tell the Reactor
# that I am no longer interested in write events.
if len(self.__outgoing) == 0:
self.__reactor.unregister_handler(self, Reactor.WRITE)
return
#########################################################################
# Protected interface.
#########################################################################
def _reply_received(self, reply_header, cursor):
""" Called by the '_message_received' method of GIOPClientWorker. """
# Add the reply to our dictionary of complete replies.
self.__replies[reply_header.request_id] = (reply_header, cursor)
return
#########################################################################
# Private interface.
#########################################################################
def __close(self):
""" Close down the worker. """
# Withdraw all of my Reactor registrations.
self.__reactor.unregister_handler(self, Reactor.ALL)
# Close our connection.
self.__connection.disconnect()
# All done!
self.__closed = 1
return
#############################################################################