home *** CD-ROM | disk | FTP | other *** search
/ OS/2 Shareware BBS: 10 Tools / 10-Tools.zip / fnb101.zip / Lib / site-packages / Fnorb / orb / GIOPClientWorkerReactive.py < prev    next >
Text File  |  1999-06-28  |  10KB  |  328 lines

  1. #!/usr/bin/env python
  2. #############################################################################
  3. # Copyright (C) DSTC Pty Ltd (ACN 052 372 577) 1997, 1998, 1999
  4. # All Rights Reserved.
  5. #
  6. # The software contained on this media is the property of the DSTC Pty
  7. # Ltd.  Use of this software is strictly in accordance with the
  8. # license agreement in the accompanying LICENSE.HTML file.  If your
  9. # distribution of this software does not contain a LICENSE.HTML file
  10. # then you have no rights to use this software in any manner and
  11. # should contact DSTC at the address below to determine an appropriate
  12. # licensing arrangement.
  13. #      DSTC Pty Ltd
  14. #      Level 7, GP South
  15. #      Staff House Road
  16. #      University of Queensland
  17. #      St Lucia, 4072
  18. #      Australia
  19. #      Tel: +61 7 3365 4310
  20. #      Fax: +61 7 3365 4311
  21. #      Email: enquiries@dstc.edu.au
  22. # This software is being provided "AS IS" without warranty of any
  23. # kind.  In no event shall DSTC Pty Ltd be liable for damage of any
  24. # kind arising out of or in connection with the use or performance of
  25. # this software.
  26. #
  27. # Project:      Fnorb
  28. # File:         $Source: /units/arch/src/Fnorb/orb/RCS/GIOPClientWorkerReactive.py,v $
  29. # Version:      @(#)$RCSfile: GIOPClientWorkerReactive.py,v $ $Revision: 1.6 $
  30. #
  31. #############################################################################
  32. """ GIOPClientWorkerReactive class. """
  33.  
  34.  
  35. # Fnorb modules.
  36. import CORBA, EventHandler, GIOPClientWorker, GIOPConnectionHandler, Reactor
  37.  
  38.  
  39. class GIOPClientWorkerReactive(GIOPClientWorker.GIOPClientWorker,
  40.                    EventHandler.EventHandler):
  41.     """ GIOPClientWorkerReactive class.
  42.  
  43.     A concrete 'EventHandler' in the 'Reactor' pattern.
  44.  
  45.     This class is *not* thread safe, but since the whole point of the 'Reactor'
  46.     pattern is to work in a single-threaded environment, I don't see this as
  47.     much of a problem ;^)
  48.  
  49.     """
  50.  
  51.     def __init__(self, protocol, address):
  52.     """ Provide an interface to a remote object!
  53.  
  54.     'protocol' is the protocol used by this worker.
  55.     'address'  is the address of the remote object.
  56.  
  57.     """
  58.     # fixme: We don't need to keep these as instance variables... (maybe
  59.     # for unpickling?
  60.     self.__protocol = protocol
  61.     self.__address = address
  62.  
  63.     # This flag is set when we get a CloseConnection message, or an
  64.     # exception event.
  65.     self.__closed = 0
  66.  
  67.     # Each request has a unique id. associated with it.
  68.     self.__request_id = 0
  69.  
  70.     # Queue of outgoing messages.
  71.     self.__outgoing = [] # [(RequestId, Message)]
  72.  
  73.     # Dictionary of complete replies.
  74.     self.__replies = {} # {RequestId: (ReplyHeader, Cursor)}
  75.  
  76.      # Ask the protocol to create a connection and actually connect to the
  77.      # remote object.
  78.      self.__connection = protocol.create_connection()
  79.      self.__connection.connect(self.__address)
  80.  
  81.     # Create a handler to look after the connection.
  82.     self.__handler = GIOPConnectionHandler.GIOPConnectionHandler \
  83.              (self, self.__connection)
  84.  
  85.     # Get a reference to the active Reactor.
  86.     #
  87.     # fixme: Should the reactor be obtained from the protocol?
  88.     self.__reactor = Reactor.Reactor_init()
  89.  
  90.     # Register our interest in read events with the Reactor.
  91.     self.__reactor.register_handler(self, Reactor.READ)
  92.  
  93.     return
  94.  
  95.     def pseudo__del__(self):
  96.     """ Pseudo destructor to remove circular references.
  97.  
  98.     This method is called from the 'pseudo__del__' of the GIOPClient that
  99.     we belong to.
  100.  
  101.     """
  102.     # Close down the worker.
  103.     self.__close()
  104.  
  105.     # The handler holds a (circular) reference to this instance so we
  106.     # have to explicitly clean it up.
  107.     self.__handler.pseudo__del__()
  108.  
  109.     # Clean up *our* reference to *it*!
  110.     del self.__handler
  111.  
  112.     return
  113.  
  114.     #########################################################################
  115.     # GIOPClientWorker interface.
  116.     #########################################################################
  117.  
  118.     def send(self, request_id, message):
  119.     """ Send an operation request to the remote object.
  120.  
  121.     This method WAITS for the message to be sent.
  122.  
  123.     """
  124.     # Add the outgoing message to the queue.
  125.     self.__outgoing.append((request_id, message))
  126.  
  127.     # Register my interest in write events with the Reactor.
  128.     self.__reactor.register_handler(self, Reactor.WRITE)
  129.  
  130.     # Get the reactor to process events until the message has been sent.
  131.     while 1:
  132.         for (id, message) in self.__outgoing:
  133.         if request_id == id:
  134.             break
  135.  
  136.         # Else, the request has been sent!
  137.         else:
  138.         break
  139.  
  140.         # Get the reactor to wait for and process a single event.
  141.         self.__reactor.do_one_event()
  142.  
  143.     return
  144.  
  145.     def recv(self, request_id):
  146.     """ Wait for a specific reply. """
  147.  
  148.     # Get the reactor to process events until we either get a close event
  149.     # or we get a reply.
  150.     while 1:
  151.         # Have we got our reply yet?
  152.         try:
  153.         reply = self.__replies[request_id]
  154.         del self.__replies[request_id]
  155.         break
  156.  
  157.         # Err, nope!
  158.         except KeyError:
  159.         pass
  160.  
  161.         # Get the reactor to wait for and process a single event.
  162.         self.__reactor.do_one_event()
  163.  
  164.     return reply
  165.  
  166.     def poll(self, request_id):
  167.     """ Poll for a reply to a specific request. """
  168.  
  169.     # Get the reactor to poll for a single event.
  170.     self.__reactor.do_one_event(0)
  171.  
  172.     # Have we got our reply yet?
  173.     return self.__replies.has_key(request_id)
  174.  
  175.     def peek(self, request_id):
  176.     """ Peek at the reply for the specified request.
  177.  
  178.     This method does *not* delete the reply from the client's queue.
  179.  
  180.     """
  181.     # Get the reply.
  182.     return self.__replies[request_id]
  183.  
  184.     def delete_reply(self, request_id):
  185.     """ Delete the reply with the specified request id. """
  186.  
  187.     # Delete the reply.
  188.     del self.__replies[request_id]
  189.  
  190.     return
  191.  
  192.     def next_request_id(self):
  193.     """ Return the next request id. """
  194.  
  195.     # fixme: This will get stupidly big oneday!!!!
  196.     self.__request_id = self.__request_id + 1
  197.  
  198.     return self.__request_id
  199.  
  200.     def is_closed(self):
  201.     """ Has the worker received a close event? """
  202.  
  203.     return self.__closed
  204.  
  205.     def close_connection(self):
  206.     """ Orderly shutdown, in repsonse to a 'CloseConnection' message. """
  207.  
  208.     if not self.__closed:
  209.         # Close down the worker.
  210.         self.__close()
  211.         
  212.         # We notify our 'GIOPClient' that a 'CloseConnection' message
  213.         # was received by raising a 'CORBA.TRANSIENT' system exception.
  214.         raise CORBA.TRANSIENT()
  215.  
  216.     return
  217.  
  218.     #########################################################################
  219.     # EventHandler interface.
  220.     #########################################################################
  221.  
  222.     def handle_event(self, mask):
  223.     """ Callback method to handle all events except close events. """
  224.  
  225.     # Read event.
  226.     if mask & Reactor.READ:
  227.         self.__handler.recv()
  228.  
  229.     # Write event.
  230.     if mask & Reactor.WRITE:
  231.         # Get the message at the head of the outgoing queue.
  232.         (request_id, message) = self.__outgoing[0]
  233.  
  234.         # Send it!
  235.         self.__handler.send(message)
  236.  
  237.     # Exception event.
  238.     if mask & Reactor.EXCEPTION:
  239.         # Close down the worker.
  240.         self.__close()
  241.  
  242.         # Barf!
  243.         raise CORBA.COMM_FAILURE() # System exception.
  244.  
  245.     return
  246.  
  247.     def handle_close(self):
  248.     """ Callback method to handle close events. """
  249.  
  250.     # Close down the worker.
  251.     self.__close()
  252.  
  253.     return
  254.  
  255.     def handle(self):
  256.     """ Return my underlying I/O handle.
  257.  
  258.     In this case, my I/O handle is the file descriptor of my socket. 
  259.  
  260.     """
  261.     return self.__connection.handle()
  262.  
  263.     #########################################################################
  264.     # GIOPConnectionHandlerListener interface.
  265.     #########################################################################
  266.  
  267.     def message_received(self, message):
  268.     """ Called when a complete GIOP message has been received. """
  269.  
  270.     # Code re-use! The protected method '_message_received' is implemented
  271.     # in the 'GIOPClientWorker' class.
  272.     #
  273.     # If the message is a GIOP Reply message, then this method will call
  274.     # our '_reply_received' method.
  275.     self._message_received(message)
  276.  
  277.     return
  278.  
  279.     def message_sent(self):
  280.     """ Called when a complete GIOP message has been sent. """
  281.  
  282.     # Get the details of the message that has just been sent.
  283.     (request_id, message) = self.__outgoing[0]
  284.  
  285.     # Remove the message from the outgoing queue.
  286.     del self.__outgoing[0]
  287.  
  288.     # If there are no other messages left to send then tell the Reactor
  289.     # that I am no longer interested in write events.
  290.     if len(self.__outgoing) == 0:
  291.         self.__reactor.unregister_handler(self, Reactor.WRITE)
  292.  
  293.     return
  294.  
  295.     #########################################################################
  296.     # Protected interface.
  297.     #########################################################################
  298.  
  299.     def _reply_received(self, reply_header, cursor):
  300.     """ Called by the '_message_received' method of GIOPClientWorker. """
  301.  
  302.     # Add the reply to our dictionary of complete replies.
  303.     self.__replies[reply_header.request_id] = (reply_header, cursor)
  304.  
  305.     return
  306.  
  307.     #########################################################################
  308.     # Private interface.
  309.     #########################################################################
  310.     
  311.     def __close(self):
  312.     """ Close down the worker. """
  313.  
  314.     # Withdraw all of my Reactor registrations.
  315.     self.__reactor.unregister_handler(self, Reactor.ALL)
  316.  
  317.     # Close our connection.
  318.     self.__connection.disconnect()
  319.  
  320.     # All done!
  321.     self.__closed = 1
  322.  
  323.     return
  324.  
  325. #############################################################################
  326.