home *** CD-ROM | disk | FTP | other *** search
/ OS/2 Shareware BBS: 10 Tools / 10-Tools.zip / fnb101.zip / Lib / site-packages / Fnorb / orb / GIOPClientWorkerThreaded.py < prev    next >
Text File  |  1999-06-28  |  11KB  |  393 lines

  1. #!/usr/bin/env python
  2. #############################################################################
  3. # Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999
  4. # All Rights Reserved.
  5. #
  6. # The software contained on this media is the property of the DSTC Pty
  7. # Ltd.  Use of this software is strictly in accordance with the
  8. # license agreement in the accompanying LICENSE.HTML file.  If your
  9. # distribution of this software does not contain a LICENSE.HTML file
  10. # then you have no rights to use this software in any manner and
  11. # should contact DSTC at the address below to determine an appropriate
  12. # licensing arrangement.
  13. #      DSTC Pty Ltd
  14. #      Level 7, GP South
  15. #      Staff House Road
  16. #      University of Queensland
  17. #      St Lucia, 4072
  18. #      Australia
  19. #      Tel: +61 7 3365 4310
  20. #      Fax: +61 7 3365 4311
  21. #      Email: enquiries@dstc.edu.au
  22. # This software is being provided "AS IS" without warranty of any
  23. # kind.  In no event shall DSTC Pty Ltd be liable for damage of any
  24. # kind arising out of or in connection with the use or performance of
  25. # this software.
  26. #
  27. # Project:      Fnorb
  28. # File:         $Source: /units/arch/src/Fnorb/orb/RCS/GIOPClientWorkerThreaded.py,v $
  29. # Version:      @(#)$RCSfile: GIOPClientWorkerThreaded.py,v $ $Revision: 1.7 $
  30. #
  31. #############################################################################
  32. """ GIOPClientWorkerThreaded class. """
  33.  
  34.  
  35. # Standard/built-in modules.
  36. import thread
  37.  
  38. # Fnorb modules.
  39. import CORBA, GIOPClientWorker, GIOPConnectionHandler
  40.  
  41. # Fnorb threading modules.
  42. import condvar
  43.  
  44.  
  45. class GIOPClientWorkerThreaded(GIOPClientWorker.GIOPClientWorker):
  46.     """ GIOPClientWorkerThreaded class. """
  47.  
  48.     def __init__(self, protocol, address):
  49.     """ Provide an interface to a remote object!
  50.  
  51.     'protocol' is the protocol used by the client.
  52.     'address'  is the address of the remote object.
  53.  
  54.     """
  55.     # fixme: We don't need to keep these as instance variables... (maybe
  56.     # for unpickling?),
  57.     self.__protocol = protocol
  58.     self.__address = address
  59.  
  60.     # This flag is set when we get a CloseConnection message, or an
  61.     # exception event.
  62.     self.__closed = 0
  63.  
  64.     # Each request has a unique id. associated with it.
  65.     self.__request_id = 0
  66.  
  67.     # Mutex to make access to the request id and the closed flag
  68.     # thread-safe.
  69.     self.__lk = thread.allocate_lock()
  70.  
  71.     # Condition variable for outgoing messages.
  72.     self.__outgoing_cv = condvar.condvar()
  73.  
  74.     # Dictionary of condition variables for threads that are blocked
  75.     # waiting for replies.
  76.     self.__blocked_pending = {} # {RequestId: condvar}
  77.  
  78.     # Dictionary of complete replies.
  79.     self.__replies = {} # {RequestId: Message}
  80.  
  81.     # Mutex for incoming messages.
  82.     self.__incoming_lk = thread.allocate_lock()
  83.  
  84.     # Ask the protocol to create a connection and actually connect to the
  85.     # remote object.
  86.     self.__connection = protocol.create_connection()
  87.     self.__connection.connect(self.__address)
  88.  
  89.     # Set the connection to BLOCKING mode.
  90.     self.__connection.blocking(1)
  91.  
  92.     # Create a handler to look after the connection.
  93.     self.__handler = GIOPConnectionHandler.GIOPConnectionHandler \
  94.              (self, self.__connection)
  95.  
  96.     # Start the read thread.
  97.     thread.start_new_thread(self.__read_thread, ())
  98.  
  99.     return
  100.  
  101.     def pseudo__del__(self):
  102.     """ Pseudo destructor to remove circular references. """
  103.  
  104.     # Close down the worker.
  105.     self.__close()
  106.  
  107.     # The handler holds a (circular) reference to this instance so we
  108.     # have to explicitly clean it up.
  109.     self.__handler.pseudo__del__()
  110.  
  111.     # Clean up *our* reference to *it*!
  112.     del self.__handler
  113.  
  114.     return
  115.  
  116.     #########################################################################
  117.     # GIOPClientWorker interface.
  118.     #########################################################################
  119.  
  120.     def send(self, request_id, message):
  121.     """ Send an operation request to the remote object.
  122.  
  123.     This method BLOCKS until the message has been sent.
  124.  
  125.     """
  126.     # Block until the connection is available.
  127.     self.__outgoing_cv.acquire()
  128.     try:
  129.         # Send the entire message.
  130.         n = 0
  131.         while n < len(message):
  132.         n = n + self.__handler.send(message)
  133.  
  134.     except CORBA.COMM_FAILURE, ex:
  135.         # Let somebody else have a go ;^)
  136.         self.__outgoing_cv.release()
  137.  
  138.         # If the connection has been closed then we can try again.
  139.         if self.is_closed():
  140.         raise CORBA.TRANSIENT()
  141.  
  142.         else:
  143.         raise ex
  144.  
  145.     # Let somebody else have a go ;^)
  146.     self.__outgoing_cv.release()
  147.  
  148.     return
  149.  
  150.     def recv(self, request_id):
  151.     """ Wait for a specific reply. """
  152.  
  153.     # Make sure that we aren't interfering with the read thread.
  154.     self.__incoming_lk.acquire()
  155.  
  156.     # See if the reply has already arrived.
  157.     reply = self.__replies.get(request_id)
  158.     if reply is None:
  159.         # Create a new condition variable.
  160.         cv = condvar.condvar()
  161.  
  162.         # Add the condition variable to the dictionary of blocked threads.
  163.         # The read thread will 'signal' the condition variable (if and)
  164.         # when the corresponding reply is received.
  165.         self.__blocked_pending[request_id] = cv
  166.  
  167.         # Acquire the condition variable so that if the read thread
  168.         # receives the reply before we have called 'wait', it will be
  169.         # blocked.
  170.         cv.acquire()
  171.  
  172.         # Let the read thread do its thang!
  173.         self.__incoming_lk.release()
  174.  
  175.         # Wait on the condition variable ('wait' releases the lock on the
  176.         # condition variable first).
  177.         cv.wait()
  178.         cv.release()
  179.  
  180.         self.__incoming_lk.acquire()
  181.         reply = self.__replies[request_id]
  182.         del self.__replies[request_id]
  183.         self.__incoming_lk.release()
  184.  
  185.          # If the condition variable was signalled because we received a
  186.          # 'CloseConnection' message, then the reply will be a
  187.         # 'CORBA.TRANSIENT' exception. If the condition variable was
  188.         # signalled because of some other failure then the reply will be
  189.         # an instance of the appropriate CORBA system exception.
  190.         if isinstance(reply, CORBA.SystemException):
  191.         raise reply
  192.  
  193.     # The reply has already arrived - now that's what I call service ;^)
  194.     else:
  195.         del self.__replies[request_id]
  196.         self.__incoming_lk.release()
  197.  
  198.     return reply
  199.  
  200.     def poll(self, request_id):
  201.     """ Poll for a reply to a specific request. """
  202.  
  203.     # Have we got our reply yet
  204.     self.__incoming_lk.acquire()
  205.     result = self.__replies.has_key(request_id)
  206.     self.__incoming_lk.release()
  207.  
  208.     return result
  209.  
  210.     def peek(self, request_id):
  211.     """ Peek at the reply for the specified request.
  212.  
  213.     This method does *not* delete the reply from the client's queue.
  214.  
  215.     """
  216.     # Get the reply.
  217.     self.__incoming_lk.acquire()
  218.     reply = self.__replies[request_id]
  219.     self.__incoming_lk.release()
  220.  
  221.     return reply
  222.  
  223.     def delete_reply(self, request_id):
  224.     """ Delete the reply with the specified request id. """
  225.  
  226.     # Delete the reply.
  227.     self.__incoming_lk.acquire()
  228.     del self.__replies[request_id]
  229.     self.__incoming_lk.release()
  230.  
  231.     return
  232.  
  233.     def next_request_id(self):
  234.     """ Return the next request id. """
  235.  
  236.     # fixme: This will get stupidly big oneday!!!!
  237.     self.__lk.acquire()
  238.     request_id = self.__request_id = self.__request_id + 1
  239.     self.__lk.release()
  240.  
  241.     return request_id
  242.  
  243.     def is_closed(self):
  244.     """ Has the worker received a close event? """
  245.  
  246.     self.__lk.acquire()
  247.     closed = self.__closed
  248.     self.__lk.release()
  249.  
  250.     return closed
  251.  
  252.     def close_connection(self):
  253.     """ Close down the connection.
  254.  
  255.     This method is only ever called from the read thread.
  256.  
  257.     """
  258.     # Unblock all threads that are waiting for replies.
  259.     self.__unblock_all(CORBA.TRANSIENT())
  260.  
  261.     # Close down the worker.
  262.     self.__close()
  263.  
  264.     return
  265.  
  266.     #########################################################################
  267.     # GIOPConnectionHandlerListener interface.
  268.     #########################################################################
  269.  
  270.     def message_received(self, message):
  271.     """ Called when a complete GIOP message has been received. """
  272.  
  273.     # Code re-use! The protected method '_message_received' is implemented
  274.     # in the 'GIOPClientWorker' class.
  275.     #
  276.     # If the message is a GIOP Reply message, then this method will call
  277.     # our '_reply_received' method.
  278.     self._message_received(message)
  279.  
  280.     return
  281.  
  282.     def message_sent(self):
  283.      """ Called when a complete GIOP message has been sent.
  284.  
  285.     In the threaded worker, we block until the message has been sent
  286.     anyway, so we don't have anything to do here!
  287.  
  288.     """
  289.     pass
  290.  
  291.     #########################################################################
  292.     # Protected interface.
  293.     #########################################################################
  294.  
  295.     def _reply_received(self, reply_header, cursor):
  296.     """ Called by the '_message_received' method of GIOPClientWorker. """
  297.  
  298.     self.__incoming_lk.acquire()
  299.  
  300.     # Add the reply to our dictionary of complete replies.
  301.     self.__replies[reply_header.request_id] = (reply_header, cursor)
  302.  
  303.     # See if a thread is blocked waiting for this reply.
  304.     cv = self.__blocked_pending.get(reply_header.request_id)
  305.  
  306.     if cv is not None:
  307.         # Signal the blocked thread.
  308.         cv.acquire()
  309.         cv.signal()
  310.  
  311.         # Clean up.
  312.         del self.__blocked_pending[reply_header.request_id]
  313.  
  314.     self.__incoming_lk.release()
  315.     return
  316.  
  317.     #########################################################################
  318.     # Private interface.
  319.     #########################################################################
  320.  
  321.     def __read_thread(self):
  322.     """ Read messages from the server. """
  323.  
  324.     try:
  325.         while not self.is_closed():
  326.         # Blocking receive.
  327.         self.__handler.recv()
  328.  
  329.         # If we have received a 'CloseConnection' message, then we notify
  330.         # the GIOPClient by raising a 'TRANSIENT' system exception.
  331.         ex = CORBA.TRANSIENT()
  332.  
  333.     except CORBA.SystemException, ex:
  334.         # If we have received a 'CloseConnection' message, then we notify
  335.         # the GIOPClient by raising a 'TRANSIENT' system exception.
  336.         if self.is_closed():
  337.         ex = CORBA.TRANSIENT()
  338.  
  339.         # Otherwise, it was some other system exception, so close the 
  340.         # connection down.
  341.         else:
  342.         self.__close()
  343.  
  344.     # Unblock all threads that are waiting for replies.
  345.     self.__unblock_all(ex)
  346.  
  347.     # Explicitly exit the thread!
  348.     thread.exit()
  349.  
  350.     def __close(self):
  351.     """ Close down the worker. """
  352.  
  353.     self.__lk.acquire()
  354.     if not self.__closed:
  355.         # All done!
  356.         self.__closed = 1
  357.         self.__lk.release()
  358.  
  359.          # Close our connection.
  360.         self.__connection.disconnect()
  361.  
  362.     else:
  363.         self.__lk.release()
  364.  
  365.     return
  366.  
  367.     def __unblock_all(self, exception):
  368.     """ Unblock all waiting threads with the specified exception. """
  369.  
  370.     self.__incoming_lk.acquire()
  371.     # Set the reply for each blocked thread to be the specified exception.
  372.     for request_id in self.__blocked_pending.keys():
  373.         self.__replies[request_id] = exception
  374.  
  375.     # Get a list of condition variables to signal.
  376.     blocked_pending = self.__blocked_pending.values()
  377.  
  378.     # Clean them up!
  379.     self.__blocked_pending = {}
  380.  
  381.     self.__incoming_lk.release()
  382.  
  383.     # Unblock each thread.
  384.     for cv in blocked_pending:
  385.         cv.acquire()
  386.         cv.signal()
  387.  
  388.     return
  389.  
  390. #############################################################################
  391.