home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
OS/2 Shareware BBS: 10 Tools
/
10-Tools.zip
/
fnb101.zip
/
Lib
/
site-packages
/
Fnorb
/
orb
/
GIOPClient.py
< prev
next >
Wrap
Text File
|
1999-06-28
|
16KB
|
530 lines
#!/usr/bin/env python
#############################################################################
# Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999
# All Rights Reserved.
#
# The software contained on this media is the property of the DSTC Pty
# Ltd. Use of this software is strictly in accordance with the
# license agreement in the accompanying LICENSE.HTML file. If your
# distribution of this software does not contain a LICENSE.HTML file
# then you have no rights to use this software in any manner and
# should contact DSTC at the address below to determine an appropriate
# licensing arrangement.
#
# DSTC Pty Ltd
# Level 7, GP South
# Staff House Road
# University of Queensland
# St Lucia, 4072
# Australia
# Tel: +61 7 3365 4310
# Fax: +61 7 3365 4311
# Email: enquiries@dstc.edu.au
#
# This software is being provided "AS IS" without warranty of any
# kind. In no event shall DSTC Pty Ltd be liable for damage of any
# kind arising out of or in connection with the use or performance of
# this software.
#
# Project: Fnorb
# File: $Source: /units/arch/src/Fnorb/orb/RCS/GIOPClient.py,v $
# Version: @(#)$RCSfile: GIOPClient.py,v $ $Revision: 1.9 $
#
#############################################################################
""" GIOPClient class. """
# Standard/built-in modules.
import new
# Fnorb modules.
import fnorb_thread, CORBA, GIOP, GIOPClientWorker, IOP, OctetStream, Util
class GIOPClient:
""" GIOPClient class. """
def __init__(self, protocol, address):
""" Constructor.
'protocol' is the protocol creating this client.
'address' is the address of the remote object.
"""
self.__protocol = protocol
self.__address = address
# We only get a worker when the first request is made.
self.__worker = None
# Mutex for the worker. Amongst other things, this makes sure that we
# don't create more than one worker for the same client in
# multi-threaded environments!
self.__lk = fnorb_thread.allocate_lock()
return
def pseudo__del__(self):
""" Pseudo destructor to remove circular references.
This method is called by the GIOPClientManager when it determines that
there are no more clients using this address.
"""
# Clean up our worker (required because of circular references between
# the worker and its connection handler).
self.__lk.acquire()
if self.__worker is not None:
self.__worker.pseudo__del__()
self.__worker = None
self.__lk.release()
return
#########################################################################
# GIOPClient interface.
#########################################################################
def synchronous(self, request, parameters):
""" Send a 'synchronous' GIOP request message & wait for the reply. """
# Get our worker.
worker = self.__get_worker()
# Get a unique request id.
request_id = self.__worker.next_request_id()
# Get the active object key.
object_key = request.object()._fnorb_object_key()
# Start a new GIOP message.
message = OctetStream.GIOPMessage(type=GIOP.Request)
cursor = message.cursor()
# Create a GIOP request header.
request_header = GIOP.RequestHeader(request.context(),
request_id,
1, # Response expected.
object_key,
request.operation(),
'')
# Marshal the request header onto the octet stream.
tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader:1.0')
tc._fnorb_marshal_value(cursor, request_header)
# Marshal the request parameters onto the octet stream.
self.__marshal_parameters(cursor, request.inputs(), parameters)
try:
# Send the request.
worker.send(request_id, message)
# Wait for the reply.
reply = worker.recv(request_id)
# Unpack the reply.
(reply_header, cursor) = reply
# If we have received a 'LOCATION_FORWARD' message.
if reply_header.reply_status == GIOP.LOCATION_FORWARD:
# Unmarshal the IOR that contains the address to forward
# operation requests to.
ior = new.instance(IOP.IOR, {})
ior._fnorb_unmarshal(cursor)
# Update the object reference to use the forwarded IOR.
request.object()._fnorb_forward(ior)
# Try again!
forwarded = 1
result = None
# The reply was *not* a 'LOCATION_FORWARD' so lets deal with it!
else:
forwarded = 0
result = self.__process_reply(request, reply)
# Retry on transient failures (this includes 'CloseConnection' messages
# from the server.
except CORBA.TRANSIENT, ex:
forwarded = 1
result = None
# If a communication (ie. socket!) error occurred then see if we have
# previously been 'forwarded'.
except CORBA.COMM_FAILURE, ex:
# If we *have* been 'forwarded' then try again using the object's
# original address.
if request.object()._fnorb_forwarded():
# Update the object reference to use its original IOR.
request.object()._fnorb_unforward()
# Try again!
forwarded = 1
result = None
# Otherwise, if we haven't been forwarded then give up!
else:
raise ex
return (forwarded, result)
def deferred(self, request, parameters):
""" Create and send send a 'deferred' GIOP request message. """
# Get our worker.
worker = self.__get_worker()
# Get a unique request id.
request_id = self.__worker.next_request_id()
# Get the active object key.
object_key = request.object()._fnorb_object_key()
# Start a new GIOP message.
message = OctetStream.GIOPMessage(type=GIOP.Request)
cursor = message.cursor()
# Create a GIOP request header.
request_header = GIOP.RequestHeader(request.context(),
request_id,
1, # Response expected.
object_key,
request.operation(),
'')
# Marshal the request header onto the octet stream.
tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader:1.0')
tc._fnorb_marshal_value(cursor, request_header)
# Marshal the request parameters onto the octet stream.
self.__marshal_parameters(cursor, request.inputs(), parameters)
# Send the request.
try:
self.__worker.send(request_header.request_id, message)
# Retry on transient failures (this includes 'CloseConnection' messages
# from the server.
except CORBA.TRANSIENT:
return self.deferred(request, parameters)
return request_header.request_id
def oneway(self, request, parameters):
""" Create and send a 'oneway' GIOP request message. """
# Get our worker.
worker = self.__get_worker()
# Get a unique request id.
request_id = self.__worker.next_request_id()
# Get the active object key.
object_key = request.object()._fnorb_object_key()
# Start a new GIOP message.
message = OctetStream.GIOPMessage(type=GIOP.Request)
cursor = message.cursor()
# Create a GIOP request header.
request_header = GIOP.RequestHeader(request.context(),
request_id,
0, # No response expected.
object_key,
request.operation(),
'')
# Marshal the request header onto the octet stream.
tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader:1.0')
tc._fnorb_marshal_value(cursor, request_header)
# Marshal the request parameters onto the octet stream.
self.__marshal_parameters(cursor, request.inputs(), parameters)
# Send the request.
try:
self.__worker.send(request_header.request_id, message)
# Retry on transient failures (this includes 'CloseConnection' messages
# from the server.
except CORBA.TRANSIENT:
return self.oneway(request, parameters)
return
def reply(self, request):
""" Get the reply to a 'deferred' operation request. """
# Get our worker.
worker = self.__get_worker()
try:
# Wait for the reply.
reply = worker.recv(request._fnorb_request_id())
# Unpack the reply.
(reply_header, cursor) = reply
# If we have received a 'LOCATION_FORWARD' message.
if reply_header.reply_status == GIOP.LOCATION_FORWARD:
# Unmarshal the IOR that contains the address to forward
# operation requests to.
ior = new.instance(IOP.IOR, {})
ior._fnorb_unmarshal(cursor)
# Update the object reference to use the forwarded IOR.
request.object()._fnorb_forward(ior)
# Try again!
forwarded = 1
result = None
# The reply was *not* a 'LOCATION_FORWARD' so lets deal with it!
else:
forwarded = 0
result = self.__process_reply(request, reply)
# Retry on transient failures (this includes 'CloseConnection' messages
# from the server.
except CORBA.TRANSIENT, ex:
forwarded = 1
result = None
# If a communication (ie. socket!) error occurred then see if we have
# previously been 'forwarded'.
except CORBA.COMM_FAILURE, ex:
# If we *have* been 'forwarded' then try again using the object's
# original address.
if request.object()._fnorb_forwarded():
# Update the object reference to use its original IOR.
request.object()._fnorb_unforward()
# Try again!
forwarded = 1
result = None
# Otherwise, if we haven't been forwarded then give up!
else:
raise ex
return (forwarded, result)
def poll(self, request):
""" Has a reply been received for the specified request? """
# Get our worker.
worker = self.__get_worker()
# Get the request id.
request_id = request._fnorb_request_id()
try:
if worker.poll(request_id):
# Take a peek at the reply.
(reply_header, cursor) = worker.peek(request_id)
# If we have received a 'LOCATION_FORWARD' message.
if reply_header.reply_status == GIOP.LOCATION_FORWARD:
# Delete the reply.
worker.delete_reply(self.__request_id)
# Unmarshal the IOR that contains the address to forward
# operation requests to.
ior = new.instance(IOP.IOR, {})
ior._fnorb_unmarshal(cursor)
# Update the object reference to use the forwarded IOR.
self.__object._fnorb_forward(ior)
# Try again!
forwarded = 1
result = 0
# If a CORBA system exception occurred...
elif reply_header.reply_status == GIOP.SYSTEM_EXCEPTION:
# Unmarshal and raise the system exception.
raise self.__unmarshal_system_exception(cursor)
# The reply is either a 'NO_EXCEPTION' or a 'USER_EXCEPTION'
# message.
else:
forwarded = 0
result = 1
# The reply has not arrived yet 8^(
else:
forwarded = 0
result = 0
# Retry on transient failures (this includes 'CloseConnection' messages
# from the server.
except CORBA.TRANSIENT, ex:
forwarded = 1
result = None
# If a communication (ie. socket!) error occurred then see if we have
# previously been 'forwarded'.
except CORBA.COMM_FAILURE, ex:
# If we *have* been 'forwarded' then try again using the object's
# original address.
if self.__object._fnorb_forwarded():
# Update the object reference to use its original IOR.
self.__object._fnorb_unforward()
# Try again!
forwarded = 1
result = 0
# Otherwise, if we haven't been forwarded then give up!
else:
raise ex
return (forwarded, result)
#########################################################################
# Private interface.
#########################################################################
def __get_worker(self):
""" Get our GIOP client worker. """
self.__lk.acquire()
try:
# If this is the first operation request then get a worker.
if self.__worker is None:
# Get a reference to the factory for GIOP client workers.
cwf = GIOPClientWorker.GIOPClientWorkerFactory_init()
# Create a new GIOP client worker (the concrete type of which
# will be determined by the threading model).
self.__worker = cwf.create_worker(self.__protocol,
self.__address)
# If the worker has received a 'CloseConnection' message, then
# we need a new one!
elif self.__worker.is_closed():
# Clean up the old worker.
self.__worker.pseudo__del__()
# Get a reference to the factory for GIOP client workers.
cwf = GIOPClientWorker.GIOPClientWorkerFactory_init()
# Create a new GIOP client worker (the concrete type of which
# will be determined by the threading model).
self.__worker = cwf.create_worker(self.__protocol,
self.__address)
# Return value.
worker = self.__worker
finally:
self.__lk.release()
return worker
def __process_reply(self, request, (reply_header, cursor)):
""" Process a GIOP reply. """
# If a CORBA system exception occurred...
if reply_header.reply_status == GIOP.SYSTEM_EXCEPTION:
# Unmarshal and raise the system exception.
raise self.__unmarshal_system_exception(cursor)
# Else if a user exception occurred...
elif reply_header.reply_status == GIOP.USER_EXCEPTION:
# Get the repository id of the exception.
repository_id = cursor.unmarshal('s')
# Try to find a matching exception in the request's list of
# exception typecodes.
for typecode in request.exceptions():
if typecode.id() == repository_id:
break
# If there is no matching exception then raise an UNKNOWN.
else:
raise CORBA.UNKNOWN() # System exception.
# Unmarshal and raise the user exception.
raise typecode._fnorb_unmarshal_value(cursor)
# If we get here then the operation was successful!
#
# Unmarshal the return value and any 'inout' and 'out' parameters.
return self.__unmarshal_results(cursor, request.outputs())
def __marshal_parameters(self, cursor, typecodes, parameters):
""" Marshal the parameters onto an octet stream. """
# Marshal each parameter according to its typecode.
i = 0
for typecode in typecodes:
typecode._fnorb_marshal_value(cursor, parameters[i])
i = i + 1
return
def __unmarshal_results(self, cursor, typecodes):
""" Unmarshal the result and any 'inout', and 'out' parameters. """
# In the spirit of the ILU mapping, we have the following semantics:-
#
# If the operation has no outputs (ie. no result, 'inout', or 'out'
# parameters) then we return 'None'.
#
# If the operation has exactly ONE output then we return just that
# value.
#
# If the operation has more than one output then we return a TUPLE
# with the result first followed by any 'inout' and 'out' parameters
# in the order that they appear in the IDL definition.
no_of_outputs = len(typecodes)
if no_of_outputs == 0:
results = None
elif no_of_outputs == 1:
results = typecodes[0]._fnorb_unmarshal_value(cursor)
else:
outputs = [None] * no_of_outputs
for i in range(no_of_outputs):
outputs[i] = typecodes[i]._fnorb_unmarshal_value(cursor)
results = tuple(outputs)
return results
def __unmarshal_system_exception(self, cursor):
""" Unmarshal a system exception from an octet stream. """
# Unmarshal the repository id of the system exception.
intrep_id = Util.RepositoryId(cursor.unmarshal('s'))
# Get the scoped name from the repository id.
scoped_name = intrep_id.scoped_name()
# The last two components of the scoped name make up the name of the
# Python class that represents the system exception.
#
# e.g. For the COMM_FAILURE exception, the repository id is:-
#
# IDL:omg.org/CORBA/COMM_FAILURE:1.0
#
# The scoped name is therefore:-
#
# omg.org/CORBA/COMM_FAILURE
#
# And the Python class name is:-
#
# CORBA.COMM_FAILURE
klass = eval(scoped_name[-2:].join('.'))
# Create an uninitialised exception instance, and then unmarshal the
# exception details!
ex = new.instance(klass, {})
ex._fnorb_unmarshal(cursor)
return ex
#############################################################################