home *** CD-ROM | disk | FTP | other *** search
/ Netrunner 2004 October / NETRUNNER0410.ISO / regular / ActivePerl-5.8.4.810-MSWin32-x86.msi / _951416620c47efd279bcb9f723f3a35a < prev    next >
Encoding:
Text File  |  2004-06-01  |  7.6 KB  |  287 lines

  1. # ======================================================================
  2. #
  3. # Copyright (C) 2000-2001 Paul Kulchenko (paulclinger@yahoo.com)
  4. # SOAP::Lite is free software; you can redistribute it
  5. # and/or modify it under the same terms as Perl itself.
  6. #
  7. # $Id: MQ.pm,v 1.3 2001/08/11 19:09:57 paulk Exp $
  8. #
  9. # ======================================================================
  10.  
  11. package SOAP::Transport::MQ;
  12.  
  13. use strict;
  14. use vars qw($VERSION);
  15. $VERSION = sprintf("%d.%s", map {s/_//g; $_} q$Name: release-0_55-public $ =~ /-(\d+)_([\d_]+)/);
  16.  
  17. use MQClient::MQSeries; 
  18. use MQSeries::QueueManager;
  19. use MQSeries::Queue;
  20. use MQSeries::Message;
  21.  
  22. use URI;
  23. use URI::Escape; 
  24. use SOAP::Lite;
  25.  
  26. # ======================================================================
  27.  
  28. package URI::mq; # ok, lets do 'mq://' scheme
  29. require URI::_server; require URI::_userpass; 
  30. @URI::mq::ISA=qw(URI::_server URI::_userpass);
  31.  
  32.   # mq://user@host:port?Channel=A;QueueManager=B;RequestQueue=C;ReplyQueue=D
  33.   # ^^   ^^^^ ^^^^ ^^^^ ^^^^^^^^^ ^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^ ^^^^^^^^^^^^
  34.  
  35. # ======================================================================
  36.  
  37. package SOAP::Transport::MQ::Client;
  38.  
  39. use vars qw(@ISA);
  40. @ISA = qw(SOAP::Client);
  41.  
  42. use MQSeries qw(:constants);
  43.  
  44. sub DESTROY { SOAP::Trace::objects('()') }
  45.  
  46. sub new { 
  47.   my $self = shift;
  48.  
  49.   unless (ref $self) {
  50.     my $class = ref($self) || $self;
  51.     my(@params, @methods);
  52.     while (@_) { $class->can($_[0]) ? push(@methods, shift() => shift) : push(@params, shift) }
  53.     $self = bless {@params} => $class;
  54.     while (@methods) { my($method, $params) = splice(@methods,0,2);
  55.       $self->$method(ref $params eq 'ARRAY' ? @$params : $params) 
  56.     }
  57.     SOAP::Trace::objects('()');
  58.   }
  59.   return $self;
  60. }
  61.  
  62. sub BEGIN {
  63.   no strict 'refs';
  64.   for my $method (qw(requestqueue replyqueue)) {
  65.     my $field = '_' . $method;
  66.     *$method = sub {
  67.       my $self = shift->new;
  68.       @_ ? ($self->{$field} = shift, return $self) : return $self->{$field};
  69.     }
  70.   }
  71. }
  72.  
  73. sub endpoint {
  74.   my $self = shift;
  75.  
  76.   return $self->SUPER::endpoint unless @_;
  77.  
  78.   my $endpoint = shift;
  79.  
  80.   # nothing to do if new endpoint is the same as the current one
  81.   return $self if $self->SUPER::endpoint eq $endpoint;
  82.  
  83.   my $uri = URI->new($endpoint);
  84.   my %parameters = (%$self, map {URI::Escape::uri_unescape($_)} map {split/=/,$_,2} split /[&;]/, $uri->query || '');
  85.  
  86.   $ENV{MQSERVER} = sprintf "%s/TCP/%s(%s)", $parameters{Channel}, $uri->host, $uri->port
  87.     if $uri->host;
  88.  
  89.   my $qmgr = MQSeries::QueueManager->new(QueueManager => $parameters{QueueManager}) ||
  90.     die "Unable to connect to queue manager $parameters{QueueManager}\n";
  91.  
  92.   $self->requestqueue(MQSeries::Queue->new (
  93.     QueueManager => $qmgr,
  94.     Queue        => $parameters{RequestQueue},
  95.     Mode         => 'output',
  96.   ) || die "Unable to open $parameters{RequestQueue}\n");
  97.  
  98.   $self->replyqueue(MQSeries::Queue->new (
  99.     QueueManager => $qmgr,
  100.     Queue        => $parameters{ReplyQueue},
  101.     Mode         => 'input',
  102.   ) || die "Unable to open $parameters{ReplyQueue}\n");
  103.  
  104.   $self->SUPER::endpoint($endpoint);
  105. }
  106.  
  107. sub send_receive {
  108.   my($self, %parameters) = @_;
  109.   my($envelope, $endpoint) = 
  110.     @parameters{qw(envelope endpoint)};
  111.  
  112.   $self->endpoint($endpoint ||= $self->endpoint);
  113.  
  114.   %parameters = (%$self, %parameters);
  115.   my $expiry = $parameters{Expiry} || 60000;
  116.  
  117.   SOAP::Trace::debug($envelope);
  118.  
  119.   my $request = MQSeries::Message->new (
  120.     MsgDesc => {Format => MQFMT_STRING, Expiry => $expiry},
  121.     Data => $envelope,
  122.   );
  123.  
  124.   $self->requestqueue->Put(Message => $request) ||
  125.     die "Unable to put message to queue\n";
  126.  
  127.   my $reply = MQSeries::Message->new (
  128.     MsgDesc => {CorrelId => $request->MsgDesc('MsgId')},
  129.   );
  130.  
  131.   my $result = $self->replyqueue->Get (
  132.     Message => $reply,
  133.     Wait => $expiry,
  134.   );
  135.  
  136.   my $msg = $reply->Data if $result > 0;
  137.  
  138.   SOAP::Trace::debug($msg);
  139.  
  140.   my $code = $result > 0 ? undef : 
  141.              $result < 0 ? 'Timeout' : 'Error occured while waiting for response';
  142.  
  143.   $self->code($code);
  144.   $self->message($code);
  145.   $self->is_success(!defined $code || $code eq '');
  146.   $self->status($code);
  147.  
  148.   return $msg;
  149. }
  150.  
  151. # ======================================================================
  152.  
  153. package SOAP::Transport::MQ::Server;
  154.  
  155. use Carp ();
  156. use vars qw(@ISA $AUTOLOAD);
  157. @ISA = qw(SOAP::Server);
  158.  
  159. use MQSeries qw(:constants);
  160.  
  161. sub new {
  162.   my $self = shift;
  163.     
  164.   unless (ref $self) {
  165.     my $class = ref($self) || $self;
  166.     my $uri = URI->new(shift);
  167.     $self = $class->SUPER::new(@_);
  168.  
  169.     my %parameters = (%$self, map {URI::Escape::uri_unescape($_)} map {split/=/,$_,2} split /[&;]/, $uri->query || '');
  170.  
  171.     $ENV{MQSERVER} = sprintf "%s/TCP/%s(%s)", $parameters{Channel}, $uri->host, $uri->port
  172.       if $uri->host;
  173.  
  174.     my $qmgr = MQSeries::QueueManager->new(QueueManager => $parameters{QueueManager}) ||
  175.       Carp::croak "Unable to connect to queue manager $parameters{QueueManager}";
  176.  
  177.     $self->requestqueue(MQSeries::Queue->new (
  178.       QueueManager => $qmgr,
  179.       Queue        => $parameters{RequestQueue},
  180.       Mode         => 'input',
  181.     ) || Carp::croak  "Unable to open $parameters{RequestQueue}");
  182.  
  183.     $self->replyqueue(MQSeries::Queue->new (
  184.       QueueManager => $qmgr,
  185.       Queue        => $parameters{ReplyQueue},
  186.       Mode         => 'output',
  187.     ) || Carp::croak  "Unable to open $parameters{ReplyQueue}");
  188.   }
  189.   return $self;
  190. }
  191.  
  192. sub BEGIN {
  193.   no strict 'refs';
  194.   for my $method (qw(requestqueue replyqueue)) {
  195.     my $field = '_' . $method;
  196.     *$method = sub {
  197.       my $self = shift->new;
  198.       @_ ? ($self->{$field} = shift, return $self) : return $self->{$field};
  199.     }
  200.   }
  201. }
  202.  
  203. sub handle {
  204.   my $self = shift->new;
  205.  
  206.   my $msg = 0;
  207.   while (1) {
  208.     my $request = MQSeries::Message->new;
  209.  
  210.     # nonblock waiting
  211.     $self->requestqueue->Get (
  212.       Message => $request,
  213.     ) || die "Error occured while waiting for requests\n";
  214.  
  215.     return $msg if $self->requestqueue->Reason == MQRC_NO_MSG_AVAILABLE;
  216.  
  217.     my $reply = MQSeries::Message->new (
  218.       MsgDesc => {
  219.         CorrelId => $request->MsgDesc('MsgId'),
  220.         Expiry   => $request->MsgDesc('Expiry'),
  221.       },
  222.       Data => $self->SUPER::handle($request->Data),
  223.     );
  224.  
  225.     $self->replyqueue->Put (
  226.       Message => $reply,
  227.     ) || die "Unable to put reply message\n";
  228.  
  229.     $msg++;
  230.   }
  231. }
  232.  
  233. # ======================================================================
  234.  
  235. 1;
  236.  
  237. __END__
  238.  
  239. =head1 NAME
  240.  
  241. SOAP::Transport::MQ - Server/Client side MQ support for SOAP::Lite
  242.  
  243. =head1 SYNOPSIS
  244.  
  245. =over 4
  246.  
  247. =item Client
  248.  
  249.   use SOAP::Lite 
  250.     uri => 'http://my.own.site.com/My/Examples',
  251.     proxy => 'mq://server:port?Channel=CHAN1;QueueManager=QM_SOAP;RequestQueue=SOAPREQ1;ReplyQueue=SOAPRESP1',
  252.   ;
  253.  
  254.   print getStateName(1);
  255.  
  256. =item Server
  257.  
  258.   use SOAP::Transport::MQ;
  259.  
  260.   my $server = SOAP::Transport::MQ::Server
  261.     ->new('mq://server:port?Channel=CHAN1;QueueManager=QM_SOAP;RequestQueue=SOAPREQ1;ReplyQueue=SOAPRESP1')
  262.     # specify list of objects-by-reference here 
  263.     -> objects_by_reference(qw(My::PersistentIterator My::SessionIterator My::Chat))
  264.     # specify path to My/Examples.pm here
  265.     -> dispatch_to('/Your/Path/To/Deployed/Modules', 'Module::Name', 'Module::method')
  266.   ;
  267.  
  268.   print "Contact to SOAP server\n";
  269.   do { $server->handle } while sleep 1;
  270.  
  271. =back
  272.  
  273. =head1 DESCRIPTION
  274.  
  275. =head1 COPYRIGHT
  276.  
  277. Copyright (C) 2000-2001 Paul Kulchenko. All rights reserved.
  278.  
  279. This library is free software; you can redistribute it and/or modify
  280. it under the same terms as Perl itself.
  281.  
  282. =head1 AUTHOR
  283.  
  284. Paul Kulchenko (paulclinger@yahoo.com)
  285.  
  286. =cut
  287.