HiWe want to use Perl 6 as Kafka client. When I want to save the offset in the queue after I processed the message I get some MoarVM panics:
MoarVM panic: Internal error: zeroed target thread ID in work pass
or sometimes
MoarVM panic: Internal error: invalid thread ID 8614688 in GC work pass
This only happens after running it for some time (< 1 minute)
Code below:
use PKafka::Consumer;
use PKafka::Message;
use PKafka::Config;
sub MAIN () {
my $exception-channel = Channel.new;
for (0..5) -> $partition {
say "start $partition";
my $kafka-config = PKafka::Config.new({
'metadata.broker.list' => 'kafka-dev-1:5551,
kafka-dev-2:5551, kafka-dev-3:5551',
'security.protocol' => 'ssl',
'ssl.ca.location' => '/etc/ssl/ca_server_cert.pem',
'ssl.certificate.location' => '/etc/ssl/client_cert.pem',
'ssl.key.location' => '/etc/ssl/client_key.pem',
'ssl.key.password' => 'password',
'group.id' => 'metrics-feed-consumer',
'enable.auto.offset.store' => 'false',
});
my $kafka-consumer = PKafka::Consumer.new(
topic => 'monitoring.metrics', brokers =>
'kafka-dev-1:5551, kafka-dev-2:5551, kafka-dev-3:5551',
config => $kafka-config
);
$kafka-consumer.messages.tap(-> $msg {
given $msg {
when PKafka::Message {
say "$partition : got {$msg.payload-str}";
$kafka-consumer.save-offset($msg); # <--- guilty line
}
}
});
my $worker-done = $kafka-consumer.consume-from-last(partition =>
$partition);
$exception-channel.send($worker-done);
}
react {
# handle exceptions
whenever $exception-channel -> $promise {
whenever $promise {}
}
# fininsh up properly with signals
whenever signal(SIGINT,SIGKILL) { done }
}
}
#### end of code
Could that an MoarVM issue? Could it be some threading issue withing
librdkafka? Or in the interaction between both?
Cheers Konrad -- konrad bucheli principal systems engineer open systems ag raeffelstrasse 29 ch-8045 zurich t: +41 58 100 10 10 f: +41 58 100 10 11 [email protected] http://www.open.ch
smime.p7s
Description: S/MIME Cryptographic Signature
