Elizabeth Mattijsen wrote:

> At 16:59 +0000 1/29/04, [EMAIL PROTECTED] (PerlDiscuss - 
> Perl Newsgroups and mailing lists) wrote:
> >I'm currently using Thread::Queue::Any for hashtables.  If I enqueue up to
> >100 hashtables consecutively onto a queue and have worker threads
> >dequeueing them as they appear, my program works fine.  However if I try
> >to enqueue 200 hashtables consecutively, I get the following error: Thread
> >failed to start: Corrupted storable string (v2.6) at Storable.pm
> >(autosplit into thaw.al).  And if I try to enqueue 500 hashtables, I get
> >the error: Thread failed to start: Bad hash at Storable.pm (autosplit
> >into _freeze.al).  If I enqueue any more than that, I get a seg fault and
> >core dump.  Has anyone seen this problem before?  I looked into the
> >Storable module to see how it works, and found it to be very complex. 
> >
> >My requirements are such that I may have as many as a few thousand
> >hashtables being put on the queue consecutively...I'm wondering if this is
> >possible using these modules?

> This shouldn't happen.  Do you have an example program that 
> demonstrates the problem?  Also, please let us know which platform and 
> which version of Perl you're using.


> Liz


I'm running on the Tru64 platform and using Perl 5.8.2.

With the following code (excuse any typos, I don't have the code in front
of me so I'm writing from memory), I get the error "Not an array reference
at Thread::Queue::Any line 33".  Line 33 for me is @{Storable::thaw(
shift->SUPER::dequeue )};

use Thread;
use Thread::Queue::Any;
use threads;
use threads::shared;

test_fnct();

# Test driver: starts the worker thread via startup() and then feeds it
# 5000 single-entry hashes through the input queue.
#
# Takes no arguments; the return value is not meaningful.
sub test_fnct {
    # I don't have shutdown of the threads working yet!
    # NOTE(review): no `: shared` declaration for this flag is visible, so
    # each thread would see its own copy -- confirm before relying on it.
    $global_shutdown = 0;

    # NOTE(review): these class-method calls assume new() and startup() live
    # in package `test`; no package statement is visible here -- confirm.
    test->new();

    test->startup();

    for (1 .. 5000) {
        my %h = ($_ => "hash");

        # Enqueue a *reference*: the consumer dereferences the first
        # dequeued element as a hash, so a flattened key/value list would
        # hand it a plain string instead.  The queue created by startup()
        # is $q1_in.
        $q1_in->enqueue(\%h);
    }
}


# Placeholder constructor: the body is empty, so calling it does nothing.
sub new {}


# Set up the single input queue and launch the one worker thread that
# runs sub1.  The queue is created first, then the thread is spawned.
sub startup {
    $q1_in  = Thread::Queue::Any->new();
    $thread = threads->create( \&sub1 );
}

# Worker loop: until the shutdown flag becomes true, poll the input queue
# and print every key/value pair of each hash taken from it.
sub sub1 {
    until ($global_shutdown) {
        # Nothing queued yet: go round again.
        next unless $q1_in->pending() > 0;

        # The first dequeued element is dereferenced as a hash.
        my ($payload) = $q1_in->dequeue;
        my %data = %{$payload};

        while (($key, $value) = each %data) {
            print "key: ${key}, value: ${value}\n";
        }
    }
}



The errors I was getting before (i.e. error: Thread failed to start:
Corrupted storable string (v2.6) at Storable.pm (autosplit into thaw.al),
etc) happen when I pass data between multiple queues.  I didn't realize
this until after I posted my errors.  Here's the code where I get those
errors.

use Thread;
use Thread::Queue::Any;
use threads;
use threads::shared;

test_fnct();

# Test driver for the three-stage pipeline.
#
# Starts the workers via startup(), pushes 5000 single-entry hashes into
# stage 1, then shuttles results from each stage's output queue to the next
# stage's input queue, printing whatever comes out of stage 3.
#
# Takes no arguments; loops until $global_shutdown becomes true.
sub test_fnct {
    # I don't have shutdown of the threads working yet!
    # NOTE(review): no `: shared` declaration for this flag is visible, so
    # each thread would see its own copy -- confirm before relying on it.
    $global_shutdown = 0;

    # NOTE(review): these class-method calls assume new() and startup() live
    # in package `test`; no package statement is visible here -- confirm.
    test->new();

    test->startup();

    for (1 .. 5000) {
        my %h = ($_ => "hash");

        # Always enqueue a hash *reference*: every dequeue in this script
        # dereferences the first returned element as a hash.
        $q1_in->enqueue(\%h);
    }

    while (!$global_shutdown) {
        # Stage 1 output -> stage 2 input.
        if ($q1_out->pending() > 0) {
            my %data = %{($q1_out->dequeue)[0]};
            $q2_in->enqueue(\%data);
        }

        # Stage 2 output -> stage 3 input.
        if ($q2_out->pending() > 0) {
            my %data = %{($q2_out->dequeue)[0]};
            $q3_in->enqueue(\%data);
        }

        # Stage 3 output is the final result: print it.
        if ($q3_out->pending() > 0) {
            my %data = %{($q3_out->dequeue)[0]};
            while (my ($key, $value) = each %data) {
                print "key: ${key}, value: ${value}\n";
            }
        }
    }
}


# Placeholder constructor: the body is empty, so calling it does nothing.
sub new {}


# Create the input/output queue pair for each of the three pipeline stages
# and start the workers: one thread running sub1, five running sub2 and
# five running sub3.  All queues are created before any thread is spawned.
#
# Takes no arguments.  Thread objects are kept in $thread1, @threads2 and
# @threads3.
sub startup {

    $q1_in  = Thread::Queue::Any->new;
    $q1_out = Thread::Queue::Any->new;

    $q2_in  = Thread::Queue::Any->new;
    $q2_out = Thread::Queue::Any->new;

    $q3_in  = Thread::Queue::Any->new;
    $q3_out = Thread::Queue::Any->new;

    $thread1 = threads->new(\&sub1);

    # push needs a comma between the array and the list being appended.
    for (1 .. 5) {
        push @threads2, threads->new(\&sub2);
    }

    for (1 .. 5) {
        push @threads3, threads->new(\&sub3);
    }

}

# Stage 1 worker: until the shutdown flag becomes true, take each hash that
# appears on $q1_in and pass it on to $q1_out.
sub sub1 {
    while (!$global_shutdown) {
        # Poll the queue this worker actually reads from ($q1_in).
        if ($q1_in->pending() > 0) {    # something on q1_in
            my %data = %{($q1_in->dequeue)[0]};
            # do some processing

            # Forward a hash *reference*; the reader of $q1_out
            # dereferences the first dequeued element as a hash.
            $q1_out->enqueue(\%data);
        }
    }
}


# Stage 2 worker: until the shutdown flag becomes true, take each hash that
# appears on $q2_in and pass it on to $q2_out.
sub sub2 {
    while (!$global_shutdown) {
        if ($q2_in->pending() > 0) {    # something on q2_in
            my %data = %{($q2_in->dequeue)[0]};
            # do some processing

            # Forward a hash *reference*; the reader of $q2_out
            # dereferences the first dequeued element as a hash.
            $q2_out->enqueue(\%data);
        }
    }
}


# Stage 3 worker: until the shutdown flag becomes true, take each hash that
# appears on $q3_in and pass it on to $q3_out.
sub sub3 {
    while (!$global_shutdown) {
        if ($q3_in->pending() > 0) {    # something on q3_in
            my %data = %{($q3_in->dequeue)[0]};
            # do some processing

            # Forward a hash *reference*; the reader of $q3_out
            # dereferences the first dequeued element as a hash.
            $q3_out->enqueue(\%data);
        }
    }
}


Any help would be greatly appreciated!

Erin

Reply via email to