Hi,

We want to use Perl 6 as a Kafka client. When I save the offset in the queue after processing a message, I get MoarVM panics:

    MoarVM panic: Internal error: zeroed target thread ID in work pass

or sometimes

    MoarVM panic: Internal error: invalid thread ID 8614688 in GC work pass

This only happens after running it for some time (< 1 minute).

Code below:

    use PKafka::Consumer;
    use PKafka::Message;
    use PKafka::Config;

    sub MAIN () {
        my $exception-channel = Channel.new;

        for (0..5) -> $partition {
            say "start $partition";

            my $kafka-config = PKafka::Config.new({
                'metadata.broker.list'     => 'kafka-dev-1:5551, kafka-dev-2:5551, kafka-dev-3:5551',
                'security.protocol'        => 'ssl',
                'ssl.ca.location'          => '/etc/ssl/ca_server_cert.pem',
                'ssl.certificate.location' => '/etc/ssl/client_cert.pem',
                'ssl.key.location'         => '/etc/ssl/client_key.pem',
                'ssl.key.password'         => 'password',
                'group.id'                 => 'metrics-feed-consumer',
                'enable.auto.offset.store' => 'false',
            });

            my $kafka-consumer = PKafka::Consumer.new(
                topic   => 'monitoring.metrics',
                brokers => 'kafka-dev-1:5551, kafka-dev-2:5551, kafka-dev-3:5551',
                config  => $kafka-config
            );

            $kafka-consumer.messages.tap(-> $msg {
                given $msg {
                    when PKafka::Message {
                        say "$partition : got {$msg.payload-str}";
                        $kafka-consumer.save-offset($msg); # <--- guilty line
                    }
                }
            });

            my $worker-done = $kafka-consumer.consume-from-last(partition => $partition);
            $exception-channel.send($worker-done);
        }

        react {
            # handle exceptions
            whenever $exception-channel -> $promise {
                whenever $promise {}
            }
            # finish up properly with signals
            whenever signal(SIGINT,SIGKILL) {
                done
            }
        }
    }

    #### end of code

Could that be a MoarVM issue? Could it be some threading issue within librdkafka? Or in the interaction between both?

Cheers
Konrad

-- 
konrad bucheli
principal systems engineer

open systems ag
raeffelstrasse 29
ch-8045 zurich

t: +41 58 100 10 10
f: +41 58 100 10 11
k...@open.ch

http://www.open.ch