home *** CD-ROM | disk | FTP | other *** search
/ Computer Active 2010 August / CA08.iso / Multimedija / shufflr.air / ShufflrClient.swf / scripts / mx / messaging / AbstractConsumer.as next >
Encoding:
Text File  |  2010-06-23  |  16.8 KB  |  512 lines

  1. package mx.messaging
  2. {
  3.    import flash.events.TimerEvent;
  4.    import flash.utils.Timer;
  5.    import mx.core.mx_internal;
  6.    import mx.events.PropertyChangeEvent;
  7.    import mx.logging.Log;
  8.    import mx.messaging.channels.PollingChannel;
  9.    import mx.messaging.events.ChannelEvent;
  10.    import mx.messaging.events.ChannelFaultEvent;
  11.    import mx.messaging.events.MessageEvent;
  12.    import mx.messaging.events.MessageFaultEvent;
  13.    import mx.messaging.messages.AcknowledgeMessage;
  14.    import mx.messaging.messages.CommandMessage;
  15.    import mx.messaging.messages.ErrorMessage;
  16.    import mx.messaging.messages.IMessage;
  17.    import mx.resources.IResourceManager;
  18.    import mx.resources.ResourceManager;
  19.    
  20.    use namespace mx_internal;
  21.    
  22.    public class AbstractConsumer extends MessageAgent
  23.    {
  24.       private var _currentAttempt:int;
  25.       
  26.       private var _timestamp:Number = -1;
  27.       
  28.       private var _resubscribeInterval:int = 5000;
  29.       
  30.       private var _resubscribeAttempts:int = 5;
  31.       
  32.       private var _resubscribeTimer:Timer;
  33.       
  34.       protected var _shouldBeSubscribed:Boolean;
  35.       
  36.       private var _subscribeMsg:CommandMessage;
  37.       
  38.       private var _maxFrequency:uint = 0;
  39.       
  40.       private var _subscribed:Boolean;
  41.       
  42.       private var resourceManager:IResourceManager = ResourceManager.getInstance();
  43.       
  44.       public function AbstractConsumer()
  45.       {
  46.          super();
  47.          _log = Log.getLogger("mx.messaging.Consumer");
  48.          _agentType = "consumer";
  49.       }
  50.       
  51.       override public function channelFaultHandler(param1:ChannelFaultEvent) : void
  52.       {
  53.          if(!param1.channel.connected)
  54.          {
  55.             setSubscribed(false);
  56.          }
  57.          super.channelFaultHandler(param1);
  58.          if(_shouldBeSubscribed && !param1.rejected && !param1.channel.connected)
  59.          {
  60.             startResubscribeTimer();
  61.          }
  62.       }
  63.       
  64.       protected function buildUnsubscribeMessage(param1:Boolean) : CommandMessage
  65.       {
  66.          var _loc2_:CommandMessage = new CommandMessage();
  67.          _loc2_.operation = CommandMessage.UNSUBSCRIBE_OPERATION;
  68.          _loc2_.clientId = clientId;
  69.          _loc2_.destination = destination;
  70.          if(param1)
  71.          {
  72.             _loc2_.headers[CommandMessage.PRESERVE_DURABLE_HEADER] = param1;
  73.          }
  74.          return _loc2_;
  75.       }
  76.       
  77.       public function receive(param1:Number = 0) : void
  78.       {
  79.          var _loc2_:CommandMessage = null;
  80.          if(clientId != null)
  81.          {
  82.             _loc2_ = new CommandMessage();
  83.             _loc2_.operation = CommandMessage.POLL_OPERATION;
  84.             _loc2_.destination = destination;
  85.             internalSend(_loc2_);
  86.          }
  87.       }
  88.       
  89.       protected function resubscribe(param1:TimerEvent) : void
  90.       {
  91.          var _loc2_:ErrorMessage = null;
  92.          if(_resubscribeAttempts != -1 && _currentAttempt >= _resubscribeAttempts)
  93.          {
  94.             stopResubscribeTimer();
  95.             _shouldBeSubscribed = false;
  96.             _loc2_ = new ErrorMessage();
  97.             _loc2_.faultCode = "Client.Error.Subscribe";
  98.             _loc2_.faultString = resourceManager.getString("messaging","consumerSubscribeError");
  99.             _loc2_.faultDetail = resourceManager.getString("messaging","failedToSubscribe");
  100.             _loc2_.correlationId = _subscribeMsg.messageId;
  101.             fault(_loc2_,_subscribeMsg);
  102.             return;
  103.          }
  104.          if(Log.isDebug())
  105.          {
  106.             _log.debug("\'{0}\' {1} trying to resubscribe.",id,_agentType);
  107.          }
  108.          _resubscribeTimer.delay = _resubscribeInterval;
  109.          ++_currentAttempt;
  110.          internalSend(_subscribeMsg,false);
  111.       }
  112.       
  113.       override mx_internal function setClientId(param1:String) : void
  114.       {
  115.          var _loc2_:Boolean = false;
  116.          if(super.clientId != param1)
  117.          {
  118.             _loc2_ = false;
  119.             if(subscribed)
  120.             {
  121.                unsubscribe();
  122.                _loc2_ = true;
  123.             }
  124.             super.mx_internal::setClientId(param1);
  125.             if(_loc2_)
  126.             {
  127.                subscribe(param1);
  128.             }
  129.          }
  130.       }
  131.       
  132.       override public function disconnect() : void
  133.       {
  134.          _shouldBeSubscribed = false;
  135.          stopResubscribeTimer();
  136.          setSubscribed(false);
  137.          super.disconnect();
  138.       }
  139.       
  140.       public function subscribe(param1:String = null) : void
  141.       {
  142.          var _loc2_:Boolean = param1 != null && super.clientId != param1 ? true : false;
  143.          if(subscribed && _loc2_)
  144.          {
  145.             unsubscribe();
  146.          }
  147.          stopResubscribeTimer();
  148.          _shouldBeSubscribed = true;
  149.          if(_loc2_)
  150.          {
  151.             super.mx_internal::setClientId(param1);
  152.          }
  153.          if(Log.isInfo())
  154.          {
  155.             _log.info("\'{0}\' {1} subscribe.",id,_agentType);
  156.          }
  157.          _subscribeMsg = buildSubscribeMessage();
  158.          internalSend(_subscribeMsg);
  159.       }
  160.       
  161.       override public function channelDisconnectHandler(param1:ChannelEvent) : void
  162.       {
  163.          setSubscribed(false);
  164.          super.channelDisconnectHandler(param1);
  165.          if(_shouldBeSubscribed && !param1.rejected)
  166.          {
  167.             startResubscribeTimer();
  168.          }
  169.       }
  170.       
  171.       protected function buildSubscribeMessage() : CommandMessage
  172.       {
  173.          var _loc1_:CommandMessage = new CommandMessage();
  174.          _loc1_.operation = CommandMessage.SUBSCRIBE_OPERATION;
  175.          _loc1_.clientId = clientId;
  176.          _loc1_.destination = destination;
  177.          if(maxFrequency > 0)
  178.          {
  179.             _loc1_.headers[CommandMessage.MAX_FREQUENCY_HEADER] = maxFrequency;
  180.          }
  181.          return _loc1_;
  182.       }
  183.       
  184.       protected function startResubscribeTimer() : void
  185.       {
  186.          if(_shouldBeSubscribed && _resubscribeTimer == null)
  187.          {
  188.             if(_resubscribeAttempts != 0 && _resubscribeInterval > 0)
  189.             {
  190.                if(Log.isDebug())
  191.                {
  192.                   _log.debug("\'{0}\' {1} starting resubscribe timer.",id,_agentType);
  193.                }
  194.                _resubscribeTimer = new Timer(1);
  195.                _resubscribeTimer.addEventListener(TimerEvent.TIMER,resubscribe);
  196.                _resubscribeTimer.start();
  197.                _currentAttempt = 0;
  198.             }
  199.          }
  200.       }
  201.       
  202.       public function unsubscribe(param1:Boolean = false) : void
  203.       {
  204.          _shouldBeSubscribed = false;
  205.          if(subscribed)
  206.          {
  207.             if(channelSet != null)
  208.             {
  209.                channelSet.removeEventListener(destination,mx_internal::messageHandler);
  210.             }
  211.             if(Log.isInfo())
  212.             {
  213.                _log.info("\'{0}\' {1} unsubscribe.",id,_agentType);
  214.             }
  215.             internalSend(buildUnsubscribeMessage(param1));
  216.          }
  217.          else
  218.          {
  219.             stopResubscribeTimer();
  220.          }
  221.       }
  222.       
  223.       mx_internal function messageHandler(param1:MessageEvent) : void
  224.       {
  225.          var _loc3_:CommandMessage = null;
  226.          var _loc2_:IMessage = param1.message;
  227.          if(_loc2_ is CommandMessage)
  228.          {
  229.             _loc3_ = _loc2_ as CommandMessage;
  230.             switch(_loc3_.operation)
  231.             {
  232.                case CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION:
  233.                   if(channelSet.currentChannel is PollingChannel)
  234.                   {
  235.                      PollingChannel(channelSet.currentChannel).disablePolling();
  236.                   }
  237.                   setSubscribed(false);
  238.                   break;
  239.                default:
  240.                   if(Log.isWarn())
  241.                   {
  242.                      _log.warn("\'{0}\' received a CommandMessage \'{1}\' that could not be handled.",id,CommandMessage.getOperationAsString(_loc3_.operation));
  243.                   }
  244.             }
  245.             return;
  246.          }
  247.          if(_loc2_.timestamp > _timestamp)
  248.          {
  249.             _timestamp = _loc2_.timestamp;
  250.          }
  251.          if(_loc2_ is ErrorMessage)
  252.          {
  253.             dispatchEvent(MessageFaultEvent.createEvent(ErrorMessage(_loc2_)));
  254.          }
  255.          else
  256.          {
  257.             dispatchEvent(MessageEvent.createEvent(MessageEvent.MESSAGE,_loc2_));
  258.          }
  259.       }
  260.       
  261.       [Bindable(event="propertyChange")]
  262.       public function get timestamp() : Number
  263.       {
  264.          return _timestamp;
  265.       }
  266.       
  267.       [Bindable(event="propertyChange")]
  268.       public function get maxFrequency() : uint
  269.       {
  270.          return _maxFrequency;
  271.       }
  272.       
  273.       [Bindable(event="propertyChange")]
  274.       public function get resubscribeInterval() : int
  275.       {
  276.          return _resubscribeInterval;
  277.       }
  278.       
  279.       [Bindable(event="propertyChange")]
  280.       public function get subscribed() : Boolean
  281.       {
  282.          return _subscribed;
  283.       }
  284.       
  285.       override public function fault(param1:ErrorMessage, param2:IMessage) : void
  286.       {
  287.          if(_disconnectBarrier)
  288.          {
  289.             return;
  290.          }
  291.          if(param1.headers[ErrorMessage.RETRYABLE_HINT_HEADER])
  292.          {
  293.             if(_resubscribeTimer == null)
  294.             {
  295.                if(_subscribeMsg != null && param1.correlationId == _subscribeMsg.messageId)
  296.                {
  297.                   _shouldBeSubscribed = false;
  298.                }
  299.                super.fault(param1,param2);
  300.             }
  301.          }
  302.          else
  303.          {
  304.             super.fault(param1,param2);
  305.          }
  306.       }
  307.       
  308.       override public function set destination(param1:String) : void
  309.       {
  310.          var _loc2_:Boolean = false;
  311.          if(destination != param1)
  312.          {
  313.             _loc2_ = false;
  314.             if(subscribed)
  315.             {
  316.                unsubscribe();
  317.                _loc2_ = true;
  318.             }
  319.             super.destination = param1;
  320.             if(_loc2_)
  321.             {
  322.                subscribe();
  323.             }
  324.          }
  325.       }
  326.       
  327.       [Bindable(event="propertyChange")]
  328.       public function get resubscribeAttempts() : int
  329.       {
  330.          return _resubscribeAttempts;
  331.       }
  332.       
  333.       protected function stopResubscribeTimer() : void
  334.       {
  335.          if(_resubscribeTimer != null)
  336.          {
  337.             if(Log.isDebug())
  338.             {
  339.                _log.debug("\'{0}\' {1} stopping resubscribe timer.",id,_agentType);
  340.             }
  341.             _resubscribeTimer.removeEventListener(TimerEvent.TIMER,resubscribe);
  342.             _resubscribeTimer.reset();
  343.             _resubscribeTimer = null;
  344.          }
  345.       }
  346.       
  347.       override public function channelConnectHandler(param1:ChannelEvent) : void
  348.       {
  349.          super.channelConnectHandler(param1);
  350.          if(connected && channelSet != null && channelSet.currentChannel != null && !channelSet.currentChannel.mx_internal::realtime && Log.isWarn())
  351.          {
  352.             _log.warn("\'{0}\' {1} connected over a non-realtime channel \'{2}\'" + " which means channel is not automatically receiving updates via polling or server push.",id,_agentType,channelSet.currentChannel.id);
  353.          }
  354.       }
  355.       
  356.       protected function setSubscribed(param1:Boolean) : void
  357.       {
  358.          var _loc2_:PropertyChangeEvent = null;
  359.          if(_subscribed != param1)
  360.          {
  361.             _loc2_ = PropertyChangeEvent.createUpdateEvent(this,"subscribed",_subscribed,param1);
  362.             _subscribed = param1;
  363.             if(_subscribed)
  364.             {
  365.                ConsumerMessageDispatcher.getInstance().registerSubscription(this);
  366.                if(channelSet != null && channelSet.currentChannel != null && channelSet.currentChannel is PollingChannel)
  367.                {
  368.                   PollingChannel(channelSet.currentChannel).enablePolling();
  369.                }
  370.             }
  371.             else
  372.             {
  373.                ConsumerMessageDispatcher.getInstance().unregisterSubscription(this);
  374.                if(channelSet != null && channelSet.currentChannel != null && channelSet.currentChannel is PollingChannel)
  375.                {
  376.                   PollingChannel(channelSet.currentChannel).disablePolling();
  377.                }
  378.             }
  379.             dispatchEvent(_loc2_);
  380.          }
  381.       }
  382.       
  383.       public function set maxFrequency(param1:uint) : void
  384.       {
  385.          var _loc2_:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this,"maxFrequency",_maxFrequency,param1);
  386.          _maxFrequency = param1;
  387.          dispatchEvent(_loc2_);
  388.       }
  389.       
  390.       override public function acknowledge(param1:AcknowledgeMessage, param2:IMessage) : void
  391.       {
  392.          var _loc3_:CommandMessage = null;
  393.          var _loc4_:int = 0;
  394.          var _loc5_:Array = null;
  395.          var _loc6_:IMessage = null;
  396.          if(_disconnectBarrier)
  397.          {
  398.             return;
  399.          }
  400.          if(!param1.headers[AcknowledgeMessage.ERROR_HINT_HEADER] && param2 is CommandMessage)
  401.          {
  402.             _loc3_ = param2 as CommandMessage;
  403.             _loc4_ = int(_loc3_.operation);
  404.             if(_loc4_ == CommandMessage.MULTI_SUBSCRIBE_OPERATION)
  405.             {
  406.                if(param2.headers.DSlastUnsub != null)
  407.                {
  408.                   _loc4_ = int(CommandMessage.UNSUBSCRIBE_OPERATION);
  409.                }
  410.                else
  411.                {
  412.                   _loc4_ = int(CommandMessage.SUBSCRIBE_OPERATION);
  413.                }
  414.             }
  415.             switch(_loc4_)
  416.             {
  417.                case CommandMessage.UNSUBSCRIBE_OPERATION:
  418.                   if(Log.isInfo())
  419.                   {
  420.                      _log.info("\'{0}\' {1} acknowledge for unsubscribe.",id,_agentType);
  421.                   }
  422.                   super.mx_internal::setClientId(null);
  423.                   setSubscribed(false);
  424.                   param1.clientId = null;
  425.                   super.acknowledge(param1,param2);
  426.                   break;
  427.                case CommandMessage.SUBSCRIBE_OPERATION:
  428.                   stopResubscribeTimer();
  429.                   if(param1.timestamp > _timestamp)
  430.                   {
  431.                      _timestamp = param1.timestamp - 1;
  432.                   }
  433.                   if(Log.isInfo())
  434.                   {
  435.                      _log.info("\'{0}\' {1} acknowledge for subscribe. Client id \'{2}\' new timestamp {3}",id,_agentType,param1.clientId,_timestamp);
  436.                   }
  437.                   super.mx_internal::setClientId(param1.clientId);
  438.                   setSubscribed(true);
  439.                   super.acknowledge(param1,param2);
  440.                   break;
  441.                case CommandMessage.POLL_OPERATION:
  442.                   if(param1.body != null && param1.body is Array)
  443.                   {
  444.                      _loc5_ = param1.body as Array;
  445.                      for each(_loc6_ in _loc5_)
  446.                      {
  447.                         mx_internal::messageHandler(MessageEvent.createEvent(MessageEvent.MESSAGE,_loc6_));
  448.                      }
  449.                   }
  450.                   super.acknowledge(param1,param2);
  451.             }
  452.          }
  453.          else
  454.          {
  455.             super.acknowledge(param1,param2);
  456.          }
  457.       }
  458.       
  459.       public function set resubscribeInterval(param1:int) : void
  460.       {
  461.          var _loc2_:PropertyChangeEvent = null;
  462.          var _loc3_:String = null;
  463.          if(_resubscribeInterval != param1)
  464.          {
  465.             if(param1 < 0)
  466.             {
  467.                _loc3_ = resourceManager.getString("messaging","resubscribeIntervalNegative");
  468.                throw new ArgumentError(_loc3_);
  469.             }
  470.             if(param1 == 0)
  471.             {
  472.                stopResubscribeTimer();
  473.             }
  474.             else if(_resubscribeTimer != null)
  475.             {
  476.                _resubscribeTimer.delay = param1;
  477.             }
  478.             _loc2_ = PropertyChangeEvent.createUpdateEvent(this,"resubscribeInterval",_resubscribeInterval,param1);
  479.             _resubscribeInterval = param1;
  480.             dispatchEvent(_loc2_);
  481.          }
  482.       }
  483.       
  484.       public function set resubscribeAttempts(param1:int) : void
  485.       {
  486.          var _loc2_:PropertyChangeEvent = null;
  487.          if(_resubscribeAttempts != param1)
  488.          {
  489.             if(param1 == 0)
  490.             {
  491.                stopResubscribeTimer();
  492.             }
  493.             _loc2_ = PropertyChangeEvent.createUpdateEvent(this,"resubscribeAttempts",_resubscribeAttempts,param1);
  494.             _resubscribeAttempts = param1;
  495.             dispatchEvent(_loc2_);
  496.          }
  497.       }
  498.       
  499.       public function set timestamp(param1:Number) : void
  500.       {
  501.          var _loc2_:PropertyChangeEvent = null;
  502.          if(_timestamp != param1)
  503.          {
  504.             _loc2_ = PropertyChangeEvent.createUpdateEvent(this,"timestamp",_timestamp,param1);
  505.             _timestamp = param1;
  506.             dispatchEvent(_loc2_);
  507.          }
  508.       }
  509.    }
  510. }
  511.  
  512.