> How did you write this 'multiplexer' on the fetch?
OK, I don't have a lot of time, so I can't give a step-by-step rundown.
But I think the code is pretty self-explanatory, and I don't mind
answering questions later.
We have a pair of states, get_hits and get_row (you would name them
query_start and query_row). get_hits EXECUTEs the query, and get_row
FETCHes the results. EXECUTE takes as long as the DB takes. FETCH is
*very* fast, so in the code below you will see "my $rowcount = 100", which
is how many rows to read from the result set per pass before get_row
yields back to the kernel. Feel free to tweak this number to whatever you
feel comfortable with. I have a different, related state pair where it's
only 2 per pass, but that's because I'm also doing UPDATEs in that query
and they take much longer.
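Before the real handlers, here is a toy version of the same idea that runs with core Perl only. It is a sketch under loud assumptions, not POE: `@queue` and `post_event` are hypothetical stand-ins for the kernel's event queue and `yield`, `@resultset` stands in for an executed statement handle, and `tick` is an invented unrelated event. It only shows why reading a fixed number of rows per pass lets other events run in between.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Toy stand-ins (assumptions, not POE): @queue plays the kernel's event
# queue, and @resultset plays an executed statement handle.
my @queue;
my @resultset = (1 .. 250);
my @trace;    # records which handler ran, in order

# Hypothetical helper: append an event to the end of the queue.
sub post_event { push @queue, [@_] }

my %handler = (
    # Like get_row: read at most $rowcount rows, then give up the CPU.
    get_row => sub {
        my $rowcount = 100;
        while ($rowcount--) {
            my $row = shift @resultset;
            unless (defined $row) {
                push @trace, 'done';
                return;              # no more rows: do not re-queue
            }
            # ... process $row here ...
        }
        push @trace, 'get_row';
        post_event('get_row');       # more rows on a later pass
    },
    # Some unrelated event that should not be starved by the row reader.
    tick => sub { push @trace, 'tick' },
);

post_event('get_row');
post_event('tick');
post_event('tick');

# Minimal dispatch loop: run events in the order they were queued.
while (my $event = shift @queue) {
    my ($name, @args) = @$event;
    $handler{$name}->(@args);
}

print join(' ', @trace), "\n";    # prints "get_row tick tick get_row done"
```

The two `tick` events run after the first 100 rows rather than after all 250, which is the whole point of the per-pass limit. A smaller limit gives other events more chances to run at the cost of more passes.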
use Carp qw/carp/;
use Time::HiRes qw/gettimeofday tv_interval time/;

sub get_hits {
    my ($kernel, $object, $heap) = @_[KERNEL, OBJECT, HEAP];

    # already loading... ignore
    return if $heap->{loading_due};

    DEBUG && print "$object received get_hits\n";
    $heap->{loading_due}   = 1;
    $heap->{due_start}     = time;
    $heap->{current_query} = $heap->{read_hits_due};

    eval {
        local $heap->{dbh}{RaiseError} = 1;
        # The statement handle is in the heap. It was prepared earlier, so
        # it does not pick up the dbh setting above; set it here as well.
        local $heap->{current_query}{RaiseError} = 1;
        $heap->{current_query}->execute;
    };

    # $@ contains an error string if there were any DB problems.
    if ($@) {
        # Method calls do not interpolate inside a string, so build the
        # message by concatenation.
        my $errstr = $heap->{dbh}->errstr || '';
        carp "$object DATABASE error: $@ ($errstr)";
        return $kernel->call(db => reconnect => 'get_hits');
    }

    print "$object get_hits execute complete in ",
        time - $heap->{due_start}, " seconds\n" if DEBUG_TIME;

    $heap->{sofar} = 0;

    # queue up reading rows
    $kernel->yield('get_row');

    # schedule updating status info
    #### this is a DIFFERENT query that works the same way ####
    $heap->{"Next Status/URL Refresh"} = localtime(time + 3600);
    $kernel->delay(start_refresh => 3600);
}

sub get_row {
    my ($kernel, $object, $heap) = @_[KERNEL, OBJECT, HEAP];

    # how many rows are we going to read w/ this pass?
    my $rowcount = 100;

    # Start of this pass. Always set: "my ... if COND" is unreliable.
    my $t1 = [gettimeofday];

    while ($rowcount--) {
        my $t0 = [gettimeofday];
        my $acctrec = {};
        my $row;
        eval {
            $row = $heap->{current_query}->fetch;
        };
        # A fetch error would otherwise look exactly like "no more rows".
        carp "$object fetch error: $@" if $@;

        if ($row) {
            # {{{ PROCESS A ROW
            DEBUG && print "$object get_row: $heap->{sofar} rows so far\n"
                unless ++$heap->{sofar} % 100;

            #################################
            ## Fill in data structures here ##
            #################################

            print {$heap->{log}} tv_interval($t0), "\n" if DEBUG_TIME;
            # }}} PROCESS A ROW
        } else {
            # {{{ NO MORE ROWS
            my $time = int(time - $heap->{due_start});
            DEBUG && print "$object get_row: $heap->{sofar} rows total\n";
            print "$object finished loading get_hits in $time seconds\n"
                if DEBUG_TIME;

            ##################################
            ## Finalize initialization here ##
            ##################################

            # Clear state markers
            delete @{$heap}{ qw/ loading_due deferred sofar due_start
                                 query_result / };

            # schedule re-updating for 11AM tomorrow
            $time = int(time);
            $time = $time - $time % 86400 +  # midnight today, GMT
                3600 *                       # 3600 seconds == 1 hour
                ( 24 + 8 - (localtime)[8]    # add 24hrs to midnight, plus the
                                             # GMT offset: a hard-coded 8 hours,
                                             # minus 1 when (localtime)[8]
                                             # (the isdst flag) is set
                  + 11                       # Make it 11AM, instead of 12AM
                );
            $kernel->alarm('get_hits', $time);
            $heap->{"Next System Reload"} = localtime $time;
            last;    # break out of the while loop
            # }}} NO MORE ROWS
        }
    }

    printf "%.03f seconds for this pass\n", tv_interval($t1)
        if DEBUG_TIME > 1;

    # Get MORE rows, next pass
    $kernel->yield('get_row') if exists $heap->{loading_due};
}

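The "11AM tomorrow" arithmetic in get_row hard-codes an 8-hour GMT offset and uses today's DST flag. As an alternative sketch (a swapped-in technique, not what the code above does), the same target time can be computed with core POSIX `mktime`, which works in local time directly. The helper name `next_day_at` is hypothetical and only for illustration.

```perl
use strict;
use warnings;
use POSIX qw/mktime/;

# Hypothetical helper: epoch seconds for $hour:00:00 local time on the
# day after $now.
sub next_day_at {
    my ($hour, $now) = @_;
    my ($mday, $mon, $year) = (localtime $now)[3, 4, 5];
    # mktime normalizes mday + 1 past the end of a month, and isdst = -1
    # asks the C library to work out whether DST applies on that date.
    return mktime(0, 0, $hour, $mday + 1, $mon, $year, 0, 0, -1);
}

my $when = next_day_at(11, time);
print "next reload: ", scalar localtime($when), "\n";
```

The result could be handed to `$kernel->alarm('get_hits', $when)` in place of the hand-rolled value, assuming the machine's local time zone is the one the 11AM schedule is meant for.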
HTH!
L8r,
Rob
#!/usr/bin/perl -w
use Disclaimer qw/:standard/;