All,
Forgive me ahead of time...my Java is MIGHTY rusty as I don't use it much...
I am trying to create a Storm topology that uses a number of spouts and
bolts written in Perl. I have read the documentation for IO::Storm on
metacpan and have a decent understanding of how to write the individual
spouts and bolts, but there is no detail about how to pull it all
together. I have three questions:
1. In my emit function (from perl), I am outputting a file name to be
further processed as a simple string (e.g. '/somedir/somefile.txt'). What
format should I be emitting? Is this correct? If so, does the bolt
receiving the emitted message (again, written in Perl) need to do any
conversion on the string? or is it just a string that is ready for further
use?
2. Given the source code files in perl and the java code for the topology,
I see references to the config...but have not stumbled on what to put in
the config for my cluster...am I missing some obvious documentation?
3. Finally, here is the code for my initial spout that simply grabs all
files in a directory, then listens for additional files to be added (think
log directory with new files being added periodically). As the follow-on
bolt requests the next file to process (using the next_tuple method), send
the file name to the bolt and then, once the ack is received, remove the
file from the array of files to process. Am I going about this the "right"
way?
#!/usr/bin/env perl
package NetflowFilesToProcessSpout;
use 5.016;
use strict;
use warnings;
use Data::Dumper;
use File::Monitor;
extends 'IO::Storm::Spout';
our $VERSION = '0.01';
#=+ Make it easier for portability
use Exporter;
our $SOURCE_DIR => '/someDir/someSubDir/';
our @EXPORT_OK = ($SOURCE_DIR);
#=+ Some variables that are needed across subs, namely the array of files
and the File::Monitor object
my (@files,$monitor);
sub initialize {
my ( $self, $storm_conf, $context ) = @_;
#=+ Now load all flow files into an array
opendir(my $dh, $SOURCE_DIR);
my @files = grep !/^\./, readdir($dh);
my @sortedFiles = sort {$a cmp $b} @files;
closedir($dh);
#=+ Add full path to each file
for(my $index=0;$index<=$#files;$index++){
$files[$index] = $SOURCE_DIR.$files[$index];
}
#=+ Now set up the directory monitor to push new files onto the
existing file array
my $monitor = File::Monitor->new();
$monitor->watch($SOURCE_DIR);
#=+ We do the first scan now so that any changes going forward will be
recognized
@files = $monitor->scan;
}
sub next_tuple {
my ($self) = @_;
#=+ first, check if there are any new files in our flow directory
my @changes = $monitor->scan();
push @files, @changes if @changes;
if(@files) {
#=+ Now emit exactly one file name (oldest first, for now)
my $nextFileToProc = $files[0];
$self->emit($nextFileToProc);
}
else {
#=+ in order to keep Storm from thinking the process is dead if there
are not currently any more files to process, return a noop (from the
IO::Storm documentation)
return;
}
}
sub ack {
my ($self,$id) = @_;
#=+ For now, this looks like a good way to ensure we don't get rid of a
file to process if there was a failure donw the pipeline.
shift @files;
}
sub process {
my ($self, $tuple) = @_;
#=+ Let's log that have pushed out a file for processing
my $filesize = -s $tuple;
$self->log('NetflowFilesToProcessSpout: sent file to process'.$tuple.'
(size:'.$filesize.')');
}
#=+ Finally, need to actually run
NetflowFilesToProcessSpout->new->run;
1;