home *** CD-ROM | disk | FTP | other *** search
/ Computer Active 2010 August / CA08.iso / Darbas / kidoz_v1.air / kidoz.swf / scripts / mx / messaging / AbstractConsumer.as next >
Encoding:
Text File  |  2009-05-06  |  16.1 KB  |  487 lines

  1. package mx.messaging
  2. {
  3.    import flash.events.TimerEvent;
  4.    import flash.utils.Timer;
  5.    import mx.core.mx_internal;
  6.    import mx.events.PropertyChangeEvent;
  7.    import mx.logging.Log;
  8.    import mx.messaging.channels.PollingChannel;
  9.    import mx.messaging.events.ChannelEvent;
  10.    import mx.messaging.events.ChannelFaultEvent;
  11.    import mx.messaging.events.MessageEvent;
  12.    import mx.messaging.events.MessageFaultEvent;
  13.    import mx.messaging.messages.AcknowledgeMessage;
  14.    import mx.messaging.messages.CommandMessage;
  15.    import mx.messaging.messages.ErrorMessage;
  16.    import mx.messaging.messages.IMessage;
  17.    import mx.resources.IResourceManager;
  18.    import mx.resources.ResourceManager;
  19.    
  20.    use namespace mx_internal;
  21.    
  22.    public class AbstractConsumer extends MessageAgent
  23.    {
  24.       private var _currentAttempt:int;
  25.       
  26.       private var _timestamp:Number = -1;
  27.       
  28.       private var _resubscribeInterval:int = 5000;
  29.       
  30.       private var _resubscribeAttempts:int = 5;
  31.       
  32.       private var _resubscribeTimer:Timer;
  33.       
  34.       protected var _shouldBeSubscribed:Boolean;
  35.       
  36.       private var _subscribeMsg:CommandMessage;
  37.       
  38.       private var _subscribed:Boolean;
  39.       
  40.       private var resourceManager:IResourceManager = ResourceManager.getInstance();
  41.       
  42.       public function AbstractConsumer()
  43.       {
  44.          super();
  45.          _log = Log.getLogger("mx.messaging.Consumer");
  46.          _agentType = "consumer";
  47.       }
  48.       
  49.       override public function channelFaultHandler(param1:ChannelFaultEvent) : void
  50.       {
  51.          if(!param1.channel.connected)
  52.          {
  53.             setSubscribed(false);
  54.          }
  55.          super.channelFaultHandler(param1);
  56.          if(_shouldBeSubscribed && !param1.rejected && !param1.channel.connected)
  57.          {
  58.             startResubscribeTimer();
  59.          }
  60.       }
  61.       
  62.       protected function buildUnsubscribeMessage(param1:Boolean) : CommandMessage
  63.       {
  64.          var _loc2_:CommandMessage = new CommandMessage();
  65.          _loc2_.operation = CommandMessage.UNSUBSCRIBE_OPERATION;
  66.          _loc2_.clientId = clientId;
  67.          _loc2_.destination = destination;
  68.          if(param1)
  69.          {
  70.             _loc2_.headers[CommandMessage.PRESERVE_DURABLE_HEADER] = param1;
  71.          }
  72.          return _loc2_;
  73.       }
  74.       
  75.       public function receive(param1:Number = 0) : void
  76.       {
  77.          var _loc2_:CommandMessage = null;
  78.          if(clientId != null)
  79.          {
  80.             _loc2_ = new CommandMessage();
  81.             _loc2_.operation = CommandMessage.POLL_OPERATION;
  82.             _loc2_.destination = destination;
  83.             internalSend(_loc2_);
  84.          }
  85.       }
  86.       
  87.       override mx_internal function setClientId(param1:String) : void
  88.       {
  89.          var _loc2_:Boolean = false;
  90.          if(super.clientId != param1)
  91.          {
  92.             _loc2_ = false;
  93.             if(subscribed)
  94.             {
  95.                unsubscribe();
  96.                _loc2_ = true;
  97.             }
  98.             super.mx_internal::setClientId(param1);
  99.             if(_loc2_)
  100.             {
  101.                subscribe(param1);
  102.             }
  103.          }
  104.       }
  105.       
  106.       override public function disconnect() : void
  107.       {
  108.          _shouldBeSubscribed = false;
  109.          stopResubscribeTimer();
  110.          setSubscribed(false);
  111.          super.disconnect();
  112.       }
  113.       
  114.       public function subscribe(param1:String = null) : void
  115.       {
  116.          var _loc2_:Boolean = param1 != null && super.clientId != param1 ? true : false;
  117.          if(subscribed && _loc2_)
  118.          {
  119.             unsubscribe();
  120.          }
  121.          stopResubscribeTimer();
  122.          _shouldBeSubscribed = true;
  123.          if(_loc2_)
  124.          {
  125.             super.mx_internal::setClientId(param1);
  126.          }
  127.          if(Log.isInfo())
  128.          {
  129.             _log.info("\'{0}\' {1} subscribe.",id,_agentType);
  130.          }
  131.          _subscribeMsg = buildSubscribeMessage();
  132.          internalSend(_subscribeMsg);
  133.       }
  134.       
  135.       override public function channelDisconnectHandler(param1:ChannelEvent) : void
  136.       {
  137.          setSubscribed(false);
  138.          super.channelDisconnectHandler(param1);
  139.          if(_shouldBeSubscribed && !param1.rejected)
  140.          {
  141.             startResubscribeTimer();
  142.          }
  143.       }
  144.       
  145.       protected function buildSubscribeMessage() : CommandMessage
  146.       {
  147.          var _loc1_:CommandMessage = new CommandMessage();
  148.          _loc1_.operation = CommandMessage.SUBSCRIBE_OPERATION;
  149.          _loc1_.clientId = clientId;
  150.          _loc1_.destination = destination;
  151.          return _loc1_;
  152.       }
  153.       
  154.       protected function startResubscribeTimer() : void
  155.       {
  156.          if(_shouldBeSubscribed && _resubscribeTimer == null)
  157.          {
  158.             if(_resubscribeAttempts != 0 && _resubscribeInterval > 0)
  159.             {
  160.                if(Log.isDebug())
  161.                {
  162.                   _log.debug("\'{0}\' {1} starting resubscribe timer.",id,_agentType);
  163.                }
  164.                _resubscribeTimer = new Timer(1);
  165.                _resubscribeTimer.addEventListener(TimerEvent.TIMER,resubscribe);
  166.                _resubscribeTimer.start();
  167.                _currentAttempt = 0;
  168.             }
  169.          }
  170.       }
  171.       
  172.       public function unsubscribe(param1:Boolean = false) : void
  173.       {
  174.          _shouldBeSubscribed = false;
  175.          if(subscribed)
  176.          {
  177.             if(channelSet != null)
  178.             {
  179.                channelSet.removeEventListener(destination,mx_internal::messageHandler);
  180.             }
  181.             if(Log.isInfo())
  182.             {
  183.                _log.info("\'{0}\' {1} unsubscribe.",id,_agentType);
  184.             }
  185.             internalSend(buildUnsubscribeMessage(param1));
  186.          }
  187.          else
  188.          {
  189.             stopResubscribeTimer();
  190.          }
  191.       }
  192.       
  193.       mx_internal function messageHandler(param1:MessageEvent) : void
  194.       {
  195.          var _loc3_:CommandMessage = null;
  196.          var _loc2_:IMessage = param1.message;
  197.          if(_loc2_ is CommandMessage)
  198.          {
  199.             _loc3_ = _loc2_ as CommandMessage;
  200.             switch(_loc3_.operation)
  201.             {
  202.                case CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION:
  203.                   if(channelSet.currentChannel is PollingChannel)
  204.                   {
  205.                      PollingChannel(channelSet.currentChannel).disablePolling();
  206.                   }
  207.                   setSubscribed(false);
  208.                   break;
  209.                default:
  210.                   if(Log.isWarn())
  211.                   {
  212.                      _log.warn("\'{0}\' received a CommandMessage \'{1}\' that could not be handled.",id,CommandMessage.getOperationAsString(_loc3_.operation));
  213.                   }
  214.             }
  215.             return;
  216.          }
  217.          if(_loc2_.timestamp > _timestamp)
  218.          {
  219.             _timestamp = _loc2_.timestamp;
  220.          }
  221.          if(_loc2_ is ErrorMessage)
  222.          {
  223.             dispatchEvent(MessageFaultEvent.createEvent(ErrorMessage(_loc2_)));
  224.          }
  225.          else
  226.          {
  227.             dispatchEvent(MessageEvent.createEvent(MessageEvent.MESSAGE,_loc2_));
  228.          }
  229.       }
  230.       
  231.       [Bindable(event="propertyChange")]
  232.       public function get timestamp() : Number
  233.       {
  234.          return _timestamp;
  235.       }
  236.       
  237.       [Bindable(event="propertyChange")]
  238.       public function get resubscribeInterval() : int
  239.       {
  240.          return _resubscribeInterval;
  241.       }
  242.       
  243.       [Bindable(event="propertyChange")]
  244.       public function get subscribed() : Boolean
  245.       {
  246.          return _subscribed;
  247.       }
  248.       
  249.       override public function fault(param1:ErrorMessage, param2:IMessage) : void
  250.       {
  251.          if(_disconnectBarrier)
  252.          {
  253.             return;
  254.          }
  255.          if(_subscribeMsg != null && param1.correlationId == _subscribeMsg.messageId)
  256.          {
  257.             _shouldBeSubscribed = false;
  258.          }
  259.          if(!param1.headers[ErrorMessage.RETRYABLE_HINT_HEADER] || _resubscribeTimer == null)
  260.          {
  261.             stopResubscribeTimer();
  262.             super.fault(param1,param2);
  263.          }
  264.       }
  265.       
  266.       protected function setSubscribed(param1:Boolean) : void
  267.       {
  268.          var _loc2_:PropertyChangeEvent = null;
  269.          if(_subscribed != param1)
  270.          {
  271.             _loc2_ = PropertyChangeEvent.createUpdateEvent(this,"subscribed",_subscribed,param1);
  272.             _subscribed = param1;
  273.             if(_subscribed)
  274.             {
  275.                ConsumerMessageDispatcher.getInstance().registerSubscription(this);
  276.                if(channelSet != null && channelSet.currentChannel != null && channelSet.currentChannel is PollingChannel)
  277.                {
  278.                   PollingChannel(channelSet.currentChannel).enablePolling();
  279.                }
  280.             }
  281.             else
  282.             {
  283.                ConsumerMessageDispatcher.getInstance().unregisterSubscription(this);
  284.                if(channelSet != null && channelSet.currentChannel != null && channelSet.currentChannel is PollingChannel)
  285.                {
  286.                   PollingChannel(channelSet.currentChannel).disablePolling();
  287.                }
  288.             }
  289.             dispatchEvent(_loc2_);
  290.          }
  291.       }
  292.       
  293.       [Bindable(event="propertyChange")]
  294.       public function get resubscribeAttempts() : int
  295.       {
  296.          return _resubscribeAttempts;
  297.       }
  298.       
  299.       override public function channelConnectHandler(param1:ChannelEvent) : void
  300.       {
  301.          super.channelConnectHandler(param1);
  302.          if(connected && channelSet != null && channelSet.currentChannel != null && !channelSet.currentChannel.mx_internal::realtime && Log.isWarn())
  303.          {
  304.             _log.warn("\'{0}\' {1} connected over a non-realtime channel \'{2}\'" + " which means channel is not automatically receiving updates via polling or server push.",id,_agentType,channelSet.currentChannel.id);
  305.          }
  306.       }
  307.       
  308.       override public function set destination(param1:String) : void
  309.       {
  310.          var _loc2_:Boolean = false;
  311.          if(destination != param1)
  312.          {
  313.             _loc2_ = false;
  314.             if(subscribed)
  315.             {
  316.                unsubscribe();
  317.                _loc2_ = true;
  318.             }
  319.             super.destination = param1;
  320.             if(_loc2_)
  321.             {
  322.                subscribe();
  323.             }
  324.          }
  325.       }
  326.       
  327.       protected function stopResubscribeTimer() : void
  328.       {
  329.          if(_resubscribeTimer != null)
  330.          {
  331.             if(Log.isDebug())
  332.             {
  333.                _log.debug("\'{0}\' {1} stopping resubscribe timer.",id,_agentType);
  334.             }
  335.             _resubscribeTimer.removeEventListener(TimerEvent.TIMER,resubscribe);
  336.             _resubscribeTimer.reset();
  337.             _resubscribeTimer = null;
  338.          }
  339.       }
  340.       
  341.       override public function acknowledge(param1:AcknowledgeMessage, param2:IMessage) : void
  342.       {
  343.          var _loc3_:CommandMessage = null;
  344.          var _loc4_:int = 0;
  345.          var _loc5_:Array = null;
  346.          var _loc6_:IMessage = null;
  347.          if(_disconnectBarrier)
  348.          {
  349.             return;
  350.          }
  351.          if(!param1.headers[AcknowledgeMessage.ERROR_HINT_HEADER] && param2 is CommandMessage)
  352.          {
  353.             _loc3_ = param2 as CommandMessage;
  354.             _loc4_ = int(_loc3_.operation);
  355.             if(_loc4_ == CommandMessage.MULTI_SUBSCRIBE_OPERATION)
  356.             {
  357.                if(param2.headers.DSlastUnsub != null)
  358.                {
  359.                   _loc4_ = int(CommandMessage.UNSUBSCRIBE_OPERATION);
  360.                }
  361.                else
  362.                {
  363.                   _loc4_ = int(CommandMessage.SUBSCRIBE_OPERATION);
  364.                }
  365.             }
  366.             switch(_loc4_)
  367.             {
  368.                case CommandMessage.UNSUBSCRIBE_OPERATION:
  369.                   if(Log.isInfo())
  370.                   {
  371.                      _log.info("\'{0}\' {1} acknowledge for unsubscribe.",id,_agentType);
  372.                   }
  373.                   super.mx_internal::setClientId(null);
  374.                   setSubscribed(false);
  375.                   param1.clientId = null;
  376.                   super.acknowledge(param1,param2);
  377.                   break;
  378.                case CommandMessage.SUBSCRIBE_OPERATION:
  379.                   stopResubscribeTimer();
  380.                   if(param1.timestamp > _timestamp)
  381.                   {
  382.                      _timestamp = param1.timestamp - 1;
  383.                   }
  384.                   if(Log.isInfo())
  385.                   {
  386.                      _log.info("\'{0}\' {1} acknowledge for subscribe. Client id \'{2}\' new timestamp {3}",id,_agentType,param1.clientId,_timestamp);
  387.                   }
  388.                   super.mx_internal::setClientId(param1.clientId);
  389.                   setSubscribed(true);
  390.                   super.acknowledge(param1,param2);
  391.                   break;
  392.                case CommandMessage.POLL_OPERATION:
  393.                   if(param1.body != null && param1.body is Array)
  394.                   {
  395.                      _loc5_ = param1.body as Array;
  396.                      for each(_loc6_ in _loc5_)
  397.                      {
  398.                         mx_internal::messageHandler(MessageEvent.createEvent(MessageEvent.MESSAGE,_loc6_));
  399.                      }
  400.                   }
  401.                   super.acknowledge(param1,param2);
  402.             }
  403.          }
  404.          else
  405.          {
  406.             super.acknowledge(param1,param2);
  407.          }
  408.       }
  409.       
  410.       protected function resubscribe(param1:TimerEvent) : void
  411.       {
  412.          var _loc2_:ErrorMessage = null;
  413.          if(_resubscribeAttempts != -1 && _currentAttempt >= _resubscribeAttempts)
  414.          {
  415.             stopResubscribeTimer();
  416.             _shouldBeSubscribed = false;
  417.             _loc2_ = new ErrorMessage();
  418.             _loc2_.faultCode = "Client.Error.Subscribe";
  419.             _loc2_.faultString = resourceManager.getString("messaging","consumerSubscribeError");
  420.             _loc2_.faultDetail = resourceManager.getString("messaging","failedToSubscribe");
  421.             _loc2_.correlationId = _subscribeMsg.messageId;
  422.             fault(_loc2_,_subscribeMsg);
  423.             return;
  424.          }
  425.          if(Log.isDebug())
  426.          {
  427.             _log.debug("\'{0}\' {1} trying to resubscribe.",id,_agentType);
  428.          }
  429.          _resubscribeTimer.delay = _resubscribeInterval;
  430.          ++_currentAttempt;
  431.          internalSend(_subscribeMsg,false);
  432.       }
  433.       
  434.       public function set resubscribeInterval(param1:int) : void
  435.       {
  436.          var _loc2_:PropertyChangeEvent = null;
  437.          var _loc3_:String = null;
  438.          if(_resubscribeInterval != param1)
  439.          {
  440.             if(param1 < 0)
  441.             {
  442.                _loc3_ = resourceManager.getString("messaging","resubscribeIntervalNegative");
  443.                throw new ArgumentError(_loc3_);
  444.             }
  445.             if(param1 == 0)
  446.             {
  447.                stopResubscribeTimer();
  448.             }
  449.             else if(_resubscribeTimer != null)
  450.             {
  451.                _resubscribeTimer.delay = param1;
  452.             }
  453.             _loc2_ = PropertyChangeEvent.createUpdateEvent(this,"resubscribeInterval",_resubscribeInterval,param1);
  454.             _resubscribeInterval = param1;
  455.             dispatchEvent(_loc2_);
  456.          }
  457.       }
  458.       
  459.       public function set resubscribeAttempts(param1:int) : void
  460.       {
  461.          var _loc2_:PropertyChangeEvent = null;
  462.          if(_resubscribeAttempts != param1)
  463.          {
  464.             if(param1 == 0)
  465.             {
  466.                stopResubscribeTimer();
  467.             }
  468.             _loc2_ = PropertyChangeEvent.createUpdateEvent(this,"resubscribeAttempts",_resubscribeAttempts,param1);
  469.             _resubscribeAttempts = param1;
  470.             dispatchEvent(_loc2_);
  471.          }
  472.       }
  473.       
  474.       public function set timestamp(param1:Number) : void
  475.       {
  476.          var _loc2_:PropertyChangeEvent = null;
  477.          if(_timestamp != param1)
  478.          {
  479.             _loc2_ = PropertyChangeEvent.createUpdateEvent(this,"timestamp",_timestamp,param1);
  480.             _timestamp = param1;
  481.             dispatchEvent(_loc2_);
  482.          }
  483.       }
  484.    }
  485. }
  486.  
  487.