home *** CD-ROM | disk | FTP | other *** search
/ OS/2 Shareware BBS: 10 Tools / 10-Tools.zip / fnb101.zip / Lib / site-packages / Fnorb / orb / GIOPClient.py < prev    next >
Text File  |  1999-06-28  |  16KB  |  530 lines

  1. #!/usr/bin/env python
  2. #############################################################################
  3. # Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999
  4. # All Rights Reserved.
  5. #
  6. # The software contained on this media is the property of the DSTC Pty
  7. # Ltd.  Use of this software is strictly in accordance with the
  8. # license agreement in the accompanying LICENSE.HTML file.  If your
  9. # distribution of this software does not contain a LICENSE.HTML file
  10. # then you have no rights to use this software in any manner and
  11. # should contact DSTC at the address below to determine an appropriate
  12. # licensing arrangement.
  13. #      DSTC Pty Ltd
  14. #      Level 7, GP South
  15. #      Staff House Road
  16. #      University of Queensland
  17. #      St Lucia, 4072
  18. #      Australia
  19. #      Tel: +61 7 3365 4310
  20. #      Fax: +61 7 3365 4311
  21. #      Email: enquiries@dstc.edu.au
  22. # This software is being provided "AS IS" without warranty of any
  23. # kind.  In no event shall DSTC Pty Ltd be liable for damage of any
  24. # kind arising out of or in connection with the use or performance of
  25. # this software.
  26. #
  27. # Project:      Fnorb
  28. # File:         $Source: /units/arch/src/Fnorb/orb/RCS/GIOPClient.py,v $
  29. # Version:      @(#)$RCSfile: GIOPClient.py,v $ $Revision: 1.9 $
  30. #
  31. #############################################################################
  32. """ GIOPClient class. """
  33.  
  34.  
  35. # Standard/built-in modules.
  36. import new
  37.  
  38. # Fnorb modules.
  39. import fnorb_thread, CORBA, GIOP, GIOPClientWorker, IOP, OctetStream, Util
  40.  
  41.  
  42. class GIOPClient:
  43.     """ GIOPClient class. """
  44.  
  45.     def __init__(self, protocol, address):
  46.     """ Constructor.
  47.  
  48.     'protocol' is the protocol creating this client.
  49.     'address'  is the address of the remote object.
  50.  
  51.     """
  52.     self.__protocol = protocol
  53.     self.__address  = address
  54.  
  55.     # We only get a worker when the first request is made.
  56.     self.__worker = None
  57.  
  58.     # Mutex for the worker. Amongst other things, this makes sure that we
  59.     # don't create more than one worker for the same client in
  60.     # multi-threaded environments!
  61.     self.__lk = fnorb_thread.allocate_lock()
  62.  
  63.     return
  64.  
  65.     def pseudo__del__(self):
  66.     """ Pseudo destructor to remove circular references.
  67.  
  68.     This method is called by the GIOPClientManager when it determines that
  69.     there are no more clients using this address.
  70.  
  71.     """
  72.     # Clean up our worker (required because of circular references between
  73.     # the worker and its connection handler).
  74.     self.__lk.acquire()
  75.     if self.__worker is not None:
  76.         self.__worker.pseudo__del__()
  77.         self.__worker = None
  78.     self.__lk.release()
  79.  
  80.     return
  81.  
  82.     #########################################################################
  83.     # GIOPClient interface.
  84.     #########################################################################
  85.  
  86.     def synchronous(self, request, parameters):
  87.     """ Send a 'synchronous' GIOP request message & wait for the reply. """
  88.  
  89.     # Get our worker.
  90.     worker = self.__get_worker()
  91.  
  92.         # Get a unique request id.
  93.         request_id = self.__worker.next_request_id()
  94.  
  95.     # Get the active object key.
  96.     object_key = request.object()._fnorb_object_key()
  97.  
  98.         # Start a new GIOP message.
  99.         message = OctetStream.GIOPMessage(type=GIOP.Request)
  100.         cursor = message.cursor()
  101.  
  102.     # Create a GIOP request header.
  103.     request_header = GIOP.RequestHeader(request.context(),
  104.                         request_id,
  105.                         1, # Response expected.
  106.                         object_key,
  107.                         request.operation(),
  108.                         '')
  109.  
  110.     # Marshal the request header onto the octet stream.
  111.     tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader:1.0')
  112.     tc._fnorb_marshal_value(cursor, request_header)
  113.  
  114.     # Marshal the request parameters onto the octet stream.
  115.     self.__marshal_parameters(cursor, request.inputs(), parameters)
  116.  
  117.     try:
  118.         # Send the request.
  119.         worker.send(request_id, message)
  120.  
  121.         # Wait for the reply.
  122.         reply = worker.recv(request_id)
  123.  
  124.         # Unpack the reply.
  125.         (reply_header, cursor) = reply
  126.  
  127.         # If we have received a 'LOCATION_FORWARD' message.
  128.         if reply_header.reply_status == GIOP.LOCATION_FORWARD:
  129.         # Unmarshal the IOR that contains the address to forward
  130.         # operation requests to.
  131.         ior = new.instance(IOP.IOR, {})
  132.         ior._fnorb_unmarshal(cursor)
  133.  
  134.         # Update the object reference to use the forwarded IOR.
  135.         request.object()._fnorb_forward(ior)
  136.  
  137.         # Try again!
  138.         forwarded = 1
  139.         result = None
  140.         
  141.         # The reply was *not* a 'LOCATION_FORWARD' so lets deal with it!
  142.             else:
  143.         forwarded = 0
  144.         result = self.__process_reply(request, reply)
  145.  
  146.     # Retry on transient failures (this includes 'CloseConnection' messages
  147.     # from the server.
  148.     except CORBA.TRANSIENT, ex:
  149.         forwarded = 1
  150.         result = None
  151.  
  152.     # If a communication (ie. socket!) error occurred then see if we have
  153.     # previously been 'forwarded'.
  154.     except CORBA.COMM_FAILURE, ex:
  155.         # If we *have* been 'forwarded' then try again using the object's
  156.         # original address.
  157.         if request.object()._fnorb_forwarded():
  158.         # Update the object reference to use its original IOR.
  159.         request.object()._fnorb_unforward()
  160.  
  161.         # Try again!
  162.         forwarded = 1
  163.         result = None
  164.  
  165.          # Otherwise, if we haven't been forwarded then give up!
  166.          else:
  167.         raise ex
  168.  
  169.     return (forwarded, result)
  170.  
  171.     def deferred(self, request, parameters):
  172.     """ Create and send send a 'deferred' GIOP request message. """
  173.  
  174.     # Get our worker.
  175.     worker = self.__get_worker()
  176.  
  177.         # Get a unique request id.
  178.         request_id = self.__worker.next_request_id()
  179.  
  180.     # Get the active object key.
  181.     object_key = request.object()._fnorb_object_key()
  182.  
  183.         # Start a new GIOP message.
  184.         message = OctetStream.GIOPMessage(type=GIOP.Request)
  185.         cursor = message.cursor()
  186.  
  187.     # Create a GIOP request header.
  188.     request_header = GIOP.RequestHeader(request.context(),
  189.                         request_id,
  190.                         1, # Response expected.
  191.                         object_key,
  192.                         request.operation(),
  193.                         '')
  194.  
  195.     # Marshal the request header onto the octet stream.
  196.     tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader:1.0')
  197.     tc._fnorb_marshal_value(cursor, request_header)
  198.  
  199.     # Marshal the request parameters onto the octet stream.
  200.     self.__marshal_parameters(cursor, request.inputs(), parameters)
  201.  
  202.     # Send the request.
  203.     try:
  204.         self.__worker.send(request_header.request_id, message)
  205.  
  206.     # Retry on transient failures (this includes 'CloseConnection' messages
  207.     # from the server.
  208.     except CORBA.TRANSIENT:
  209.         return self.deferred(request, parameters)
  210.  
  211.     return request_header.request_id
  212.  
  213.     def oneway(self, request, parameters):
  214.     """ Create and send a 'oneway' GIOP request message. """
  215.  
  216.     # Get our worker.
  217.     worker = self.__get_worker()
  218.  
  219.         # Get a unique request id.
  220.         request_id = self.__worker.next_request_id()
  221.  
  222.     # Get the active object key.
  223.     object_key = request.object()._fnorb_object_key()
  224.  
  225.         # Start a new GIOP message.
  226.         message = OctetStream.GIOPMessage(type=GIOP.Request)
  227.         cursor = message.cursor()
  228.  
  229.     # Create a GIOP request header.
  230.     request_header = GIOP.RequestHeader(request.context(),
  231.                         request_id,
  232.                         0, # No response expected.
  233.                         object_key,
  234.                         request.operation(),
  235.                         '')
  236.  
  237.     # Marshal the request header onto the octet stream.
  238.     tc = CORBA.typecode('IDL:omg.org/GIOP/RequestHeader:1.0')
  239.     tc._fnorb_marshal_value(cursor, request_header)
  240.  
  241.     # Marshal the request parameters onto the octet stream.
  242.     self.__marshal_parameters(cursor, request.inputs(), parameters)
  243.  
  244.     # Send the request.
  245.     try:
  246.         self.__worker.send(request_header.request_id, message)
  247.  
  248.     # Retry on transient failures (this includes 'CloseConnection' messages
  249.     # from the server.
  250.     except CORBA.TRANSIENT:
  251.         return self.oneway(request, parameters)
  252.  
  253.     return
  254.  
  255.     def reply(self, request):
  256.     """ Get the reply to a 'deferred' operation request. """
  257.  
  258.     # Get our worker.
  259.     worker = self.__get_worker()
  260.     try:
  261.         # Wait for the reply.
  262.         reply = worker.recv(request._fnorb_request_id())
  263.  
  264.         # Unpack the reply.
  265.         (reply_header, cursor) = reply
  266.  
  267.         # If we have received a 'LOCATION_FORWARD' message.
  268.         if reply_header.reply_status == GIOP.LOCATION_FORWARD:
  269.         # Unmarshal the IOR that contains the address to forward
  270.         # operation requests to.
  271.         ior = new.instance(IOP.IOR, {})
  272.         ior._fnorb_unmarshal(cursor)
  273.  
  274.         # Update the object reference to use the forwarded IOR.
  275.         request.object()._fnorb_forward(ior)
  276.  
  277.         # Try again!
  278.         forwarded = 1
  279.         result = None
  280.         
  281.         # The reply was *not* a 'LOCATION_FORWARD' so lets deal with it!
  282.             else:
  283.         forwarded = 0
  284.         result = self.__process_reply(request, reply)
  285.  
  286.     # Retry on transient failures (this includes 'CloseConnection' messages
  287.     # from the server.
  288.     except CORBA.TRANSIENT, ex:
  289.         forwarded = 1
  290.         result = None
  291.  
  292.     # If a communication (ie. socket!) error occurred then see if we have
  293.     # previously been 'forwarded'.
  294.     except CORBA.COMM_FAILURE, ex:
  295.         # If we *have* been 'forwarded' then try again using the object's
  296.         # original address.
  297.         if request.object()._fnorb_forwarded():
  298.         # Update the object reference to use its original IOR.
  299.         request.object()._fnorb_unforward()
  300.  
  301.         # Try again!
  302.         forwarded = 1
  303.         result = None
  304.  
  305.          # Otherwise, if we haven't been forwarded then give up!
  306.          else:
  307.         raise ex
  308.  
  309.     return (forwarded, result)
  310.  
  311.     def poll(self, request):
  312.     """ Has a reply been received for the specified request? """
  313.  
  314.     # Get our worker.
  315.     worker = self.__get_worker()
  316.  
  317.     # Get the request id.
  318.     request_id = request._fnorb_request_id()
  319.     try:
  320.         if worker.poll(request_id):
  321.         # Take a peek at the reply.
  322.         (reply_header, cursor) = worker.peek(request_id)
  323.  
  324.         # If we have received a 'LOCATION_FORWARD' message.
  325.         if reply_header.reply_status == GIOP.LOCATION_FORWARD:
  326.             # Delete the reply.
  327.             worker.delete_reply(self.__request_id)
  328.  
  329.             # Unmarshal the IOR that contains the address to forward
  330.             # operation requests to.
  331.             ior = new.instance(IOP.IOR, {})
  332.             ior._fnorb_unmarshal(cursor)
  333.  
  334.             # Update the object reference to use the forwarded IOR.
  335.             self.__object._fnorb_forward(ior)
  336.  
  337.             # Try again!
  338.             forwarded = 1
  339.             result = 0
  340.  
  341.         # If a CORBA system exception occurred...
  342.         elif reply_header.reply_status == GIOP.SYSTEM_EXCEPTION:
  343.             # Unmarshal and raise the system exception.
  344.             raise self.__unmarshal_system_exception(cursor)
  345.  
  346.         # The reply is either a 'NO_EXCEPTION' or a 'USER_EXCEPTION'
  347.         # message.
  348.         else:
  349.             forwarded = 0
  350.             result = 1
  351.  
  352.         # The reply has not arrived yet 8^(
  353.         else:
  354.         forwarded = 0
  355.         result = 0
  356.  
  357.     # Retry on transient failures (this includes 'CloseConnection' messages
  358.     # from the server.
  359.     except CORBA.TRANSIENT, ex:
  360.         forwarded = 1
  361.         result = None
  362.  
  363.     # If a communication (ie. socket!) error occurred then see if we have
  364.     # previously been 'forwarded'.
  365.     except CORBA.COMM_FAILURE, ex:
  366.         # If we *have* been 'forwarded' then try again using the object's
  367.         # original address.
  368.         if self.__object._fnorb_forwarded():
  369.         # Update the object reference to use its original IOR.
  370.         self.__object._fnorb_unforward()
  371.  
  372.         # Try again!
  373.         forwarded = 1
  374.         result = 0
  375.  
  376.         # Otherwise, if we haven't been forwarded then give up!
  377.         else:
  378.         raise ex
  379.  
  380.     return (forwarded, result)
  381.  
  382.     #########################################################################
  383.     # Private interface.
  384.     #########################################################################
  385.  
  386.     def __get_worker(self):
  387.     """ Get our GIOP client worker. """
  388.  
  389.     self.__lk.acquire()
  390.     try:
  391.         # If this is the first operation request then get a worker.
  392.         if self.__worker is None:
  393.         # Get a reference to the factory for GIOP client workers.
  394.         cwf = GIOPClientWorker.GIOPClientWorkerFactory_init()
  395.  
  396.         # Create a new GIOP client worker (the concrete type of which
  397.         # will be determined by the threading model).
  398.         self.__worker = cwf.create_worker(self.__protocol,
  399.                           self.__address)
  400.  
  401.         # If the worker has received a 'CloseConnection' message, then
  402.         # we need a new one!
  403.         elif self.__worker.is_closed():
  404.         # Clean up the old worker.
  405.         self.__worker.pseudo__del__()
  406.  
  407.         # Get a reference to the factory for GIOP client workers.
  408.         cwf = GIOPClientWorker.GIOPClientWorkerFactory_init()
  409.  
  410.         # Create a new GIOP client worker (the concrete type of which
  411.         # will be determined by the threading model).
  412.         self.__worker = cwf.create_worker(self.__protocol,
  413.                           self.__address)
  414.         # Return value.
  415.         worker = self.__worker
  416.  
  417.     finally:
  418.         self.__lk.release()
  419.  
  420.       return worker
  421.  
  422.     def __process_reply(self, request, (reply_header, cursor)):
  423.     """ Process a GIOP reply. """
  424.  
  425.     # If a CORBA system exception occurred...
  426.     if reply_header.reply_status == GIOP.SYSTEM_EXCEPTION:
  427.         # Unmarshal and raise the system exception.
  428.         raise self.__unmarshal_system_exception(cursor)
  429.  
  430.     # Else if a user exception occurred...
  431.     elif reply_header.reply_status == GIOP.USER_EXCEPTION:
  432.         # Get the repository id of the exception.
  433.         repository_id = cursor.unmarshal('s')
  434.  
  435.         # Try to find a matching exception in the request's list of
  436.         # exception typecodes.
  437.         for typecode in request.exceptions():
  438.         if typecode.id() == repository_id:
  439.             break
  440.  
  441.         # If there is no matching exception then raise an UNKNOWN.
  442.         else:
  443.         raise CORBA.UNKNOWN() # System exception.
  444.  
  445.         # Unmarshal and raise the user exception.
  446.         raise typecode._fnorb_unmarshal_value(cursor)
  447.  
  448.     # If we get here then the operation was successful!
  449.     #
  450.     # Unmarshal the return value and any 'inout' and 'out' parameters.
  451.     return self.__unmarshal_results(cursor, request.outputs())
  452.  
  453.     def __marshal_parameters(self, cursor, typecodes, parameters):
  454.     """ Marshal the parameters onto an octet stream. """
  455.  
  456.     # Marshal each parameter according to its typecode.
  457.     i = 0
  458.     for typecode in typecodes:
  459.         typecode._fnorb_marshal_value(cursor, parameters[i])
  460.         i = i + 1
  461.  
  462.     return
  463.  
  464.     def __unmarshal_results(self, cursor, typecodes):
  465.     """ Unmarshal the result and any 'inout', and 'out' parameters. """
  466.  
  467.     # In the spirit of the ILU mapping, we have the following semantics:-
  468.     #
  469.     # If the operation has no outputs (ie. no result, 'inout', or 'out'
  470.     # parameters) then we return 'None'.
  471.     #
  472.     # If the operation has exactly ONE output then we return just that
  473.     # value.
  474.     #
  475.     # If the operation has more than one output then we return a TUPLE
  476.     # with the result first followed by any 'inout' and 'out' parameters
  477.     # in the order that they appear in the IDL definition.
  478.     no_of_outputs = len(typecodes)
  479.  
  480.     if no_of_outputs == 0:
  481.         results = None
  482.  
  483.     elif no_of_outputs == 1:
  484.         results = typecodes[0]._fnorb_unmarshal_value(cursor)
  485.  
  486.     else:
  487.         outputs = [None] * no_of_outputs
  488.         for i in range(no_of_outputs):
  489.         outputs[i] = typecodes[i]._fnorb_unmarshal_value(cursor)
  490.  
  491.         results = tuple(outputs)
  492.  
  493.         return results
  494.  
  495.     def __unmarshal_system_exception(self, cursor):
  496.     """ Unmarshal a system exception from an octet stream. """
  497.  
  498.     # Unmarshal the repository id of the system exception.
  499.     intrep_id = Util.RepositoryId(cursor.unmarshal('s'))
  500.  
  501.     # Get the scoped name from the repository id.
  502.     scoped_name = intrep_id.scoped_name()
  503.  
  504.     # The last two components of the scoped name make up the name of the
  505.     # Python class that represents the system exception.
  506.     #
  507.     # e.g. For the COMM_FAILURE exception, the repository id is:-
  508.     #
  509.     #          IDL:omg.org/CORBA/COMM_FAILURE:1.0
  510.     #
  511.     #      The scoped name is therefore:-
  512.     #
  513.     #          omg.org/CORBA/COMM_FAILURE
  514.     #
  515.     #      And the Python class name is:-
  516.     #
  517.     #          CORBA.COMM_FAILURE
  518.     klass = eval(scoped_name[-2:].join('.'))
  519.  
  520.     # Create an uninitialised exception instance, and then unmarshal the
  521.     # exception details!
  522.     ex = new.instance(klass, {})
  523.     ex._fnorb_unmarshal(cursor)
  524.  
  525.     return ex
  526.  
  527. #############################################################################
  528.