package Smokeping::probes::PingContinuous;

=head1 301 Moved Permanently

This is a Smokeping probe module. Please use the command 

C<smokeping -man Smokeping::probes::PingContinuous>

to view the documentation or the command

C<smokeping -makepod Smokeping::probes::PingContinuous>

to generate the POD document.

=cut

use strict;
use base qw(Smokeping::probes::base);
use IO::Epoll;
use IO::Pipe;
use Symbol;
use Carp;

# Supplies the documentation fragments for this probe as a hash reference
# with the keys 'name', 'description' and 'authors'.
sub pod_hash {
    my %pod;

    $pod{name} = <<DOC;
Smokeping::probes::PingContinuous - Continuous Ping Probe for SmokePing
DOC

    # Built line by line so that the trailing blanks carried by two of the
    # lines stay visible in the source.
    $pod{description} = join '', map { "$_\n" } (
        'Integrates PingContinuous as a probe into smokeping. The variable B<binary> must ',
        'point to your copy of the ping program.',
        '  ',
        'The (optional) B<packetsize> option lets you configure the packetsize for the pings sent.',
        '',
    );

    $pod{authors} = <<'DOC';
Steven Wilton <swilton@fluentit.com.au>
(Based on FPing module by Tobias Oetiker <tobi@oetiker.ch>)
DOC

    return \%pod;
}

# File-scoped handles for talking to the forked pinger worker.  Both stay
# undef until ping() creates them with IO::Pipe: requests ("FETCH <address>")
# are written to $pinger_request and one reply line per request is read back
# from $pinger_reply.
my $pinger_request=undef;
my $pinger_reply=undef;

# Returns the delay handed to ping's -i option: the probe step divided by
# the number of pings, shortened by 5%.
sub interval {
    my ($self) = @_;
    my $per_ping = $self->step() / $self->pings();
    return $per_ping * 0.95;
}

# Constructor.  Delegates to the parent class and, unless running as a CGI
# (SERVER_SOFTWARE set), prepares the probe for measuring:
#   - pingfactor is set to 1000; ping() divides the reported "time=... ms"
#     values by it.
#   - croaks when interval() is below 0.2.
#     NOTE(review): presumably this mirrors the smallest interval ping
#     accepts from unprivileged users - confirm.
sub new($$$)
{
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my $self = $class->SUPER::new(@_);

    # no need for this if we run as a cgi
    unless ( $ENV{SERVER_SOFTWARE} ) {
        $self->{pingfactor} = 1000;
        if ($self->interval() < 0.2) {
            croak "ERROR: We can only do one ping every 210ms.  Either reduce the number of pings or increase the step to fix the issue\n";
        }
    }

    return $self;
}

# Event loop of the forked worker process (started from ping()).
#
# Arguments:
#   $input  - handle on which "FETCH <address>" request lines arrive
#   $output - handle on which one reply line per request is written
#
# The first FETCH for an address starts a ping child through IO::Pipe and is
# answered with "NEW".  Later FETCHes are answered with a space separated
# list of $self->pings results gathered from that child's output, where "-"
# stands for a lost packet.
#
# This sub does not return: it loops forever and exits the process when
# epoll reports anything other than plain EPOLLIN on the request pipe.
sub run_pinger {
  my $self=shift;
  my $input=shift;
  my $output=shift;

  # Watch the request pipe; ping children are added to the same epoll set
  # as they are started.
  my $epfd = epoll_create(10);
  epoll_ctl($epfd, EPOLL_CTL_ADD, $input->fileno, EPOLLIN);

  # %results is not used anywhere below.
  # %pingers holds the per-child state keyed by the child's pipe fileno,
  # %pingermap maps a target address to that fileno.
  my %results=();
  my %pingers=();
  my %pingermap=();
  while(1) {
    # NOTE(review): 10 and 1000 are presumably the maximum number of events
    # and a timeout in milliseconds - confirm against IO::Epoll.
    my $events = epoll_wait($epfd, 10, 1000);
    foreach my $event(@$events) {
      # Each event is an array ref: element 0 is treated as the file
      # descriptor, element 1 as the event mask.
      my $fileno=@{$event}[0];
      my $flags=@{$event}[1];

      if($fileno == $input->fileno) {
	# Anything but exactly EPOLLIN on the request pipe ends the worker.
	if($flags != EPOLLIN) {
	  $self->do_log("Received event $flags on input pipe - exiting");
	  exit(0);
	}
	# Read one request line.
	my $input_cmd=<$input>;
	#$self->do_log($input_cmd);

	if($input_cmd =~ /^FETCH (.+)$/) {
	  my $address=$1;
	  chomp($address);
	  if(!exists($pingermap{$address})) {
	    # Unknown address: start a ping child for it.  The command is
	    # "<binary> -O -i <interval>" plus -I / -Q / -s when the
	    # sourceaddress / tos / packetsize properties are set.
	    my $pipe = IO::Pipe->new();
	    my $interval=$self->interval();
	    my @cmd=($self->binary, "-O", "-i", $interval);
	    push @cmd,"-I",$self->{properties}{sourceaddress} if($self->{properties}{sourceaddress});
	    push @cmd,"-Q",$self->{properties}{tos} if($self->{properties}{tos});
	    push @cmd,"-s",$self->{properties}{packetsize} if($self->{properties}{packetsize});
	    $pipe->reader(@cmd, $address);
	    # Non-blocking, so the read loop further down can drain what is
	    # available without waiting for more output.
	    $pipe->blocking(0);
	    epoll_ctl($epfd, EPOLL_CTL_ADD, $pipe->fileno, EPOLLIN);

	    $pingers{$pipe->fileno}{iohandle}=$pipe;
	    $pingers{$pipe->fileno}{results}=[];
	    $pingers{$pipe->fileno}{max_seq}=0;
	    $pingers{$pipe->fileno}{reply_seq}=0;
	    $pingers{$pipe->fileno}{address}=$address;

	    $pingermap{$address}=$pipe->fileno;

	    # Nothing has been measured yet for this address.
	    print $output "NEW\n";
	  } else {
	    # Known address: hand out collected results.  This $fileno
	    # shadows the event's $fileno for the rest of the branch.
	    my $fileno=$pingermap{$address};
	    my @ret;
	    if(scalar(@{$pingers{$fileno}{results}}) < $self->pings) {
	      # Fewer results than requested: return what there is and pad
	      # the remainder with "-" (lost).
	      my $fakeloss=$self->pings-scalar(@{$pingers{$fileno}{results}});
	      $self->do_log("Adding $fakeloss lost pings to $address due to insufficient data");
	      @ret=@{$pingers{$fileno}{results}};
	      while($fakeloss-- > 0) {
		push @ret,"-";
	      }

	      # Reset the results array
	      $pingers{$fileno}{results}=[];
	    } else {
	      # Take the oldest $self->pings results, then trim the backlog so
	      # that no more than 10% of $self->pings entries are carried over
	      # (the oldest surplus entries are dropped).
	      @ret=splice(@{$pingers{$fileno}{results}}, 0, $self->pings);
	      my $extra=scalar(@{$pingers{$fileno}{results}}) - ($self->pings * 0.1);
	      if($extra > 0) {
		$self->do_debug("Removing $extra of ". scalar(@{$pingers{$fileno}{results}}) ." ping results from array for $address");
		splice(@{$pingers{$fileno}{results}}, 0, $extra);
	      } else {
		$self->do_debug(scalar(@{$pingers{$fileno}{results}}) ." ping results remaining for $address ($extra)");
	      }
	    }
	    $self->do_debug("Data for $address: ". join(" ", @ret));
	    print $output join(" ", @ret) ."\n";
	  }
	}
      } else {
	# Event on the output pipe of one of the ping children.
	my $io=$pingers{$fileno}{iohandle};
	if($flags != EPOLLIN) {
	  # Anything but exactly EPOLLIN: close the pipe and forget the
	  # child, so the next FETCH for this address starts a new one.
	  $io->close();
	  my $address=$pingers{$fileno}{address};
	  delete($pingermap{$address});
	  delete($pingers{$fileno});
	} else {
	  # Process every line that can currently be read.
	  while(my $data=<$io>) {
	    if($data =~ /icmp_seq=(\d+)/ ) {
	      my $this_seq=$1;

	      # Always keep track of the biggest sequence number we have logged
	      # (max_seq is written here but not read anywhere in this sub)
	      $pingers{$fileno}{max_seq}=$this_seq;

	      # If this is a message from a reply packet
	      if($data =~ /bytes from.+time=([0-9\.]+) ms/) {
		my $pingtime=$1;

		# See if we missed any sequence numbers since the last reply
		# (only when a previous reply was recorded, i.e. reply_seq is
		# non-zero, and this sequence number is larger)
		my $drops=($pingers{$fileno}{reply_seq} && $this_seq > $pingers{$fileno}{reply_seq})?($this_seq - $pingers{$fileno}{reply_seq} - 1):0;

		# Add records for dropped packets
		if($drops) {
		  $self->do_debug("Detected $drops packets dropped in sequence numbers");
		  while($drops-- > 0) {
		    push @{$pingers{$fileno}{results}}, "-";
		  }
		}

		# Record this packet
		#$self->do_log("Recorded ping time of $pingtime for $pingers{$fileno}{address}");
		push @{$pingers{$fileno}{results}}, $pingtime;

		# Update the sequence number
		$pingers{$fileno}{reply_seq}=$this_seq;
	      }
	    }
	    #$self->do_log($data);
	  }
	}
      }
    }
  }
}

# Short description of the probe, naming the configured packet size
# (56 when the packetsize property is unset or false).
sub ProbeDesc($){
    my ($self) = @_;
    my $bytes = $self->{properties}{packetsize} ? $self->{properties}{packetsize} : 56;
    return "ICMP Echo Pings (" . $bytes . " Bytes)";
}

# Returns the configured 'binary' property.
# Derived classes (ie. RemotePingContinuous) can override this.
sub binary {
    my ($self) = @_;
    return $self->{properties}{binary};
}

# Host name used for testing; always "localhost" here.
# Derived classes (ie. PingContinuous6) can override this.
sub testhost {
    my $host = "localhost";
    return $host;
}

# Collects one round of results for every address of this probe.
#
# On first use a worker process running run_pinger() is forked and connected
# through two pipes.  For each address a "FETCH <address>" line is sent and
# one reply line is read back:
#   "NEW"            - the worker has only just started pinging the address;
#                      its targets get undef (unknown) in $self->{rtts}
#   "<t1> <t2> ..."  - results in milliseconds, "-" for a lost packet; the
#                      numeric entries are sorted, divided by pingfactor and
#                      stored as an array ref for every target of the address
#
# When the worker cannot be forked, or stops answering, the affected targets
# are recorded as unknown instead of as 100% loss, and the pipes are dropped
# so that the next call starts a fresh worker.
sub ping ($){
    my $self = shift;
    # do NOT call superclass ... the ping method MUST be overwriten

    # Fork off our worker if needed
    if (!$pinger_request) {
        my $request = IO::Pipe->new();
        my $reply   = IO::Pipe->new();
        my $pid     = fork();
        if (!defined $pid) {
            # Leave the shared handles unset so that the next call retries.
            $self->do_log("Unable to fork pinger worker: $!");
        }
        elsif ($pid) { # Parent
            $request->writer();
            $request->autoflush(1);
            $reply->reader();
            ($pinger_request, $pinger_reply) = ($request, $reply);
        }
        else { # Worker
            $request->reader();
            $reply->writer();
            $reply->autoflush(1);
            $self->run_pinger($request, $reply);
            exit(0);
        }
    }

    # Writing to a dead worker must not kill this process; the failed print
    # is handled below.  Installed only after the fork so that the worker
    # and the ping children it starts keep the default SIGPIPE behaviour.
    local $SIG{PIPE} = 'IGNORE';

    foreach my $address (@{$self->addresses}) {
        my $reply;
        if ($pinger_request && print {$pinger_request} "FETCH $address\n") {
            $reply = <$pinger_reply>;
        }

        if (!defined $reply) {
            if ($pinger_request) {
                $self->do_log("Pinger worker is not responding - it will be restarted on the next round");
                $pinger_request->close();
                $pinger_reply->close();
                $pinger_request = undef;
                $pinger_reply   = undef;
            }
            # No worker, no data: record an unknown result
            $self->{rtts}{$_} = undef for @{$self->{addrlookup}{$address}};
            next;
        }
        chomp($reply);

        if ($reply =~ /^NEW/) {
            # Record an unknown result
            $self->{rtts}{$_} = undef for @{$self->{addrlookup}{$address}};
        }
        else {
            # Keep only the numeric entries (lost packets are "-")
            my @times = split /\s+/, $reply;
            @times = map {sprintf "%.10e", $_ / $self->{pingfactor}} sort {$a <=> $b} grep /^\d/, @times;
            $self->{rtts}{$_} = [@times] for @{$self->{addrlookup}{$address}};
        }
    }
}

# Builds the RRD update string for $tree.
#
# When ping() stored an explicit undef for the target (the worker had only
# just started pinging it) every field is reported as unknown: three leading
# "U" values (named age, loss and median in the original code) followed by
# one "U" per ping.  In every other case the parent class implementation is
# used.
sub rrdupdate_string($$) {
    my $self = shift;
    my $tree = shift;

    my $pings = $self->pings;
    if (exists($self->{rtts}{$tree}) && !defined($self->{rtts}{$tree})) {
        $self->do_debug("No data exists - returning undef");

        # Return all values as "U"
        return "U:U:U:" . join(":", ("U") x $pings);
    }

    # Dispatch through SUPER instead of naming the base package explicitly.
    return $self->SUPER::rrdupdate_string($tree);
}

# Declares the configuration variables of this probe on top of those of the
# parent class:
#   binary        - mandatory; must be an executable file (not checked when
#                   running as a CGI)
#   packetsize    - optional; 12-64000
#   sourceaddress - optional; dotted quad
#   tos           - optional; decimal or 0x-prefixed hexadecimal
sub probevars {
    my $class = shift;
    return $class->_makevars($class->SUPER::probevars, {
        _mandatory => [ 'binary' ],
        binary => {
            _sub => sub {
                my ($val) = @_;
                return undef if $ENV{SERVER_SOFTWARE}; # don't check for ping presence in cgi mode
                return "ERROR: PingContinuous 'binary' does not point to an executable"
                    unless -f $val and -x _;
                return undef;
            },
            _doc => "The location of your ping binary.",
            _example => '/bin/ping',
        },
        packetsize => {
            _re => '\d+',
            _example => 5000,
            _sub => sub {
                my ($val) = @_;
                return "ERROR: PingContinuous packetsize must be between 12 and 64000"
                    if ( $val < 12 or $val > 64000 );
                return undef;
            },
            _doc => "The ping packet size (in the range of 12-64000 bytes).",
        },
        sourceaddress => {
            _re => '\d+(\.\d+){3}',
            _example => '192.168.0.1',
            _doc => <<DOC,
Set source address.
DOC
        },
        tos => {
            # Grouped so that any anchoring the config parser puts around the
            # pattern applies to both alternatives; hex digits only after 0x.
            _re => '(?:\d+|0x[0-9a-fA-F]+)',
            _example => '0x20',
            _doc => <<DOC,
Set the type of service (TOS) of outgoing ICMP packets.
DOC
        },
    });
}

1;
