home *** CD-ROM | disk | FTP | other *** search
- # -*- perl -*-
- #
- # Copyright (C) 2004-2005 Daniel P. Berrange
- #
- # This program is free software; you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation; either version 2 of the License, or
- # (at your option) any later version.
- #
- # This program is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with this program; if not, write to the Free Software
- # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- #
- # $Id: Reactor.pm,v 1.9 2006/01/27 15:34:24 dan Exp $
-
- =pod
-
- =head1 NAME
-
- Net::DBus::Reactor - application event loop
-
- =head1 SYNOPSIS
-
- Create and run an event loop:
-
- use Net::DBus::Reactor;
- my $reactor = Net::DBus::Reactor->new();
-
- $reactor->run();
-
- Manage some file handles
-
- $reactor->add_read($fd,
- Net::DBus::Callback->new(method => sub {
- my $fd = shift;
- ...read some data...
- }, args => [$fd]));
-
- $reactor->add_write($fd,
- Net::DBus::Callback->new(method => sub {
- my $fd = shift;
- ...write some data...
- }, args => [$fd]));
-
- Temporarily (dis|en)able a handle
-
- # Disable
- $reactor->toggle_read($fd, 0);
- # Enable
- $reactor->toggle_read($fd, 1);
-
- Permanently remove a handle
-
- $reactor->remove_read($fd);
-
- Manage a regular timeout every 100 milliseconds
-
- my $timer = $reactor->add_timeout(100,
- Net::DBus::Callback->new(
- method => sub {
- ...process the alarm...
- }));
-
- Temporarily (dis|en)able a timer
-
- # Disable
- $reactor->toggle_timeout($timer, 0);
- # Enable
- $reactor->toggle_timeout($timer, 1);
-
- Permanently remove a timer
-
- $reactor->remove_timeout($timer);
-
- Add a post-dispatch hook
-
- my $hook = $reactor->add_hook(Net::DBus::Callback->new(
- method => sub {
- ... do some work...
- }));
-
- Remove a hook
-
- $reactor->remove_hook($hook);
-
- =head1 DESCRIPTION
-
- This class provides a general purpose event loop for
- the purposes of multiplexing I/O events and timeouts
- in a single process. The underlying implementation is
- done using the select system call. File handles can
- be registered for monitoring on read, write and exception
- (out-of-band data) events. Timers can be registered
- to expire with a periodic frequency. These are implemented
- using the timeout parameter of the select system call.
- Since this parameter merely represents an upper bound
- on the amount of time the select system call is allowed
- to sleep, the actual period of the timers may vary. Under
- normal load this variance is typically 10 milliseconds.
- Finally, hooks may be registered which will be invoked on
- each iteration of the event loop (ie after processing
- the file events, or timeouts indicated by the select
- system call returning).
-
- =head1 METHODS
-
- =over 4
-
- =cut
-
- package Net::DBus::Reactor;
-
- use 5.006;
- use strict;
- use warnings;
- use Carp;
- use Net::DBus::Binding::Watch;
- use Net::DBus::Callback;
- use Time::HiRes qw(gettimeofday);
-
- =item my $reactor = Net::DBus::Reactor->new();
-
- Creates a new event loop ready for monitoring file handles, or
- generating timeouts. Except in very unusual circumstances (examples
- of which I can't think up) it is not necessary or desirable to
- explicitly create new reactor instances. Instead call the L<main>
- method to get a handle to the singleton instance.
-
- =cut
-
# Constructor. Builds a reactor with an empty handle table for each event
# type (read, write, exception), no timeouts and no hooks. May be called
# on a class name or on an existing instance, whose class is then reused.
sub new {
    my ($proto, @args) = @_;
    my %params = @args;    # accepted as key/value pairs; not currently used

    my $class = ref($proto) || $proto;

    return bless {
        fds => {
            read      => {},
            write     => {},
            exception => {},
        },
        timeouts => [],
        hooks    => [],
    }, $class;
}
-
- use vars qw($main_reactor);
-
- =item $reactor->main
-
- Return a handle to the singleton instance of the reactor. This
- is the recommended way of getting hold of a reactor, since it
- removes the need for modules to pass around handles to their
- privately created reactors.
-
- =cut
-
# Class method returning the shared reactor, creating it through
# $class->new() the first time it is requested.
sub main {
    my ($class) = @_;

    if (!defined $main_reactor) {
        $main_reactor = $class->new();
    }

    return $main_reactor;
}
-
-
- =item $reactor->manage($connection);
-
- =item $reactor->manage($server);
-
- Registers a C<Net::DBus::Connection> or C<Net::DBus::Server> object
- for management by the event loop. This basically involves
- hooking up the watch & timeout callbacks to the event loop.
- For connections it will also register a hook to invoke the
- C<dispatch> method periodically.
-
- =cut
-
# Wire a connection or server object into this event loop.
#
# Depending on which methods $object supports, this:
#   * installs add/remove/toggle watch callbacks that forward to the
#     _manage_watch_on/_manage_watch_off/_manage_watch_toggle helpers;
#   * installs add/remove/toggle timeout callbacks that map each timeout
#     object onto a reactor timeout, storing the reactor key on the
#     timeout object via set_data/get_data;
#   * registers enabled hooks that call the object's "dispatch" and
#     "flush" methods on every loop iteration.
sub manage {
    my $self   = shift;
    my $object = shift;

    if ($object->can("set_watch_callbacks")) {
        $object->set_watch_callbacks(
            sub {
                my ($owner, $watch) = @_;
                $self->_manage_watch_on($owner, $watch);
            },
            sub {
                my ($owner, $watch) = @_;
                $self->_manage_watch_off($owner, $watch);
            },
            sub {
                my ($owner, $watch) = @_;
                $self->_manage_watch_toggle($owner, $watch);
            },
        );
    }

    if ($object->can("set_timeout_callbacks")) {
        $object->set_timeout_callbacks(
            # Timeout added: register it and remember the reactor key.
            sub {
                my ($owner, $timeout) = @_;

                my $key = $self->add_timeout($timeout->get_interval,
                                             Net::DBus::Callback->new(object => $timeout,
                                                                      method => "handle",
                                                                      args   => []),
                                             $timeout->is_enabled);
                $timeout->set_data($key);
            },
            # Timeout removed: drop the reactor entry.
            sub {
                my ($owner, $timeout) = @_;

                $self->remove_timeout($timeout->get_data);
            },
            # Timeout toggled: update the enabled state and interval in
            # place. This previously called remove_timeout with these
            # extra arguments, which deleted the timer instead of
            # updating it.
            sub {
                my ($owner, $timeout) = @_;

                $self->toggle_timeout($timeout->get_data,
                                      $timeout->is_enabled,
                                      $timeout->get_interval);
            },
        );
    }

    foreach my $method ("dispatch", "flush") {
        next unless $object->can($method);

        $self->add_hook(Net::DBus::Callback->new(object => $object,
                                                 method => $method,
                                                 args   => []),
                        1);
    }

    return;
}
-
-
# Register a watch with the reactor. For each of the READABLE and
# WRITABLE flags set on the watch, the watch's file number is added to
# the matching handle table with a callback that invokes the watch's
# "handle" method with that flag. The initial enabled state is taken from
# the watch. Exception watches are not registered.
sub _manage_watch_on {
    my ($self, $object, $watch) = @_;

    my $flags = $watch->get_flags;

    my @wiring = (
        [ Net::DBus::Binding::Watch::READABLE(), "add_read"  ],
        [ Net::DBus::Binding::Watch::WRITABLE(), "add_write" ],
    );

    foreach my $entry (@wiring) {
        my ($flag, $adder) = @$entry;
        next unless $flags & $flag;

        my $fileno  = $watch->get_fileno;
        my $handler = Net::DBus::Callback->new(object => $watch,
                                               method => "handle",
                                               args   => [$flag]);
        $self->$adder($fileno, $handler, $watch->is_enabled);
    }
}
-
# Unregister a watch: for each of the READABLE and WRITABLE flags set on
# the watch, remove its file number from the matching handle table.
# Exception watches are not touched.
sub _manage_watch_off {
    my ($self, $object, $watch) = @_;

    my $flags = $watch->get_flags;

    my @wiring = (
        [ Net::DBus::Binding::Watch::READABLE(), "remove_read"  ],
        [ Net::DBus::Binding::Watch::WRITABLE(), "remove_write" ],
    );

    foreach my $entry (@wiring) {
        my ($flag, $remover) = @$entry;
        next unless $flags & $flag;

        $self->$remover($watch->get_fileno);
    }
}
-
# Propagate a watch's current enabled state to the reactor. For each of
# the READABLE and WRITABLE flags set on the watch, the matching handle
# entry is switched to $watch->is_enabled.
#
# Exception handles are deliberately not toggled: no exception watch is
# registered when a watch is added, so toggling one would create an
# enabled entry with no callback. Such an entry would be monitored by
# select and counted as an active handle.
sub _manage_watch_toggle {
    my ($self, $object, $watch) = @_;

    my $flags = $watch->get_flags;

    if ($flags & Net::DBus::Binding::Watch::READABLE()) {
        $self->toggle_read($watch->get_fileno, $watch->is_enabled);
    }
    if ($flags & Net::DBus::Binding::Watch::WRITABLE()) {
        $self->toggle_write($watch->get_fileno, $watch->is_enabled);
    }

    return;
}
-
-
- =item $reactor->run();
-
- Starts the event loop monitoring any registered
- file handles and timeouts. At least one file
- handle, or timer must have been registered prior
- to running the reactor, otherwise it will immediately
- exit. The reactor will run until all registered
- file handles, or timeouts have been removed, or
- disabled. The reactor can be explicitly stopped by
- calling the C<shutdown> method.
-
- =cut
-
# Mark the reactor as running and keep calling step() until the running
# flag is cleared, either by shutdown() or by step() itself.
sub run {
    my ($self) = @_;

    $self->{running} = 1;
    $self->step while $self->{running};
}
-
- =item $reactor->shutdown();
-
- Explicitly shutdown the reactor after pending
- events have been processed.
-
- =cut
-
# Clear the running flag so that run() stops after the iteration that is
# currently in progress.
sub shutdown {
    my ($self) = @_;

    return $self->{running} = 0;
}
-
- =item $reactor->step();
-
- Perform one iteration of the event loop, going to
- sleep until an event occurs on a registered file
- handle, or a timeout occurs. This method is generally
- not required in day-to-day use.
-
- =cut
-
# Perform a single iteration of the event loop.
#
# Order of work:
#   1. invoke every enabled hook;
#   2. build the select bit vectors for enabled read/write/exception
#      handles and compute the time until the next timeout is due;
#   3. if there is nothing to wait for (no enabled handles and no enabled
#      timeouts), clear the running flag and return an empty value;
#   4. otherwise sleep in select, then invoke the callbacks of handles
#      that became ready and of timeouts that are due.
#
# Returns 1 when an iteration was performed.
sub step {
    my $self = shift;

    # Records created by toggling something that was never registered
    # carry no callback, so undefined entries are skipped throughout.
    foreach my $callback (grep { defined } $self->_dispatch_hook()) {
        $callback->invoke;
    }

    my ($ri, $ric) = $self->_bits("read");
    my ($wi, $wic) = $self->_bits("write");
    my ($ei, $eic) = $self->_bits("exception");
    my $timeout = $self->_timeout($self->_now);

    if (!$ric && !$wic && !$eic && !defined $timeout) {
        $self->{running} = 0;
        return;
    }

    # Timeouts are tracked in milliseconds and select wants seconds. An
    # undefined timeout means "block until a handle is ready".
    my $wait = defined $timeout ? $timeout / 1000 : undef;

    my ($ro, $wo, $eo);
    my $n = select($ro = $ri, $wo = $wi, $eo = $ei, $wait);

    my @callbacks;
    # select returns -1 on error (e.g. when interrupted by a signal), in
    # which case the output vectors must not be trusted.
    if ($n > 0) {
        push @callbacks, $self->_dispatch_fd("read", $ro);
        push @callbacks, $self->_dispatch_fd("write", $wo);
        # Handles are stored under "exception", not "error". The old
        # key matched nothing, so exception callbacks never fired.
        push @callbacks, $self->_dispatch_fd("exception", $eo);
    }
    push @callbacks, $self->_dispatch_timeout($self->_now);

    foreach my $callback (grep { defined } @callbacks) {
        $callback->invoke;
    }

    return 1;
}
-
# Current wall-clock time in whole milliseconds, with the microsecond
# part of gettimeofday truncated to milliseconds.
sub _now {
    my ($self) = @_;

    my ($seconds, $microseconds) = gettimeofday();

    return $seconds * 1000 + int($microseconds / 1000);
}
-
# Build the select bit vector for one handle type ("read", "write" or
# "exception"). Returns a two element list: the bit vector with one bit
# set per enabled handle, and the number of enabled handles.
sub _bits {
    my ($self, $type) = @_;

    my @active = grep { $self->{fds}->{$type}->{$_}->{enabled} }
                 keys %{ $self->{fds}->{$type} };

    my $mask = '';
    foreach my $fd (@active) {
        vec($mask, $fd, 1) = 1;
    }

    return ($mask, scalar @active);
}
-
# Compute how long select may sleep before the next enabled timeout is
# due. $now is the current time in milliseconds.
#
# Returns the smallest remaining time in milliseconds (0 when a timeout
# is already overdue), or undef when no timeout is enabled.
sub _timeout {
    my ($self, $now) = @_;

    my $soonest;
    foreach my $timer (@{ $self->{timeouts} }) {
        # Removed timeouts leave an undef slot behind. It must be skipped
        # without being dereferenced: dereferencing would autovivify the
        # slot into an empty hash, so it would no longer look free.
        next unless defined $timer && $timer->{enabled};

        my $elapsed   = $now - $timer->{last_fired};
        my $remaining = $elapsed > $timer->{interval}
                      ? 0
                      : $timer->{interval} - $elapsed;

        $soonest = $remaining
            if !defined $soonest || $remaining < $soonest;
    }

    return $soonest;
}
-
-
# Collect the callbacks of all enabled handles of the given type whose
# bit is set in $vec (a bit vector as filled in by select).
#
# Returns the list of callbacks, in no particular order. Records without
# a callback are skipped, so that callers never receive an undefined
# callback; such records exist when a handle was toggled without ever
# having been added.
sub _dispatch_fd {
    my ($self, $type, $vec) = @_;

    my $handles = $self->{fds}->{$type};
    return unless defined $handles && defined $vec;

    my @callbacks;
    foreach my $fd (keys %$handles) {
        my $rec = $handles->{$fd};

        next unless $rec->{enabled};
        next unless defined $rec->{callback};

        push @callbacks, $rec->{callback} if vec($vec, $fd, 1);
    }

    return @callbacks;
}
-
-
# Collect the callbacks of all enabled timeouts that are due at $now
# (milliseconds) and record $now as their last firing time.
#
# Returns the list of callbacks in slot order.
sub _dispatch_timeout {
    my ($self, $now) = @_;

    my @callbacks;
    foreach my $timer (@{ $self->{timeouts} }) {
        # Skip removed (undef) slots without dereferencing them;
        # dereferencing would autovivify the slot into an empty hash.
        next unless defined $timer && $timer->{enabled};

        my $elapsed = $now - $timer->{last_fired};

        # Select typically returns a little (0-10 ms) before we asked
        # it to. 8 milliseconds seems a reasonable balance between
        # early timeouts and extra select calls.
        if ($elapsed >= $timer->{interval} - 8) {
            $timer->{last_fired} = $now;
            push @callbacks, $timer->{callback};
        }
    }

    return @callbacks;
}
-
-
# Collect the callbacks of all enabled hooks, in slot order.
sub _dispatch_hook {
    my ($self) = @_;

    my @callbacks;
    foreach my $hook (@{ $self->{hooks} }) {
        # Skip removed (undef) slots without dereferencing them;
        # dereferencing would autovivify the slot into an empty hash.
        next unless defined $hook && $hook->{enabled};

        push @callbacks, $hook->{callback};
    }

    return @callbacks;
}
-
-
- =item $reactor->add_read($fd, $callback[, $status]);
-
- Registers a file handle for monitoring of read
- events. The C<$callback> parameter specifies an
- instance of the C<Net::DBus::Callback> object to invoke
- each time an event occurs. The optional C<$status>
- parameter is a boolean value to specify whether the
- watch is initially enabled.
-
- =cut
-
# Register a handle for read events; arguments are passed through to
# _add as ($fd, $callback[, $status]).
sub add_read {
    my ($self, @watch_args) = @_;

    return $self->_add("read", @watch_args);
}
-
- =item $reactor->add_write($fd, $callback[, $status]);
-
- Registers a file handle for monitoring of write
- events. The C<$callback> parameter specifies an
- instance of the C<Net::DBus::Callback> object to invoke
- each time an event occurs. The optional C<$status>
- parameter is a boolean value to specify whether the
- watch is initially enabled.
-
- =cut
-
# Register a handle for write events; arguments are passed through to
# _add as ($fd, $callback[, $status]).
sub add_write {
    my ($self, @watch_args) = @_;

    return $self->_add("write", @watch_args);
}
-
-
- =item $reactor->add_exception($fd, $callback[, $status]);
-
- Registers a file handle for monitoring of exception
- events. The C<$callback> parameter specifies an
- instance of the C<Net::DBus::Callback> object to invoke
- each time an event occurs. The optional C<$status>
- parameter is a boolean value to specify whether the
- watch is initially enabled.
-
- =cut
-
# Register a handle for exception events; arguments are passed through
# to _add as ($fd, $callback[, $status]).
sub add_exception {
    my ($self, @watch_args) = @_;

    return $self->_add("exception", @watch_args);
}
-
-
- =item my $id = $reactor->add_timeout($interval, $callback[, $status]);
-
- Registers a new timeout to expire every C<$interval>
- milliseconds. The C<$callback> parameter specifies an
- instance of the C<Net::DBus::Callback> object to invoke
- each time the timeout expires. The optional C<$status>
- parameter is a boolean value to specify whether the
- timeout is initially enabled. The return parameter is
- a unique identifier which can be used to later remove
- or disable the timeout.
-
- =cut
-
# Register a timeout firing every $interval milliseconds.
#
# $callback is invoked when the timeout is due; $enabled defaults to 1
# when not supplied. The timeout is stored in the lowest-numbered free
# (undef) slot, or appended when none is free, and its clock starts at
# the current time. Returns the slot index, which serves as the
# timeout's identifier.
sub add_timeout {
    my ($self, $interval, $callback, $enabled) = @_;
    $enabled = 1 unless defined $enabled;

    my $slots = $self->{timeouts};

    # Default to appending; override with the first free slot, if any.
    my $key = scalar @$slots;
    foreach my $i (0 .. $#$slots) {
        next if defined $slots->[$i];
        $key = $i;
        last;
    }

    $slots->[$key] = {
        interval   => $interval,
        last_fired => $self->_now,
        callback   => $callback,
        enabled    => $enabled,
    };

    return $key;
}
-
-
- =item $reactor->remove_timeout($id);
-
- Removes a previously registered timeout specified by
- the C<$id> parameter.
-
- =cut
-
# Remove the timeout stored under $key. The slot is set to undef rather
# than spliced out, so the identifiers of other timeouts stay valid.
# Dies when no timeout is stored under $key.
sub remove_timeout {
    my ($self, $key) = @_;

    my $slots = $self->{timeouts};

    if (!defined $slots->[$key]) {
        die "no timeout active with key '$key'";
    }

    $slots->[$key] = undef;
}
-
-
- =item $reactor->toggle_timeout($id, $status[, $interval]);
-
- Updates the state of a previously registered timeout
- specified by the C<$id> parameter. The C<$status>
- parameter specifies whether the timeout is to be enabled
- or disabled, while the optional C<$interval> parameter
- can be used to change the period of the timeout.
-
- =cut
-
# Enable or disable the timeout stored under $key and, when a third
# argument is supplied, change its interval (milliseconds).
#
# Dies when no timeout is stored under $key, matching remove_timeout.
# Previously an unknown key silently created a record with no callback
# and no last-fired time.
sub toggle_timeout {
    my $self    = shift;
    my $key     = shift;
    my $enabled = shift;

    my $timer = $self->{timeouts}->[$key];
    die "no timeout active with key '$key'"
        unless defined $timer;

    $timer->{enabled}  = $enabled;
    $timer->{interval} = shift if @_;

    return;
}
-
-
- =item my $id = $reactor->add_hook($callback[, $status]);
-
- Registers a new hook to be fired on each iteration
- of the event loop. The C<$callback> parameter
- specifies an instance of the C<Net::DBus::Callback>
- class to invoke. The C<$status> parameter determines
- whether the hook is initially enabled, or disabled.
- The return parameter is a unique id which should
- be used to later remove, or disable the hook.
-
- =cut
-
# Register a hook to be invoked on every loop iteration.
#
# $enabled defaults to 1 when not supplied. The hook is stored in the
# lowest-numbered free (undef) slot, or appended when none is free.
# Returns the slot index, which serves as the hook's identifier.
sub add_hook {
    my ($self, $callback, $enabled) = @_;
    $enabled = 1 unless defined $enabled;

    my $slots = $self->{hooks};

    # Default to appending; override with the first free slot, if any.
    my $key = scalar @$slots;
    foreach my $i (0 .. $#$slots) {
        next if defined $slots->[$i];
        $key = $i;
        last;
    }

    $slots->[$key] = {
        callback => $callback,
        enabled  => $enabled,
    };

    return $key;
}
-
-
- =item $reactor->remove_hook($id)
-
- Removes the previously registered hook identified
- by C<$id>.
-
- =cut
-
# Remove the hook stored under $key. The slot is set to undef rather
# than spliced out, so the identifiers of other hooks stay valid.
# Dies when no hook is stored under $key.
sub remove_hook {
    my ($self, $key) = @_;

    my $slots = $self->{hooks};

    if (!defined $slots->[$key]) {
        die "no hook present with key '$key'";
    }

    $slots->[$key] = undef;
}
-
- =item $reactor->toggle_hook($id, $status)
-
- Updates the status of the previously registered
- hook identified by C<$id>. The C<$status> parameter
- determines whether the hook is to be enabled or
- disabled.
-
- =cut
-
# Enable or disable the hook stored under $key.
#
# Dies when no hook is stored under $key, matching remove_hook.
# Previously an unknown key silently created a record with no callback.
sub toggle_hook {
    my ($self, $key, $enabled) = @_;

    my $hook = $self->{hooks}->[$key];
    die "no hook present with key '$key'"
        unless defined $hook;

    $hook->{enabled} = $enabled;

    return;
}
-
# Store a watch record for handle $fd in the table for $type ("read",
# "write" or "exception"), replacing any record already stored for that
# handle. $enabled defaults to 1 when not supplied. Returns the stored
# record.
sub _add {
    my ($self, $type, $fd, $callback, $enabled) = @_;
    $enabled = 1 unless defined $enabled;

    my %record = (
        callback => $callback,
        enabled  => $enabled,
    );

    return $self->{fds}->{$type}->{$fd} = \%record;
}
-
- =item $reactor->remove_read($fd);
-
- =item $reactor->remove_write($fd);
-
- =item $reactor->remove_exception($fd);
-
- Removes a watch on the file handle C<$fd>.
-
- =cut
-
# Remove the read watch on a handle; arguments are passed through to
# _remove as ($fd).
sub remove_read {
    my ($self, @watch_args) = @_;

    return $self->_remove("read", @watch_args);
}
-
# Remove the write watch on a handle; arguments are passed through to
# _remove as ($fd).
sub remove_write {
    my ($self, @watch_args) = @_;

    return $self->_remove("write", @watch_args);
}
-
# Remove the exception watch on a handle; arguments are passed through
# to _remove as ($fd).
sub remove_exception {
    my ($self, @watch_args) = @_;

    return $self->_remove("exception", @watch_args);
}
-
# Delete the watch record for handle $fd from the table for $type.
# Dies when no record is stored for that handle. Returns the deleted
# record.
sub _remove {
    my ($self, $type, $fd) = @_;

    my $handles = $self->{fds}->{$type};

    if (!exists $handles->{$fd}) {
        die "no handle ($type) active with fd '$fd'";
    }

    return delete $handles->{$fd};
}
-
- =item $reactor->toggle_read($fd, $status);
-
- =item $reactor->toggle_write($fd, $status);
-
- =item $reactor->toggle_exception($fd, $status);
-
- Updates the status of a watch on the file handle C<$fd>.
- The C<$status> parameter specifies whether the watch is
- to be enabled or disabled.
-
- =cut
-
# Enable or disable the read watch on a handle; arguments are passed
# through to _toggle as ($fd, $status).
sub toggle_read {
    my ($self, @watch_args) = @_;

    return $self->_toggle("read", @watch_args);
}
-
# Enable or disable the write watch on a handle; arguments are passed
# through to _toggle as ($fd, $status).
sub toggle_write {
    my ($self, @watch_args) = @_;

    return $self->_toggle("write", @watch_args);
}
-
# Enable or disable the exception watch on a handle; arguments are
# passed through to _toggle as ($fd, $status).
sub toggle_exception {
    my ($self, @watch_args) = @_;

    return $self->_toggle("exception", @watch_args);
}
-
# Set the enabled state of the watch record for handle $fd in the table
# for $type.
#
# Toggling a handle that has no record is a no-op. Previously it created
# a record holding only the enabled flag and no callback, which was then
# counted as an active handle and passed to select. It is deliberately
# not an error, because a watch may be toggled for an event type that
# was never registered.
sub _toggle {
    my ($self, $type, $fd, $enabled) = @_;

    my $handles = $self->{fds}->{$type};
    return unless defined $handles && exists $handles->{$fd};

    $handles->{$fd}->{enabled} = $enabled;

    return;
}
-
-
- 1;
-
- =pod
-
- =back
-
- =head1 SEE ALSO
-
- L<Net::DBus::Callback>, L<Net::DBus::Connection>, L<Net::DBus::Server>
-
- =head1 AUTHOR
-
- Daniel Berrange E<lt>dan@berrange.comE<gt>
-
- =head1 COPYRIGHT
-
- Copyright 2004 by Daniel Berrange
-
- =cut
-