I tweaked it a bit. Attached.
On Thu, May 6, 2010 at 06:02, alfonso caponi <[email protected]> wrote:
> Inspired by the pool.pl example, I wrote a simple multi-threaded script for
> DNS server testing on my LAN. It seems to be working fine, but I would like
> your opinion on any improvements.
>
> Note: I've commented out "#while (! $TERM)" to avoid sending multiple
> requests for as long as $TERM is not set :(
>
> Thank you very much at all!
>
> 2010/4/16 Jerry D. Hedden <[email protected]>
>>
>> Get the 'threads' distribution from CPAN and look at the sample
>> scripts in the 'examples' directory.
>>
>> On Fri, Apr 16, 2010 at 11:48, alfonso caponi <[email protected]>
>> wrote:
>> > Hi list,
>> >
>> > I've some doubts with a multi-threads script. My goal is to create a
>> > script
>> > with the same max number of threads running at all times.
>> >
>> > For example: I've an array with 9 elements and 3 threads to process it;
>> > each
>> > thread can process an element using a different time (a thread could end
>> > before another) and I would like 3 threads always running.
>> >
>> > My idea is to take an element from a queue, start threads to reach the
>> > maximum number of active threads allowed and simultaneously start a
>> > separate
>> > (but unique) thread to wait every thread in execution.
>> >
>> > My "solution" is attached. Do you have any tips? I would prefer not to use
>> > any modules other than threads, threads::shared, and Thread::Queue.
>> >
>> > Thank you very much!
>> > Al
>
>
#!/usr/bin/perl
use strict;
use warnings;
use Net::DNS;
use threads;
use threads::shared;
use Thread::Queue;
use Uniq;

# Turn on autoflush for the currently selected output handle so progress
# lines appear as soon as they are printed.
$| = 1;

# The single command-line argument names the file holding the IP address
# list; without it, print a usage line and stop.
my $ip_list = $ARGV[0];
die "$0 <ip list file>\n" unless $ip_list;
########################
### Global Variables ###

# FQDN list
my @fqdn = qw(
    www.nba.com
    www.google.com
    www.kernel.org
    www.linux.org
);

# Upper bound on the number of worker threads alive at the same time
my $MAX_THREADS = 10;

# Maximum thread working time; it is counted down in one-second steps
my $TIMEOUT = 10;

# Shared flag: set to a true value once the application is terminating,
# so every thread can see it
my $TERM :shared = 0;

# Shared variable used purely as a lock around detach calls, which
# prevents double detach attempts
my $DETACHING :shared;
#######################
### Signal Handling ###

# ^C (INT) and a command line 'kill' (TERM) share one handler: announce the
# shutdown and raise the shared $TERM flag so the application can wind down
# gracefully.
my $request_shutdown = sub {
    print(">>> Terminating <<<\n");
    $TERM = 1;
};
@SIG{'INT', 'TERM'} = ($request_shutdown, $request_shutdown);

# KILL is delivered to a thread via ->kill('KILL') when that thread gets
# cancelled; the handler therefore runs inside the cancelled thread.
$SIG{'KILL'} = sub {
    # Report which thread is going away
    printf("%3d <- Killed\n", threads->tid());

    # Detach at most once (guarded by the $DETACHING lock), then end
    # this thread
    lock($DETACHING);
    threads->detach() unless threads->is_detached();
    threads->exit();
};
#####
# Build the work list consumed by the main processing section.
#
# Each element of @hosts is later handed to RequestDNS, which splits it on
# ';' into (DNS server, FQDN).  The list file only needs to contain DNS
# server addresses, one per line: every server is paired here with every
# name in @fqdn.  A line that already has the "server;fqdn" form is kept
# as it is.  A plain array is enough for this -- no shared queue is needed
# because only the main thread takes items off the list.
my @hosts;
{
    open(my $ip_fh, '<', $ip_list)
        or die "Error : cannot open the ip address list file: $!\n";
    chomp(my @servers = <$ip_fh>);
    close($ip_fh);

    # Blank lines would otherwise turn into empty work items
    @servers = grep { /\S/ } @servers;

    # Query each server only once per name
    @servers = uniq sort @servers;

    for my $server (@servers) {
        if ($server =~ /;/) {
            # Already a complete "server;fqdn" work item
            push(@hosts, $server);
            next;
        }
        push(@hosts, map { "$server;$_" } @fqdn);
    }
}
###############################
### Main Processing Section ###
# Starts the timer thread, keeps the worker pool topped up while there is
# work left, then gives the remaining workers a bounded amount of time
# before cancelling them.
MAIN: {
    # The timer thread shares one queue with all workers; it runs detached
    print "[+] Start timer thread\n";
    my $queue = Thread::Queue->new();
    threads->create('Timer', $queue)->detach();

    # Feed the pool until the work list is empty or termination is requested
    while (@hosts && ! $TERM) {
        # Number of free slots: the limit minus the threads currently listed
        my $needed = $MAX_THREADS - threads->list();

        while ($needed && ! $TERM) {
            my $element = shift(@hosts);
            last unless $element;

            # One worker thread per work item
            threads->create('RequestDNS', $queue, $element, $TIMEOUT);
            $needed--;
        }

        # Give running workers a moment to finish
        sleep(1);
    }

    # Allow listed threads up to $TIMEOUT further one-second ticks
    while ((threads->list() > 0) && $TIMEOUT--) {
        sleep(1);
    }

    # Whatever is still listed gets detached (once) and cancelled
    for my $straggler (threads->list()) {
        lock($DETACHING);
        $straggler->detach() unless $straggler->is_detached();
        $straggler->kill('KILL');
    }
}
exit(0);
# Worker thread body: ask one DNS server for the A records of one name and
# print every address returned.
#
# Arguments:
#   $queue   - Thread::Queue used to register and unregister with the
#              timer thread
#   $element - work item of the form "dns_server;fqdn"
#   $timeout - value registered with the timer thread together with this
#              thread's ID
#
# Does not return to its caller: the thread detaches itself (unless that
# already happened) and exits.
sub RequestDNS {
    my ($queue, $element, $timeout) = @_;
    my ($dns_server, $fqdn) = split(/;/, $element);

    # My thread ID
    my $tid = threads->tid();

    # The server string comes from the input file, so pass it as an
    # argument instead of making it part of the format string
    printf("%s -> %3d\n", $dns_server, $tid);

    # Register with timer thread
    $queue->enqueue($tid, $timeout);

    if (defined($fqdn) && length($fqdn)) {
        print "try $dns_server;$fqdn\n";

        # Send the query to the server under test only
        my $res = Net::DNS::Resolver->new;
        $res->nameservers($dns_server);

        my $query = $res->query($fqdn, "A");
        if ($query) {
            foreach my $rr (grep { $_->type eq 'A' } $query->answer) {
                my $reply = $rr->address;
                print "$dns_server;$fqdn;$reply\n";
            }
        }
    }
    else {
        # Without a name there is nothing to look up
        warn "skipping '$element': expected <dns server>;<fqdn>\n";
    }

    # Replace the KILL handler with a no-op so that a cancellation arriving
    # from here on no longer runs the detach-and-exit handler
    $SIG{'KILL'} = sub {};

    # Unregister with timer thread (an undefined timeout means "remove me")
    $queue->enqueue($tid, undef);

    # Tell user we're done
    printf("%3d <- Finished\n", $tid);

    # Detach at most once (guarded by the $DETACHING lock) and terminate
    lock($DETACHING);
    threads->detach() if ! threads->is_detached();
    threads->exit();
}
# Watchdog thread: tracks registered threads and cancels any whose timeout
# runs out.
#
# Argument:
#   the registration queue.  Messages arrive as pairs (thread ID, timeout);
#   a false timeout unregisters the thread.
#
# Runs until the shared $TERM flag becomes true, ticking once per second.
sub Timer {
    my ($queue) = @_;
    my %timers;    # thread ID => { timeout, thread object }

    until ($TERM) {
        # Drain all pending registration messages
        while (my $tid = $queue->dequeue_nb()) {
            my $timeout = $queue->dequeue();

            # Only look the thread up when it is actually registering
            my $thread = $timeout ? threads->object($tid) : undef;

            if ($timeout && $thread) {
                $timers{$tid} = {
                    'timeout' => $timeout,
                    'thread'  => $thread,
                };
            }
            else {
                # No timeout or no thread object - unregister thread
                delete($timers{$tid});
            }
        }

        # Count every tracked thread down by one tick and cancel those
        # whose counter drops below zero
        for my $tid (keys(%timers)) {
            my $entry = $timers{$tid};
            next if --$entry->{'timeout'} >= 0;

            $entry->{'thread'}->kill('KILL');
            delete($timers{$tid});
        }

        # Tick tock
        sleep(1);
    }
}