> How did you write this 'multiplexer' on the fetch?

OK, I don't have alot of time, so I don't have lots of time to give a
step-by-step rundown.  But I think the code is pretty self-explanatory, and
I don't mind answering questions later.

We have a pair of states, get_hits and get_row (you would name them
query_start, query_row).  get_hits EXECUTES the query, and get_row FETCHES
the results.  EXECUTE will take as long as the DB takes.  FETCH is *very*
fast, so in the code below you will see "my $rowcount = 100" which is how
many rows to read from the resultset per pass.  Feel free to tweak this
number to whatever you feel comfortable with.  I have a different, related
state pair where it's only 2 per pass, but that's because I'm also doing
UPDATEs in that query and they take much longer.

use Time::HiRes qw/gettimeofday tv_interval time/;

sub get_hits {
    my ($kernel, $object, $heap) = @_[KERNEL, OBJECT, HEAP];

    # already loading... ignore
    return if $heap->{loading_due};

    DEBUG && print "$object received get_hits\n";

    $heap->{loading_due} = 1;
    $heap->{due_start} = time;
    $heap->{current_query} = $heap->{read_hits_due};

    eval {
        local $heap->{dbh}{RaiseError} = 1;

        # The statement handle is in the heap
        $heap->{current_query}->execute;

    };

    # $@ contains an error string if there were any DB problems.
    if ($@) {
        carp ("$object DATABASE error: $@ ($heap->{dbh}->errstr)");
        return $kernel->call(db => reconnect => 'get_hits');
    }

    print "$object get_hits execute complete in ",
      time - $heap->{due_start}  ," seconds\n" if DEBUG_TIME;

    $heap->{sofar} = 0;

    # queue up reading rows
    $kernel->yield('get_row');

    # schedule updating status info
    #### this is a DIFFERENT query that works the same way ####
    $heap->{"Next Status/URL Refresh"} = localtime time + 3600;
    $kernel->delay(start_refresh => 3600);

}

sub get_row {
    my ($kernel, $object, $heap) = @_[KERNEL, OBJECT, HEAP];

    # how many rows are we going to read w/ this pass?
    my $rowcount = 100;
    my $t1 = [gettimeofday] if DEBUG_TIME;

    while ($rowcount--) {

        my $t0 = [gettimeofday];

        my $acctrec = {};
        my $row;

            eval {
                $row = $heap->{current_query}->fetch;
            };

        if ($row) {

# {{{ PROCESS A ROW

            DEBUG && print "$object get_row: $heap->{sofar} rows so far\n"
              unless ++$heap->{sofar} % 100;

                #################################
                ## Fill in data stuctures here ##
              #################################

            print {$heap->{log}} tv_interval ( $t0 ), "\n" if DEBUG_TIME;

# }}} PROCESS A ROW

        } else {

# {{{ NO MORE ROWS

            my $time = int(time - $heap->{due_start});
            DEBUG && print "$object get_row: $heap->{sofar} rows total\n";
            print "$object finished loading get_hits in $time seconds\n"
              if DEBUG_TIME;

                ##################################
                ## Finalize initialization here ##
              ##################################

            # Clear state markers
            delete @{$heap}{ qw/ loading_due deferred sofar due_start query_result
/};

            # schedule re-updating for 11AM tomorrow
            $time = time;
            $time = $time - $time % 86400 + # midnite today, GMT
              3600 * # 3600 seconds == 1 hour
                ( 24 + 8 - (localtime)[8] # add 24hrs to midnite, plus GMT Offset
                  + 11 # Make it 11AM, instead of 12AM
                );

            $kernel->alarm( 'get_hits', $time);
            $heap->{"Next System Reload"} = localtime $time;

            last;               # break out of the while loop;

# }}} NO MORE ROWS

        }

    }
    printf "%.03f seconds for this pass\n", tv_interval ( $t1 ) if
DEBUG_TIME > 1;

    # Get MORE rows, next pass
    $kernel->yield('get_row') if exists $heap->{loading_due};

}

HTH!

L8r,
Rob

#!/usr/bin/perl -w
use Disclaimer qw/:standard/;

Reply via email to