home *** CD-ROM | disk | FTP | other *** search
- #!/usr/bin/env python
- #############################################################################
- # Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999
- # All Rights Reserved.
- #
- # The software contained on this media is the property of the DSTC Pty
- # Ltd. Use of this software is strictly in accordance with the
- # license agreement in the accompanying LICENSE.HTML file. If your
- # distribution of this software does not contain a LICENSE.HTML file
- # then you have no rights to use this software in any manner and
- # should contact DSTC at the address below to determine an appropriate
- # licensing arrangement.
- #
- # DSTC Pty Ltd
- # Level 7, GP South
- # Staff House Road
- # University of Queensland
- # St Lucia, 4072
- # Australia
- # Tel: +61 7 3365 4310
- # Fax: +61 7 3365 4311
- # Email: enquiries@dstc.edu.au
- #
- # This software is being provided "AS IS" without warranty of any
- # kind. In no event shall DSTC Pty Ltd be liable for damage of any
- # kind arising out of or in connection with the use or performance of
- # this software.
- #
- # Project: Fnorb
- # File: $Source: /units/arch/src/Fnorb/orb/RCS/GIOPClient.py,v $
- # Version: @(#)$RCSfile: GIOPClient.py,v $ $Revision: 1.9 $
- #
- #############################################################################
- """ GIOPClient class. """
-
-
- # Standard/built-in modules.
- import new
-
- # Fnorb modules.
- import fnorb_thread, CORBA, GIOP, GIOPClientWorker, IOP, OctetStream, Util
-
-
class GIOPClient:
    """ GIOPClient class.

    Builds GIOP request messages for the objects at one remote address,
    hands them to a worker for transmission, and turns the replies into
    results or exceptions.  The worker is created lazily (see
    '__get_worker').

    'synchronous', 'reply' and 'poll' all return a '(forwarded, result)'
    tuple.  A true 'forwarded' flag means the operation was NOT completed
    (location forward, transient failure, or a communication failure on a
    forwarded reference) and is the signal to try the operation again.

    """

    def __init__(self, protocol, address):
        """ Constructor.

        'protocol' is the protocol creating this client.
        'address'  is the address of the remote object.

        """
        self.__protocol = protocol
        self.__address = address

        # We only get a worker when the first request is made.
        self.__worker = None

        # Mutex for the worker. Amongst other things, this makes sure that we
        # don't create more than one worker for the same client in
        # multi-threaded environments!
        self.__lk = fnorb_thread.allocate_lock()

        return

    def pseudo__del__(self):
        """ Pseudo destructor to remove circular references.

        This method is called by the GIOPClientManager when it determines that
        there are no more clients using this address.

        """
        # Clean up our worker (required because of circular references between
        # the worker and its connection handler).
        #
        # The 'finally' guarantees that the lock is released even if the
        # worker's own clean-up raises; otherwise every later call to
        # '__get_worker' would block forever.
        self.__lk.acquire()
        try:
            if self.__worker is not None:
                self.__worker.pseudo__del__()
                self.__worker = None

        finally:
            self.__lk.release()

        return

    #########################################################################
    # GIOPClient interface.
    #########################################################################

    def synchronous(self, request, parameters):
        """ Send a 'synchronous' GIOP request message & wait for the reply.

        'request'    describes the operation being invoked.
        'parameters' is the sequence of input parameter values.

        Returns the tuple '(forwarded, result)'.  If 'forwarded' is true then
        'result' is 'None' and the request should be tried again, otherwise
        'result' is the unmarshalled outcome of the operation.

        Raises any system or user exception contained in the reply, and
        'CORBA.COMM_FAILURE' if communication failed and the object reference
        had not been forwarded.

        """
        # Use the same worker for the request id, the send and the receive
        # ('self.__worker' may be replaced or reset by another thread).
        worker = self.__get_worker()

        # Build the request message (response expected).
        (request_id, message) = self.__build_request(worker, request,
                                                     parameters, 1)
        try:
            # Send the request and wait for the reply.
            worker.send(request_id, message)
            reply = worker.recv(request_id)

            # Follow a 'LOCATION_FORWARD' or process the reply proper.
            (forwarded, result) = self.__interpret_reply(request, reply)

        # Retry on transient failures (this includes 'CloseConnection' messages
        # from the server).
        except CORBA.TRANSIENT:
            forwarded = 1
            result = None

        # If a communication (ie. socket!) error occurred then see if we have
        # previously been 'forwarded'.
        except CORBA.COMM_FAILURE:
            # If we haven't been forwarded then give up (a bare 'raise' keeps
            # the original traceback).
            if not self.__revert_forward(request):
                raise

            # Otherwise try again using the object's original address.
            forwarded = 1
            result = None

        return (forwarded, result)

    def deferred(self, request, parameters):
        """ Create and send a 'deferred' GIOP request message.

        'request'    describes the operation being invoked.
        'parameters' is the sequence of input parameter values.

        Returns the id of the request that was sent; the reply is collected
        later via 'reply' (or checked for via 'poll').

        """
        # Retry on transient failures (this includes 'CloseConnection' messages
        # from the server).  Each attempt gets a (possibly new) worker, and a
        # new request id and message.  A loop is used rather than recursion so
        # that repeated failures cannot exhaust the stack.
        while 1:
            worker = self.__get_worker()

            # Build the request message (response expected).
            (request_id, message) = self.__build_request(worker, request,
                                                         parameters, 1)
            try:
                worker.send(request_id, message)

            except CORBA.TRANSIENT:
                continue

            return request_id

    def oneway(self, request, parameters):
        """ Create and send a 'oneway' GIOP request message.

        'request'    describes the operation being invoked.
        'parameters' is the sequence of input parameter values.

        No reply is expected, and nothing is returned.

        """
        # Retry on transient failures (this includes 'CloseConnection' messages
        # from the server).  See 'deferred' for why this is a loop.
        while 1:
            worker = self.__get_worker()

            # Build the request message (NO response expected).
            (request_id, message) = self.__build_request(worker, request,
                                                         parameters, 0)
            try:
                worker.send(request_id, message)

            except CORBA.TRANSIENT:
                continue

            return

    def reply(self, request):
        """ Get the reply to a 'deferred' operation request.

        'request' is the request whose reply is wanted (its id is obtained
        via '_fnorb_request_id').

        Returns the tuple '(forwarded, result)' exactly as for 'synchronous',
        and raises the same exceptions.

        """
        # Get our worker.
        worker = self.__get_worker()
        try:
            # Wait for the reply.
            reply = worker.recv(request._fnorb_request_id())

            # Follow a 'LOCATION_FORWARD' or process the reply proper.
            (forwarded, result) = self.__interpret_reply(request, reply)

        # Retry on transient failures (this includes 'CloseConnection' messages
        # from the server).
        except CORBA.TRANSIENT:
            forwarded = 1
            result = None

        # If a communication (ie. socket!) error occurred then see if we have
        # previously been 'forwarded'.
        except CORBA.COMM_FAILURE:
            # If we haven't been forwarded then give up!
            if not self.__revert_forward(request):
                raise

            # Otherwise try again using the object's original address.
            forwarded = 1
            result = None

        return (forwarded, result)

    def poll(self, request):
        """ Has a reply been received for the specified request?

        Returns the tuple '(forwarded, result)'.  When 'forwarded' is false,
        'result' is 1 if a 'NO_EXCEPTION' or 'USER_EXCEPTION' reply is
        waiting and 0 if no reply has arrived yet.  When 'forwarded' is true
        the request should be tried again.

        Raises the system exception if the waiting reply contains one, and
        'CORBA.COMM_FAILURE' if communication failed and the object reference
        had not been forwarded.

        """
        # Get our worker.
        worker = self.__get_worker()

        # Get the request id.
        request_id = request._fnorb_request_id()
        try:
            # The reply has not arrived yet 8^(
            if not worker.poll(request_id):
                forwarded = 0
                result = 0

            else:
                # Take a peek at the reply.
                (reply_header, cursor) = worker.peek(request_id)
                status = reply_header.reply_status

                # If we have received a 'LOCATION_FORWARD' message.
                if status == GIOP.LOCATION_FORWARD:
                    # Delete the reply.
                    worker.delete_reply(request_id)

                    # Update the object reference to use the forwarded IOR.
                    self.__follow_forward(request, cursor)

                    # Try again!
                    forwarded = 1
                    result = 0

                # If a CORBA system exception occurred...
                elif status == GIOP.SYSTEM_EXCEPTION:
                    # Unmarshal and raise the system exception.
                    raise self.__unmarshal_system_exception(cursor)

                # The reply is either a 'NO_EXCEPTION' or a 'USER_EXCEPTION'
                # message.
                else:
                    forwarded = 0
                    result = 1

        # Retry on transient failures (this includes 'CloseConnection' messages
        # from the server).
        except CORBA.TRANSIENT:
            forwarded = 1
            result = None

        # If a communication (ie. socket!) error occurred then see if we have
        # previously been 'forwarded'.
        except CORBA.COMM_FAILURE:
            # If we haven't been forwarded then give up!
            if not self.__revert_forward(request):
                raise

            # Otherwise try again using the object's original address.
            forwarded = 1
            result = 0

        return (forwarded, result)

    #########################################################################
    # Private interface.
    #########################################################################

    def __get_worker(self):
        """ Get our GIOP client worker.

        Creates the worker on first use, and replaces it if it reports that
        it has been closed.  The whole check-and-create sequence is done
        while holding the worker lock.

        """
        self.__lk.acquire()
        try:
            worker = self.__worker

            # If the worker has received a 'CloseConnection' message, then
            # clean up the old one; we need a new one!
            closed = worker is not None and worker.is_closed()
            if closed:
                worker.pseudo__del__()

            # If this is the first operation request (or the old worker was
            # closed) then get a worker.
            if worker is None or closed:
                # Get a reference to the factory for GIOP client workers.
                cwf = GIOPClientWorker.GIOPClientWorkerFactory_init()

                # Create a new GIOP client worker (the concrete type of which
                # will be determined by the threading model).
                worker = cwf.create_worker(self.__protocol, self.__address)
                self.__worker = worker

        finally:
            self.__lk.release()

        return worker

    def __build_request(self, worker, request, parameters, response_expected):
        """ Create a GIOP request message ready to be sent by 'worker'.

        'response_expected' is 1 if the server should send a reply and 0 if
        not (ie. a 'oneway' request).

        Returns the tuple '(request_id, message)'.

        """
        # Get a unique request id.
        request_id = worker.next_request_id()

        # Get the active object key.
        object_key = request.object()._fnorb_object_key()

        # Start a new GIOP message.
        message = OctetStream.GIOPMessage(type=GIOP.Request)
        cursor = message.cursor()

        # Create a GIOP request header.
        #
        # NOTE(review): the final (empty string) argument is presumably the
        # requesting principal -- confirm against GIOP.RequestHeader.
        request_header = GIOP.RequestHeader(request.context(),
                                            request_id,
                                            response_expected,
                                            object_key,
                                            request.operation(),
                                            '')

        # Marshal the request header onto the octet stream.
        tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader:1.0')
        tc._fnorb_marshal_value(cursor, request_header)

        # Marshal the request parameters onto the octet stream.
        self.__marshal_parameters(cursor, request.inputs(), parameters)

        return (request_id, message)

    def __interpret_reply(self, request, reply):
        """ Follow a 'LOCATION_FORWARD' reply, or process any other reply.

        'reply' is the tuple '(reply_header, cursor)'.

        Returns the tuple '(forwarded, result)'.

        """
        (reply_header, cursor) = reply

        # If we have received a 'LOCATION_FORWARD' message then update the
        # object reference and tell the caller to try again!
        if reply_header.reply_status == GIOP.LOCATION_FORWARD:
            self.__follow_forward(request, cursor)
            return (1, None)

        # The reply was *not* a 'LOCATION_FORWARD' so lets deal with it!
        return (0, self.__process_reply(request, reply))

    def __follow_forward(self, request, cursor):
        """ Point the request's object reference at a forwarded IOR.

        'cursor' must be positioned at the IOR in the body of a
        'LOCATION_FORWARD' reply.

        """
        # Unmarshal the IOR that contains the address to forward operation
        # requests to ('new.instance' creates the IOR without calling its
        # constructor).
        ior = new.instance(IOP.IOR, {})
        ior._fnorb_unmarshal(cursor)

        # Update the object reference to use the forwarded IOR.
        request.object()._fnorb_forward(ior)

        return

    def __revert_forward(self, request):
        """ Undo a previous location forward after a communication failure.

        Returns 1 if the request's object reference had been forwarded (in
        which case it now uses its original IOR again), otherwise 0.

        """
        target = request.object()
        if not target._fnorb_forwarded():
            return 0

        # Update the object reference to use its original IOR.
        target._fnorb_unforward()

        return 1

    def __process_reply(self, request, reply):
        """ Process a GIOP reply.

        'reply' is the tuple '(reply_header, cursor)'.

        Returns the unmarshalled results of a successful operation.  Raises
        the system or user exception contained in the reply, or
        'CORBA.UNKNOWN' for a user exception that the request does not
        declare.

        """
        (reply_header, cursor) = reply

        # If a CORBA system exception occurred...
        if reply_header.reply_status == GIOP.SYSTEM_EXCEPTION:
            # Unmarshal and raise the system exception.
            raise self.__unmarshal_system_exception(cursor)

        # Else if a user exception occurred...
        elif reply_header.reply_status == GIOP.USER_EXCEPTION:
            # Get the repository id of the exception.
            repository_id = cursor.unmarshal('s')

            # Try to find a matching exception in the request's list of
            # exception typecodes.
            for typecode in request.exceptions():
                if typecode.id() == repository_id:
                    break

            # If there is no matching exception then raise an UNKNOWN.
            else:
                raise CORBA.UNKNOWN() # System exception.

            # Unmarshal and raise the user exception.
            raise typecode._fnorb_unmarshal_value(cursor)

        # If we get here then the operation was successful!
        #
        # Unmarshal the return value and any 'inout' and 'out' parameters.
        return self.__unmarshal_results(cursor, request.outputs())

    def __marshal_parameters(self, cursor, typecodes, parameters):
        """ Marshal the parameters onto an octet stream.

        The i'th parameter is marshalled using the i'th typecode; surplus
        parameters are ignored and a missing one raises 'IndexError'.

        """
        # Marshal each parameter according to its typecode.
        i = 0
        for typecode in typecodes:
            typecode._fnorb_marshal_value(cursor, parameters[i])
            i = i + 1

        return

    def __unmarshal_results(self, cursor, typecodes):
        """ Unmarshal the result and any 'inout', and 'out' parameters. """

        # In the spirit of the ILU mapping, we have the following semantics:-
        #
        # If the operation has no outputs (ie. no result, 'inout', or 'out'
        # parameters) then we return 'None'.
        #
        # If the operation has exactly ONE output then we return just that
        # value.
        #
        # If the operation has more than one output then we return a TUPLE
        # with the result first followed by any 'inout' and 'out' parameters
        # in the order that they appear in the IDL definition.
        no_of_outputs = len(typecodes)

        if no_of_outputs == 0:
            return None

        if no_of_outputs == 1:
            return typecodes[0]._fnorb_unmarshal_value(cursor)

        # The values must be unmarshalled strictly in order as they all come
        # off the same cursor.
        outputs = []
        for i in range(no_of_outputs):
            outputs.append(typecodes[i]._fnorb_unmarshal_value(cursor))

        return tuple(outputs)

    def __unmarshal_system_exception(self, cursor):
        """ Unmarshal a system exception from an octet stream.

        Returns (it does NOT raise) the exception instance.

        """
        # Unmarshal the repository id of the system exception.
        intrep_id = Util.RepositoryId(cursor.unmarshal('s'))

        # Get the scoped name from the repository id.
        scoped_name = intrep_id.scoped_name()

        # The last two components of the scoped name make up the name of the
        # Python class that represents the system exception.
        #
        # e.g. For the COMM_FAILURE exception, the repository id is:-
        #
        # IDL:omg.org/CORBA/COMM_FAILURE:1.0
        #
        # The scoped name is therefore:-
        #
        # omg.org/CORBA/COMM_FAILURE
        #
        # And the Python class name is:-
        #
        # CORBA.COMM_FAILURE
        #
        # NOTE(review): SECURITY -- the string being evaluated is derived from
        # a repository id sent by the remote peer, so a hostile server may be
        # able to influence what is evaluated here.  Consider replacing the
        # 'eval' with an explicit attribute lookup on the CORBA module.
        #
        # NOTE(review): this relies on the sliced scoped name providing a
        # 'join' method (a plain list would not) -- presumably it is one of
        # the name classes from 'Util'; confirm.
        klass = eval(scoped_name[-2:].join('.'))

        # Create an uninitialised exception instance, and then unmarshal the
        # exception details!
        ex = new.instance(klass, {})
        ex._fnorb_unmarshal(cursor)

        return ex
-
- #############################################################################
-