home *** CD-ROM | disk | FTP | other *** search
/ PC Professionell 2004 December / PCpro_2004_12.ISO / files / webserver / xampp / xampp-perl-addon-1.4.9-installer.exe / ReadWrite.pm < prev    next >
Encoding:
Perl POD Document  |  2004-04-20  |  29.3 KB  |  899 lines

  1. # $Id: ReadWrite.pm,v 1.68 2004/04/20 00:15:55 sungo Exp $
  2.  
  3. package POE::Wheel::ReadWrite;
  4.  
  5. use strict;
  6.  
  7. use vars qw($VERSION);
  8. $VERSION = do {my@r=(q$Revision: 1.68 $=~/\d+/g);sprintf"%d."."%04d"x$#r,@r};
  9.  
  10. use Carp;
  11. use POE qw(Wheel Driver::SysRW Filter::Line);
  12.  
  13. # Offsets into $self.
  14. sub HANDLE_INPUT               () {  0 }
  15. sub HANDLE_OUTPUT              () {  1 }
  16. sub FILTER_INPUT               () {  2 }
  17. sub FILTER_OUTPUT              () {  3 }
  18. sub DRIVER_BOTH                () {  4 }
  19. sub EVENT_INPUT                () {  5 }
  20. sub EVENT_ERROR                () {  6 }
  21. sub EVENT_FLUSHED              () {  7 }
  22. sub WATERMARK_WRITE_MARK_HIGH  () {  8 }
  23. sub WATERMARK_WRITE_MARK_LOW   () {  9 }
  24. sub WATERMARK_WRITE_EVENT_HIGH () { 10 }
  25. sub WATERMARK_WRITE_EVENT_LOW  () { 11 }
  26. sub WATERMARK_WRITE_STATE      () { 12 }
  27. sub DRIVER_BUFFERED_OUT_OCTETS () { 13 }
  28. sub STATE_WRITE                () { 14 }
  29. sub STATE_READ                 () { 15 }
  30. sub UNIQUE_ID                  () { 16 }
  31.  
  32. sub CRIMSON_SCOPE_HACK ($) { 0 }
  33.  
  34. #------------------------------------------------------------------------------
  35.  
  36. sub new {
  37.   my $type = shift;
  38.   my %params = @_;
  39.  
  40.   croak "wheels no longer require a kernel reference as their first parameter"
  41.     if (@_ && (ref($_[0]) eq 'POE::Kernel'));
  42.  
  43.   croak "$type requires a working Kernel" unless defined $poe_kernel;
  44.  
  45.   my ($in_handle, $out_handle);
  46.   if (defined $params{Handle}) {
  47.     carp "Ignoring InputHandle parameter (Handle parameter takes precedence)"
  48.       if defined $params{InputHandle};
  49.     carp "Ignoring OutputHandle parameter (Handle parameter takes precedence)"
  50.       if defined $params{OutputHandle};
  51.     $in_handle = $out_handle = delete $params{Handle};
  52.   }
  53.   else {
  54.     croak "Handle or InputHandle required"
  55.       unless defined $params{InputHandle};
  56.     croak "Handle or OutputHandle required"
  57.       unless defined $params{OutputHandle};
  58.     $in_handle  = delete $params{InputHandle};
  59.     $out_handle = delete $params{OutputHandle};
  60.   }
  61.  
  62.   my ($in_filter, $out_filter);
  63.   if (defined $params{Filter}) {
  64.     carp "Ignoring InputFilter parameter (Filter parameter takes precedence)"
  65.       if (defined $params{InputFilter});
  66.     carp "Ignoring OutputFilter parameter (Filter parameter takes precedence)"
  67.       if (defined $params{OutputFilter});
  68.     $in_filter = $out_filter = delete $params{Filter};
  69.   }
  70.   else {
  71.     $in_filter = delete $params{InputFilter};
  72.     $out_filter = delete $params{OutputFilter};
  73.  
  74.     # If neither Filter, InputFilter or OutputFilter is defined, then
  75.     # they default to POE::Filter::Line.
  76.     unless (defined $in_filter or defined $out_filter) {
  77.       $in_filter = $out_filter = POE::Filter::Line->new();
  78.     }
  79.   }
  80.  
  81.   my $driver = delete $params{Driver};
  82.   $driver = POE::Driver::SysRW->new() unless defined $driver;
  83.  
  84.   { my $mark_errors = 0;
  85.     if (defined($params{HighMark}) xor defined($params{LowMark})) {
  86.       carp "HighMark and LowMark parameters require each-other";
  87.       $mark_errors++;
  88.     }
  89.     # Then they both exist, and they must be checked.
  90.     elsif (defined $params{HighMark}) {
  91.       unless (defined($params{HighMark}) and defined($params{LowMark})) {
  92.         carp "HighMark and LowMark parameters must both be defined";
  93.         $mark_errors++;
  94.       }
  95.       unless (($params{HighMark} > 0) and ($params{LowMark} > 0)) {
  96.         carp "HighMark and LowMark parameters must be above 0";
  97.         $mark_errors++;
  98.       }
  99.     }
  100.     if (defined($params{HighMark}) xor defined($params{HighEvent})) {
  101.       carp "HighMark and HighEvent parameters require each-other";
  102.       $mark_errors++;
  103.     }
  104.     if (defined($params{LowMark}) xor defined($params{LowEvent})) {
  105.       carp "LowMark and LowEvent parameters require each-other";
  106.       $mark_errors++;
  107.     }
  108.     croak "Water mark errors" if $mark_errors;
  109.   }
  110.  
  111.   my $self = bless
  112.     [ $in_handle,                       # HANDLE_INPUT
  113.       $out_handle,                      # HANDLE_OUTPUT
  114.       $in_filter,                       # FILTER_INPUT
  115.       $out_filter,                      # FILTER_OUTPUT
  116.       $driver,                          # DRIVER_BOTH
  117.       delete $params{InputEvent},       # EVENT_INPUT
  118.       delete $params{ErrorEvent},       # EVENT_ERROR
  119.       delete $params{FlushedEvent},     # EVENT_FLUSHED
  120.       # Water marks.
  121.       delete $params{HighMark},         # WATERMARK_WRITE_MARK_HIGH
  122.       delete $params{LowMark},          # WATERMARK_WRITE_MARK_LOW
  123.       delete $params{HighEvent},        # WATERMARK_WRITE_EVENT_HIGH
  124.       delete $params{LowEvent},         # WATERMARK_WRITE_EVENT_LOW
  125.       0,                                # WATERMARK_WRITE_STATE
  126.       # Driver statistics.
  127.       0,                                # DRIVER_BUFFERED_OUT_OCTETS
  128.       # Dynamic state names.
  129.       undef,                            # STATE_WRITE
  130.       undef,                            # STATE_READ
  131.       # Unique ID.
  132.       &POE::Wheel::allocate_wheel_id(), # UNIQUE_ID
  133.     ], $type;
  134.  
  135.   if (scalar keys %params) {
  136.     carp( "unknown parameters in $type constructor call: ",
  137.           join(', ', keys %params)
  138.         );
  139.   }
  140.  
  141.   $self->_define_read_state();
  142.   $self->_define_write_state();
  143.  
  144.   return $self;
  145. }
  146.  
  147. #------------------------------------------------------------------------------
  148. # Redefine the select-write handler.  This uses stupid closure tricks
  149. # to prevent keeping extra references to $self around.
  150.  
  151. sub _define_write_state {
  152.   my $self = shift;
  153.  
  154.   # Read-only members.  If any of these change, then the write state
  155.   # is invalidated and needs to be redefined.
  156.   my $driver        = $self->[DRIVER_BOTH];
  157.   my $high_mark     = $self->[WATERMARK_WRITE_MARK_HIGH];
  158.   my $low_mark      = $self->[WATERMARK_WRITE_MARK_LOW];
  159.   my $event_error   = \$self->[EVENT_ERROR];
  160.   my $event_flushed = \$self->[EVENT_FLUSHED];
  161.   my $event_high    = \$self->[WATERMARK_WRITE_EVENT_HIGH];
  162.   my $event_low     = \$self->[WATERMARK_WRITE_EVENT_LOW];
  163.   my $unique_id     = $self->[UNIQUE_ID];
  164.  
  165.   # Read/write members.  These are done by reference, to avoid pushing
  166.   # $self into the anonymous sub.  Extra copies of $self are bad and
  167.   # can prevent wheels from destructing properly.
  168.   my $is_in_high_water_state     = \$self->[WATERMARK_WRITE_STATE];
  169.   my $driver_buffered_out_octets = \$self->[DRIVER_BUFFERED_OUT_OCTETS];
  170.  
  171.   # Register the select-write handler.
  172.  
  173.   $poe_kernel->state
  174.     ( $self->[STATE_WRITE] = ref($self) . "($unique_id) -> select write",
  175.       sub {                             # prevents SEGV
  176.         0 && CRIMSON_SCOPE_HACK('<');
  177.                                         # subroutine starts here
  178.         my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
  179.  
  180.         $$driver_buffered_out_octets = $driver->flush($handle);
  181.  
  182.         # When you can't write, nothing else matters.
  183.         if ($!) {
  184.           $$event_error && $k->call( $me, $$event_error,
  185.                                      'write', ($!+0), $!, $unique_id
  186.                                    );
  187.           $k->select_write($handle);
  188.         }
  189.  
  190.         # Could write, or perhaps couldn't but only because the
  191.         # filehandle's buffer is choked.
  192.         else {
  193.  
  194.           # In high water state?  Check for low water.  High water
  195.           # state will never be set if $event_low is undef, so don't
  196.           # bother checking its definedness here.
  197.           if ($$is_in_high_water_state) {
  198.             if ( $$driver_buffered_out_octets <= $low_mark ) {
  199.               $$is_in_high_water_state = 0;
  200.               $k->call( $me, $$event_low, $unique_id ) if defined $$event_low;
  201.             }
  202.           }
  203.  
  204.           # Not in high water state.  Check for high water.  Needs to
  205.           # also check definedness of $$driver_buffered_out_octets.
  206.           # Although we know this ahead of time and could probably
  207.           # optimize it away with a second state definition, it would
  208.           # be best to wait until ReadWrite stabilizes.  That way
  209.           # there will be only half as much code to maintain.
  210.           elsif ( $high_mark and
  211.                   ( $$driver_buffered_out_octets >= $high_mark )
  212.                 ) {
  213.             $$is_in_high_water_state = 1;
  214.             $k->call( $me, $$event_high, $unique_id ) if defined $$event_high;
  215.           }
  216.         }
  217.  
  218.         # All chunks written; fire off a "flushed" event.  This
  219.         # occurs independently, so it's possible to get a low-water
  220.         # call and a flushed call at the same time (if the low mark
  221.         # is 1).
  222.         unless ($$driver_buffered_out_octets) {
  223.           $k->select_pause_write($handle);
  224.           $$event_flushed && $k->call($me, $$event_flushed, $unique_id);
  225.         }
  226.       }
  227.    );
  228.  
  229.   $poe_kernel->select_write($self->[HANDLE_OUTPUT], $self->[STATE_WRITE]);
  230.  
  231.   # Pause the write select immediately, unless output is pending.
  232.   $poe_kernel->select_pause_write($self->[HANDLE_OUTPUT])
  233.     unless ($self->[DRIVER_BUFFERED_OUT_OCTETS]);
  234. }
  235.  
  236. #------------------------------------------------------------------------------
  237. # Redefine the select-read handler.  This uses stupid closure tricks
  238. # to prevent keeping extra references to $self around.
  239.  
  240. sub _define_read_state {
  241.   my $self = shift;
  242.  
  243.   # Register the select-read handler.
  244.  
  245.   if (defined $self->[EVENT_INPUT]) {
  246.  
  247.     # If any of these change, then the read state is invalidated and
  248.     # needs to be redefined.
  249.  
  250.     my $driver       = $self->[DRIVER_BOTH];
  251.     my $input_filter = \$self->[FILTER_INPUT];
  252.     my $event_input  = \$self->[EVENT_INPUT];
  253.     my $event_error  = \$self->[EVENT_ERROR];
  254.     my $unique_id    = $self->[UNIQUE_ID];
  255.  
  256.     # If the filter can get_one, then define the input state in terms
  257.     # of get_one_start() and get_one().
  258.  
  259.     if ( $$input_filter->can('get_one') and
  260.          $$input_filter->can('get_one_start')
  261.        ) {
  262.       $poe_kernel->state
  263.         ( $self->[STATE_READ] = ref($self) . "($unique_id) -> select read",
  264.           sub {
  265.  
  266.             # Protects against coredump on older perls.
  267.             0 && CRIMSON_SCOPE_HACK('<');
  268.  
  269.             # The actual code starts here.
  270.             my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
  271.             if (defined(my $raw_input = $driver->get($handle))) {
  272.               $$input_filter->get_one_start($raw_input);
  273.               while (1) {
  274.                 my $next_rec = $$input_filter->get_one();
  275.                 last unless @$next_rec;
  276.                 foreach my $cooked_input (@$next_rec) {
  277.                   $k->call($me, $$event_input, $cooked_input, $unique_id);
  278.                 }
  279.               }
  280.             }
  281.             else {
  282.               $$event_error and
  283.                 $k->call( $me, $$event_error, 'read', ($!+0), $!, $unique_id );
  284.               $k->select_read($handle);
  285.             }
  286.           }
  287.         );
  288.     }
  289.  
  290.     # Otherwise define the input state in terms of the older, less
  291.     # robust, yet faster get().
  292.  
  293.     else {
  294.       $poe_kernel->state
  295.         ( $self->[STATE_READ] = ref($self) . "($unique_id) -> select read",
  296.           sub {
  297.  
  298.             # Protects against coredump on older perls.
  299.             0 && CRIMSON_SCOPE_HACK('<');
  300.  
  301.             # The actual code starts here.
  302.             my ($k, $me, $handle) = @_[KERNEL, SESSION, ARG0];
  303.             if (defined(my $raw_input = $driver->get($handle))) {
  304.               foreach my $cooked_input (@{$$input_filter->get($raw_input)}) {
  305.                 $k->call($me, $$event_input, $cooked_input, $unique_id);
  306.               }
  307.             }
  308.             else {
  309.               $$event_error and
  310.                 $k->call( $me, $$event_error, 'read', ($!+0), $!, $unique_id );
  311.               $k->select_read($handle);
  312.             }
  313.           }
  314.         );
  315.     }
  316.                                         # register the state's select
  317.     $poe_kernel->select_read($self->[HANDLE_INPUT], $self->[STATE_READ]);
  318.   }
  319.                                         # undefine the select, just in case
  320.   else {
  321.     $poe_kernel->select_read($self->[HANDLE_INPUT])
  322.   }
  323. }
  324.  
  325. #------------------------------------------------------------------------------
  326. # Redefine events.
  327.  
  328. sub event {
  329.   my $self = shift;
  330.   push(@_, undef) if (scalar(@_) & 1);
  331.  
  332.   my ($redefine_read, $redefine_write) = (0, 0);
  333.  
  334.   while (@_) {
  335.     my ($name, $event) = splice(@_, 0, 2);
  336.  
  337.     if ($name eq 'InputEvent') {
  338.       $self->[EVENT_INPUT] = $event;
  339.       $redefine_read = 1;
  340.     }
  341.     elsif ($name eq 'ErrorEvent') {
  342.       $self->[EVENT_ERROR] = $event;
  343.       $redefine_read = $redefine_write = 1;
  344.     }
  345.     elsif ($name eq 'FlushedEvent') {
  346.       $self->[EVENT_FLUSHED] = $event;
  347.       $redefine_write = 1;
  348.     }
  349.     elsif ($name eq 'HighEvent') {
  350.       if (defined $self->[WATERMARK_WRITE_MARK_HIGH]) {
  351.         $self->[WATERMARK_WRITE_EVENT_HIGH] = $event;
  352.         $redefine_write = 1;
  353.       }
  354.       else {
  355.         carp "Ignoring HighEvent (there is no high watermark set)";
  356.       }
  357.     }
  358.     elsif ($name eq 'LowEvent') {
  359.       if (defined $self->[WATERMARK_WRITE_MARK_LOW]) {
  360.         $self->[WATERMARK_WRITE_EVENT_LOW] = $event;
  361.         $redefine_write = 1;
  362.       }
  363.       else {
  364.         carp "Ignoring LowEvent (there is no high watermark set)";
  365.       }
  366.     }
  367.     else {
  368.       carp "ignoring unknown ReadWrite parameter '$name'";
  369.     }
  370.   }
  371.  
  372.   $self->_define_read_state()  if $redefine_read;
  373.   $self->_define_write_state() if $redefine_write;
  374. }
  375.  
  376. #------------------------------------------------------------------------------
  377.  
  378. sub DESTROY {
  379.   my $self = shift;
  380.  
  381.   # Turn off the select.  This is a problem if a wheel is being
  382.   # swapped, since it will turn off selects for the other wheel.
  383.   $poe_kernel->select($self->[HANDLE_INPUT]);
  384.  
  385.   if ($self->[STATE_READ]) {
  386.     $poe_kernel->state($self->[STATE_READ]);
  387.     $self->[STATE_READ] = undef;
  388.   }
  389.  
  390.   $poe_kernel->select($self->[HANDLE_OUTPUT]);
  391.  
  392.   if ($self->[STATE_WRITE]) {
  393.     $poe_kernel->state($self->[STATE_WRITE]);
  394.     $self->[STATE_WRITE] = undef;
  395.   }
  396.  
  397.   &POE::Wheel::free_wheel_id($self->[UNIQUE_ID]);
  398. }
  399.  
  400. #------------------------------------------------------------------------------
  401. # TODO - We set the high/low watermark state here, but we don't fire
  402. # events for it.  My assumption is that the return value tells us
  403. # all we want to know.
  404.  
  405. sub put {
  406.   my ($self, @chunks) = @_;
  407.  
  408.   my $old_buffered_out_octets = $self->[DRIVER_BUFFERED_OUT_OCTETS];
  409.   my $new_buffered_out_octets =
  410.     $self->[DRIVER_BUFFERED_OUT_OCTETS] =
  411.     $self->[DRIVER_BOTH]->put($self->[FILTER_OUTPUT]->put(\@chunks));
  412.  
  413.   # Resume write-ok if the output buffer gets data.  This avoids
  414.   # redundant calls to select_resume_write(), which is probably a good
  415.   # thing.
  416.   if ($new_buffered_out_octets and !$old_buffered_out_octets) {
  417.     $poe_kernel->select_resume_write($self->[HANDLE_OUTPUT]);
  418.   }
  419.  
  420.   # If the high watermark has been reached, return true.
  421.   if (
  422.     $self->[WATERMARK_WRITE_MARK_HIGH] and
  423.     $new_buffered_out_octets >= $self->[WATERMARK_WRITE_MARK_HIGH]
  424.   ) {
  425.     return $self->[WATERMARK_WRITE_STATE] = 1;
  426.   }
  427.  
  428.   return $self->[WATERMARK_WRITE_STATE] = 0;
  429. }
  430.  
  431. #------------------------------------------------------------------------------
  432. # Redefine filter. -PG / Now that there are two filters internally,
  433. # one input and one output, make this set both of them at the same
  434. # time. -RCC
  435.  
  436. sub _transfer_input_buffer {
  437.   my ($self, $buf) = @_;
  438.  
  439.   my $old_input_filter = $self->[FILTER_INPUT];
  440.  
  441.   # If the new filter implements "get_one", use that.
  442.   if ( $old_input_filter->can('get_one') and
  443.        $old_input_filter->can('get_one_start')
  444.      ) {
  445.     if (defined $buf) {
  446.       $self->[FILTER_INPUT]->get_one_start($buf);
  447.       while ($self->[FILTER_INPUT] == $old_input_filter) {
  448.         my $next_rec = $self->[FILTER_INPUT]->get_one();
  449.         last unless @$next_rec;
  450.         foreach my $cooked_input (@$next_rec) {
  451.           $poe_kernel->call( $poe_kernel->get_active_session(),
  452.                              $self->[EVENT_INPUT],
  453.                              $cooked_input, $self->[UNIQUE_ID]
  454.                            );
  455.         }
  456.       }
  457.     }
  458.   }
  459.  
  460.   # Otherwise use the old behavior.
  461.   else {
  462.     if (defined $buf) {
  463.       foreach my $cooked_input (@{$self->[FILTER_INPUT]->get($buf)}) {
  464.         $poe_kernel->call( $poe_kernel->get_active_session(),
  465.                            $self->[EVENT_INPUT],
  466.                            $cooked_input, $self->[UNIQUE_ID]
  467.                          );
  468.       }
  469.     }
  470.   }
  471. }
  472.  
  473. # Set input and output filters.
  474.  
  475. sub set_filter {
  476.   my ($self, $new_filter) = @_;
  477.   my $buf = $self->[FILTER_INPUT]->get_pending();
  478.   $self->[FILTER_INPUT] = $self->[FILTER_OUTPUT] = $new_filter;
  479.  
  480.   $self->_define_read_state();
  481.   $self->_transfer_input_buffer($buf);
  482. }
  483.  
  484. # Redefine input and/or output filters separately.
  485. sub set_input_filter {
  486.   my ($self, $new_filter) = @_;
  487.   my $buf = $self->[FILTER_INPUT]->get_pending();
  488.   $self->[FILTER_INPUT] = $new_filter;
  489.  
  490.   $self->_define_read_state();
  491.   $self->_transfer_input_buffer($buf);
  492. }
  493.  
  494. # No closures need to be redefined or anything.  All the previously
  495. # put stuff has been serialized already.
  496. sub set_output_filter {
  497.   my ($self, $new_filter) = @_;
  498.   $self->[FILTER_OUTPUT] = $new_filter;
  499. }
  500.  
  501. # Get the current input filter; used for accessing the filter's custom
  502. # methods, as in: $wheel->get_input_filter()->filter_method();
  503. sub get_input_filter {
  504.   my $self = shift;
  505.   return $self->[FILTER_INPUT];
  506. }
  507.  
  508. # Get the current input filter; used for accessing the filter's custom
  509. # methods, as in: $wheel->get_input_filter()->filter_method();
  510. sub get_output_filter {
  511.   my $self = shift;
  512.   return $self->[FILTER_OUTPUT];
  513. }
  514.  
  515. # Set the high water mark.
  516.  
  517. sub set_high_mark {
  518.   my ($self, $new_high_mark) = @_;
  519.   if (defined $self->[WATERMARK_WRITE_MARK_HIGH]) {
  520.     if (defined $new_high_mark) {
  521.       if ($new_high_mark > $self->[WATERMARK_WRITE_MARK_LOW]) {
  522.         $self->[WATERMARK_WRITE_MARK_HIGH] = $new_high_mark;
  523.         $self->_define_write_state();
  524.       }
  525.       else {
  526.         carp "New high mark would not be greater than low mark.  Ignored";
  527.       }
  528.     }
  529.     else {
  530.       carp "New high mark is undefined.  Ignored";
  531.     }
  532.   }
  533.   else {
  534.     carp "Ignoring high mark (must be initialized in constructor first)";
  535.   }
  536. }
  537.  
  538. sub set_low_mark {
  539.   my ($self, $new_low_mark) = @_;
  540.   if (defined $self->[WATERMARK_WRITE_MARK_LOW]) {
  541.     if (defined $new_low_mark) {
  542.       if ($new_low_mark > 0) {
  543.         if ($new_low_mark < $self->[WATERMARK_WRITE_MARK_HIGH]) {
  544.           $self->[WATERMARK_WRITE_MARK_LOW] = $new_low_mark;
  545.           $self->_define_write_state();
  546.         }
  547.         else {
  548.           carp "New low mark would not be less than high high mark.  Ignored";
  549.         }
  550.       }
  551.       else {
  552.         carp "New low mark would be less than one.  Ignored";
  553.       }
  554.     }
  555.     else {
  556.       carp "New low mark is undefined.  Ignored";
  557.     }
  558.   }
  559.   else {
  560.     carp "Ignoring low mark (must be initialized in constructor first)";
  561.   }
  562. }
  563.  
  564. # Return driver statistics.
  565. sub get_driver_out_octets {
  566.   $_[0]->[DRIVER_BUFFERED_OUT_OCTETS];
  567. }
  568.  
  569. sub get_driver_out_messages {
  570.   $_[0]->[DRIVER_BOTH]->get_out_messages_buffered();
  571. }
  572.  
  573. # Get the wheel's ID.
  574. sub ID {
  575.   return $_[0]->[UNIQUE_ID];
  576. }
  577.  
  578. # Pause the wheel's input watcher.
  579. sub pause_input {
  580.   my $self = shift;
  581.   return unless defined $self->[HANDLE_INPUT];
  582.   $poe_kernel->select_pause_read( $self->[HANDLE_INPUT] );
  583. }
  584.  
  585. # Resume the wheel's input watcher.
  586. sub resume_input {
  587.   my $self = shift;
  588.   return unless  defined $self->[HANDLE_INPUT];
  589.   $poe_kernel->select_resume_read( $self->[HANDLE_INPUT] );
  590. }
  591.  
  592. # Shutdown the socket for reading.
  593. sub shutdown_input {
  594.   my $self = shift;
  595.   return unless defined $self->[HANDLE_INPUT];
  596.   eval { local $^W = 0; shutdown($self->[HANDLE_INPUT], 0) };
  597.   $poe_kernel->select_read($self->[HANDLE_INPUT], undef);
  598. }
  599.  
  600. # Shutdown the socket for writing.
  601. sub shutdown_output {
  602.   my $self = shift;
  603.   return unless defined $self->[HANDLE_OUTPUT];
  604.   eval { local $^W=0; shutdown($self->[HANDLE_OUTPUT], 1) };
  605.   $poe_kernel->select_write($self->[HANDLE_OUTPUT], undef);
  606. }
  607.  
  608. ###############################################################################
  609. 1;
  610.  
  611. __END__
  612.  
  613. =head1 NAME
  614.  
  615. POE::Wheel::ReadWrite - buffered non-blocking I/O
  616.  
  617. =head1 SYNOPSIS
  618.  
  619.   $wheel = POE::Wheel::ReadWrite->new(
  620.  
  621.     # To read and write from the same handle, such as a socket, use
  622.     # the Handle parameter:
  623.     Handle       => $file_or_socket_handle,  # Handle to read/write
  624.  
  625.     # To read and write from different handles, such as a dual pipe to
  626.     # a child process, or a console, use InputHandle and OutputHandle:
  627.     InputHandle  => $readable_filehandle,    # Handle to read
  628.     OutputHandle => $writable_filehandle,    # Handle to write
  629.  
  630.     Driver       => POE::Driver::Something->new(), # How to read/write it
  631.  
  632.     # To read and write using the same line discipline, such as
  633.     # Filter::Line, use the Filter parameter:
  634.     Filter       => POE::Filter::Something->new(), # How to parse in and out
  635.  
  636.     # To read and write using different line disciplines, such as
  637.     # stream out and line in:
  638.     InputFilter  => POE::Filter::Something->new(),     # Read data one way
  639.     OutputFilter => POE::Filter::SomethingElse->new(), # Write data another
  640.  
  641.     InputEvent   => $input_event_name,  # Input received event
  642.     FlushedEvent => $flush_event_name,  # Output flushed event
  643.     ErrorEvent   => $error_event_name,  # Error occurred event
  644.  
  645.     # To enable callbacks for high and low water events (using any one
  646.     # of these options requires the rest):
  647.     HighMark  => $high_mark_octets, # Outgoing high-water mark
  648.     HighEvent => $high_mark_event,  # Event to emit when high-water reached
  649.     LowMark   => $low_mark_octets,  # Outgoing low-water mark
  650.     LowEvent  => $low_mark_event,   # Event to emit when low-water reached
  651.   );
  652.  
  653.   $wheel->put( $something );
  654.   $wheel->event( ... );
  655.  
  656.   # To set both the input and output filters at once:
  657.   $wheel->set_filter( POE::Filter::Something->new() );
  658.  
  659.   # To set an input filter or an output filter:
  660.   $wheel->set_input_filter( POE::Filter::Something->new() );
  661.   $wheel->set_output_filter( POE::Filter::Something->new() );
  662.  
  663.   # To alter the high or low water marks:
  664.   $wheel->set_high_mark( $new_high_mark_octets );
  665.   $wheel->set_low_mark( $new_low_mark_octets );
  666.  
  667.   # To fetch driver statistics:
  668.   $pending_octets   = $wheel->get_driver_out_octets();
  669.   $pending_messages = $wheel->get_driver_out_messages();
  670.  
  671.   # To retrieve the wheel's ID:
  672.   print $wheel->ID;
  673.  
  674.   # To pause and resume a wheel's input events.
  675.   $wheel->pause_input();
  676.   $wheel->resume_input();
  677.  
  678.   # To shutdown a wheel's socket(s).
  679.   $wheel->shutdown_input();
  680.   $wheel->shutdown_output();
  681.  
  682. =head1 DESCRIPTION
  683.  
  684. ReadWrite performs buffered, select-based I/O on filehandles.  It
  685. generates events for common file conditions, such as when data has
  686. been read or flushed.
  687.  
  688. =head1 PUBLIC METHODS
  689.  
  690. =over 2
  691.  
  692. =item put LISTREF_OF_RECORDS
  693.  
  694. put() queues records for transmission.  They may not be transmitted
  695. immediately.  ReadWrite uses its Filter to translate the records into
  696. a form suitable for writing.  It uses its Driver to queue and send
  697. them.
  698.  
  699. put() accepts a list of records.  It returns a boolean value
  700. indicating whether the wheel's high-water mark has been reached.  It
  701. always returns false if a wheel doesn't have a high-water mark set.
  702.  
  703. This will quickly fill a wheel's output queue if it has a high-water
  704. mark set.  Otherwise it will loop infinitely, eventually exhausting
  705. memory.
  706.  
  707.   1 while $wheel->put( &get_next_thing_to_send );
  708.  
  709. =item event EVENT_TYPE => EVENT_NAME, ...
  710.  
  711. event() is covered in the POE::Wheel manpage.
  712.  
  713. =item set_filter POE_FILTER
  714.  
  715. =item set_input_filter POE_FILTER
  716.  
  717. =item set_output_filter POE_FILTER
  718.  
  719. set_input_filter() changes the filter a wheel uses for reading.
  720. set_output_filter() changes a wheel's output filter.  set_filter()
  721. changes them both at once.
  722.  
  723. These methods let programs change a wheel's underlying protocol while
  724. it runs.  It retrieves the existing filter's unprocessed input using
  725. its get_pending() method and passes that to the new filter.
  726.  
  727. Switching filters can be tricky.  Please see the discussion of
  728. get_pending() in L<POE::Filter>.
  729.  
  730. The HTTPD filter does not support get_pending(), and it will complain
  731. if a program tries to switch away from one.
  732.  
  733. =item get_input_filter
  734.  
  735. =item get_output_filter
  736.  
  737. Return the wheel's input or output filter.  In many cases, they both
  738. may be the same.  This is used to access custom methods on the filter
  739. itself; for example, Filter::Stackable has methods to push and pop
  740. filters on its stack.
  741.  
  742.   $wheel->get_input_filter()->pop();
  743.  
  744. =item set_high_mark HIGH_MARK_OCTETS
  745.  
  746. =item set_low_mark LOW_MARK_OCTETS
  747.  
  748. These methods set a wheel's high- and low-water marks.  New values
  749. will not take effect until the next put() call or internal buffer
  750. flush.  The event() method can change the events emitted by high- and
  751. low-water marks.
  752.  
  753. =item ID
  754.  
  755. The ID method returns a ReadWrite wheel's unique ID.  This ID will be
  756. included in every event the wheel generates, and it can be used to
  757. match events with the wheels which generated them.
  758.  
  759. =item pause_input
  760.  
  761. =item resume_input
  762.  
  763. ReadWrite wheels will continually generate input events for as long as
  764. they have data to read.  Sometimes it's necessary to control the flow
  765. of data coming from a wheel or the input filehandle it manages.
  766.  
  767. pause_input() instructs the wheel to temporarily stop checking its
  768. input filehandle for data.  This can keep a session (or a
  769. corresponding output buffer) from being overwhelmed.
  770.  
  771. resume_input() instructs the wheel to resume checking its input
  772. filehandle for data.
  773.  
  774. =item shutdown_input
  775.  
  776. =item shutdown_output
  777.  
  778. Some applications require the remote end to shut down a socket before
  779. they will continue.  These methods map directly to shutdown() for the
  780. wheel's input and output sockets.
  781.  
  782. =back
  783.  
  784. =head1 EVENTS AND PARAMETERS
  785.  
  786. =over 2
  787.  
  788. =item Driver
  789.  
  790. Driver is a POE::Driver subclass that is used to read from and write
  791. to ReadWrite's filehandle(s).  It encapsulates the low-level I/O
  792. operations so in theory ReadWrite never needs to know about them.
  793.  
  794. Driver defaults to C<<POE::Driver::SysRW->new()>>.
  795.  
  796. =item Filter
  797.  
  798. Filter is a POE::Filter subclass that's used to parse incoming data
  799. and create streamable outgoing data.  It encapsulates the lowest level
  800. of a protocol so in theory ReadWrite never needs to know about them.
  801.  
  802. Filter defaults to C<<POE::Filter::Line->new()>>.
  803.  
  804. =item InputEvent
  805.  
  806. InputEvent contains the event that the wheel emits for every complete
  807. record read.  Every InputEvent is accompanied by two parameters.
  808. C<ARG0> contains the record which was read.  C<ARG1> contains the
  809. wheel's unique ID.
  810.  
  811. The wheel will not attempt to read from its Handle or InputHandle if
  812. InputEvent is omitted.
  813.  
  814. A sample InputEvent handler:
  815.  
  816.   sub input_state {
  817.     my ($heap, $input, $wheel_id) = @_[HEAP, ARG0, ARG1];
  818.     print "Echoing input from wheel $wheel_id: $input\n";
  819.     $heap->{wheel}->put($input);     # Echo it back.
  820.   }
  821.  
  822. =item FlushedEvent
  823.  
  824. FlushedEvent contains the event that ReadWrite emits whenever its
  825. output queue becomes empty.  This signals that all pending data has
  826. been written, and it's often used to wait for "goodbye" messages to be
  827. sent before a session shuts down.
  828.  
  829. FlushedEvent comes with a single parameter, C<ARG0>, that indicates
  830. which wheel flushed its buffer.
  831.  
  832. A sample FlushedEvent handler:
  833.  
  834.   sub flushed_state {
  835.     # Stop a wheel after all outgoing data is flushed.
  836.     # This frees the wheel's resources, including the
  837.     # filehandle, and closes the connection.
  838.     delete $_[HEAP]->{wheel}->{$_[ARG0]};
  839.   }
  840.  
  841. =item ErrorEvent
  842.  
  843. ErrorEvent contains the event that ReadWrite emits whenever an error
  844. occurs.  Every ErrorEvent comes with four parameters:
  845.  
  846. C<ARG0> contains the name of the operation that failed.  This usually
  847. is 'read'.  Note: This is not necessarily a function name.  The wheel
  848. doesn't know which function its Driver is using.
  849.  
  850. C<ARG1> and C<ARG2> hold numeric and string values for C<$!>,
  851. respectively.
  852.  
  853. C<ARG3> contains the wheel's unique ID.
  854.  
  855. A sample ErrorEvent handler:
  856.  
  857.   sub error_state {
  858.     my ($operation, $errnum, $errstr, $wheel_id) = @_[ARG0..ARG3];
  859.     warn "Wheel $wheel_id generated $operation error $errnum: $errstr\n";
  860.     delete $heap->{wheels}->{$wheel_id}; # shut down that wheel
  861.   }
  862.  
  863. =item HighEvent
  864.  
  865. =item LowEvent
  866.  
  867. ReadWrite emits a HighEvent when a wheel's pending output queue has
  868. grown to be at least HighMark octets.  A LowEvent is emitted when a
  869. wheel's pending octet count drops below the value of LowMark.
  870.  
  871. HighEvent and LowEvent flip-flop.  Once a HighEvent has been emitted,
  872. it won't be emitted again until a LowEvent is emitted.  Likewise,
  873. LowEvent will not be emitted again until HighEvent is.  ReadWrite
  874. always starts in a low-water state.
  875.  
  876. Sessions which stream output are encouraged to use these events for
  877. flow control.  Sessions can redure their transmission rates or stop
  878. transmitting altogether upon receipt of a HighEvent, and they can
  879. resume full-speed transmission once LowEvent arrives.
  880.  
  881. =back
  882.  
  883. =head1 SEE ALSO
  884.  
  885. POE::Wheel.
  886.  
  887. The SEE ALSO section in L<POE> contains a table of contents covering
  888. the entire POE distribution.
  889.  
  890. =head1 BUGS
  891.  
  892. Oh, probably some.
  893.  
  894. =head1 AUTHORS & COPYRIGHTS
  895.  
  896. Please see L<POE> for more information about authors and contributors.
  897.  
  898. =cut
  899.