Hi

We want to use Perl 6 as a Kafka client. When I save the offset in the queue after processing a message, I get MoarVM panics:

    MoarVM panic: Internal error: zeroed target thread ID in work pass

or sometimes

    MoarVM panic: Internal error: invalid thread ID 8614688 in GC work pass

This only happens after it has been running for a while (less than a minute).
Code below:

use PKafka::Consumer;
use PKafka::Message;
use PKafka::Config;

sub MAIN () {
   my $exception-channel = Channel.new;
   for (0..5) -> $partition {

       say "start $partition";

       my $kafka-config = PKafka::Config.new({
               'metadata.broker.list' => 'kafka-dev-1:5551, kafka-dev-2:5551, kafka-dev-3:5551',
               'security.protocol' => 'ssl',
               'ssl.ca.location' => '/etc/ssl/ca_server_cert.pem',
               'ssl.certificate.location' => '/etc/ssl/client_cert.pem',
               'ssl.key.location' => '/etc/ssl/client_key.pem',
               'ssl.key.password' => 'password',
               'group.id' => 'metrics-feed-consumer',
               'enable.auto.offset.store' => 'false',
       });
       my $kafka-consumer = PKafka::Consumer.new(
            topic => 'monitoring.metrics',
            brokers => 'kafka-dev-1:5551, kafka-dev-2:5551, kafka-dev-3:5551',
            config => $kafka-config
       );
       $kafka-consumer.messages.tap(-> $msg {
           given $msg {
               when PKafka::Message {
                   say "$partition : got {$msg.payload-str}";
                   $kafka-consumer.save-offset($msg); # <--- guilty line
               }
           }
       });

       my $worker-done = $kafka-consumer.consume-from-last(partition => $partition);

       $exception-channel.send($worker-done);
   }

   react {
       # handle exceptions
       whenever $exception-channel -> $promise {
           whenever $promise {}
       }

       # finish up properly with signals
       whenever signal(SIGINT,SIGKILL) { done }
   }
}


#### end of code

Could that be a MoarVM issue? Could it be a threading issue within librdkafka? Or in the interaction between the two?


Cheers
Konrad


--
konrad bucheli
principal systems engineer

open systems ag
raeffelstrasse 29
ch-8045 zurich

t: +41 58 100 10 10
f: +41 58 100 10 11
k...@open.ch

http://www.open.ch
