- Installed Kafka 0.9.0.1
- Bounced the VMs
- Restarted everything (Hadoop, Flink, Kafka, ...)
- Rebuilt the Beam app, making sure it includes Kafka client 0.9.0.1

All of a sudden it works. My best guess is that, as you suspected, another
0.9.0.0 thread was running somewhere and recycling the servers killed it.

Thanks, Raghu.
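
For anyone hitting the same thing, here is a minimal sketch of one way to check which Kafka client a built jar actually bundles before submitting it. It assumes the kafka-clients artifact ships its version in a `kafka/kafka-version.properties` resource with a `version` key, and that a fat jar copies that resource through unchanged; both are assumptions worth verifying against your own build. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.Properties;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

final class KafkaClientVersionProbe {
    // Assumption: kafka-clients stores its version under this resource name.
    static final String VERSION_RESOURCE = "kafka/kafka-version.properties";

    /** Returns the "version" property bundled in the jar, or empty if it is absent. */
    static Optional<String> bundledKafkaVersion(Path jarPath) {
        try (JarFile jar = new JarFile(jarPath.toFile())) {
            JarEntry entry = jar.getJarEntry(VERSION_RESOURCE);
            if (entry == null) {
                return Optional.empty();
            }
            Properties props = new Properties();
            try (InputStream in = jar.getInputStream(entry)) {
                props.load(in);
            }
            return Optional.ofNullable(props.getProperty("version")).map(String::trim);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: KafkaClientVersionProbe <path-to-jar>");
            System.exit(2);
        }
        Optional<String> version = bundledKafkaVersion(Paths.get(args[0]));
        System.out.println(version
                .map(v -> "kafka-clients version: " + v)
                .orElse("no " + VERSION_RESOURCE + " found in jar"));
    }
}
```

Run it against the jar passed to `flink run` (here, /opt/maven305/dataflow-test/target/dataflow-test-1.0.jar). It only inspects what is packaged; it cannot tell whether an older client is also on the cluster's classpath.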
From: Raghu Angadi <[email protected]>
To: amir bahmanyari <[email protected]>
Cc: "[email protected]" <[email protected]>
Sent: Monday, May 23, 2016 5:46 PM
Subject: Re: Error reading field 'topic_metadata'
Looks like a version mismatch between the Kafka client and server. Can you check
your command line again? Maybe you are contacting a different cluster somehow.
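
One way to see why the error points to a layout mismatch: Kafka's wire protocol encodes a string as a big-endian int16 length followed by the UTF-8 bytes. If a reader expects an int32 array length at the position where a string actually starts, it consumes the string's length and first two characters as the array size. The sketch below does exactly that with the topic name `lrdata` from the command line, and the result matches the size reported in the exception. This is an illustration of the arithmetic only, not the actual client code; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class MisreadArraySize {
    /** Encodes a string Kafka-style: int16 length, then UTF-8 bytes (big-endian). */
    static ByteBuffer encodeString(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + utf8.length); // ByteBuffer defaults to big-endian
        buf.putShort((short) utf8.length);
        buf.put(utf8);
        buf.flip();
        return buf;
    }

    /** Reads the next four bytes as an int32 array length, as a mismatched reader would. */
    static int readAsArraySize(ByteBuffer buf) {
        return buf.getInt();
    }

    public static void main(String[] args) {
        ByteBuffer buf = encodeString("lrdata");
        // Bytes are 00 06 6c 72 64 61 74 61; the first four form 0x00066c72.
        int size = readAsArraySize(buf);
        System.out.println("array size read: " + size);
        // prints "array size read: 420978"
    }
}
```

The bogus size far exceeds the bytes left in the response, which is consistent with the "only 34 bytes available" part of the message.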
On Mon, May 23, 2016 at 5:03 PM, amir bahmanyari <[email protected]> wrote:
Hi Raghu,
Thanks for your reply. No, same issue with Kafka 0.9.0.1. I rebuilt and reran
everything and still see the same issue.
Thanks again.
From: Raghu Angadi <[email protected]>
To: [email protected]; amir bahmanyari <[email protected]>
Sent: Monday, May 23, 2016 3:59 PM
Subject: Re: Error reading field 'topic_metadata'
Strange. Maybe the issue is similar to the one here:
https://github.com/apache/incubator-beam/pull/374
Can you pin your Kafka version to 0.9.0.1, as in the pull request above?
On Mon, May 23, 2016 at 3:32 PM, amir bahmanyari <[email protected]> wrote:
Hi Colleagues,
I have been running KafkaIO successfully in my Beam app thus far. I rebuilt and
repackaged it with mvn clean, and all of a sudden it now throws the exception
below.
SchemaException: Error reading field 'topic_metadata': Error reading array of
size 420978, only 34 bytes available
Any ideas, please? Thanks for your attention.
/opt/analytics/apache/flink-1.0.0/bin/flink run
/opt/maven305/dataflow-test/target/dataflow-test-1.0.jar --topic lrdata
--bootstrap.servers kafkahost:9092 --zookeeper.connect kafkahost:2181
--group.id myGroup
.....................Completed method...about to run pipeline...
Running thread threw: java.lang.RuntimeException: Error while translating UnboundedSource: org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource@6e5bfdfc
    at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:283)
    at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:244)
    at org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:108)
    at org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:89)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
    at org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:132)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:108)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
    at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:61) // this is p.run() statement
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 420978, only 34 bytes available
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
    at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:178)
    at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:205)
    at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1272)
    at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:574)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.<init>(UnboundedSourceWrapper.java:131)
    at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:280)
    ... 25 more