Looks like a version mismatch between the Kafka client and server. Can you check your command line again? Maybe you are contacting a different cluster somehow.
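
One data point that points the same way: 420978 is 0x00066C72 in hex. Those four bytes are exactly what a 2-byte string length of 6 followed by 'l' and 'r' look like, which matches the start of your topic name lrdata. So my guess is that the client is reading a length-prefixed topic string as if it were a 4-byte array size. Here is a rough sketch of that effect in plain Java. It is not Kafka's code or its actual wire format; the class name, both layouts, and the method names are made up for illustration only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SchemaMismatchSketch {

    // Hypothetical writer layout (an assumption, not Kafka's real schema):
    // [int16 name length][name bytes][int32 array size][int32 elements...]
    static ByteBuffer writeWithName(String name, int[] values) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + nameBytes.length + 4 + 4 * values.length);
        buf.putShort((short) nameBytes.length);
        buf.put(nameBytes);
        buf.putInt(values.length);
        for (int v : values) {
            buf.putInt(v);
        }
        buf.flip();
        return buf;
    }

    // Reader that agrees with the writer: skip the name, then read the array.
    static int[] readWithName(ByteBuffer buf) {
        int nameLength = buf.getShort();
        buf.position(buf.position() + nameLength);
        return readArray(buf);
    }

    // Reader for a different hypothetical layout that starts directly with
    // [int32 array size][int32 elements...]. Pointed at the wrong bytes, it
    // interprets whatever comes first as the array size.
    static int[] readArray(ByteBuffer buf) {
        int size = buf.getInt();
        if (size < 0 || (long) size * Integer.BYTES > buf.remaining()) {
            throw new IllegalStateException("Error reading array of size " + size
                    + ", only " + buf.remaining() + " bytes available");
        }
        int[] values = new int[size];
        for (int i = 0; i < size; i++) {
            values[i] = buf.getInt();
        }
        return values;
    }

    public static void main(String[] args) {
        ByteBuffer response = writeWithName("lrdata", new int[] {0, 1, 2});

        // Matching layouts: the array comes back intact.
        System.out.println(Arrays.toString(readWithName(response.duplicate())));

        // Mismatched layouts: bytes 00 06 'l' 'r' are read as the array size.
        try {
            readArray(response.duplicate());
        } catch (IllegalStateException e) {
            // Prints: Error reading array of size 420978, only 20 bytes available
            System.out.println(e.getMessage());
        }
    }
}
```

The byte count differs from your trace because the sketch's buffer is much smaller than a real response, but the size comes out as 420978 for the same reason. That is why I suspect the client and the broker disagree on the response format.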

On Mon, May 23, 2016 at 5:03 PM, amir bahmanyari <[email protected]> wrote:

> Hi Raghu,
> Thanks for your reply.
> No, same issue with Kafka 0901.
> I rebuilt/rerun everything and still same issue.
> Thanks again.
>
> ------------------------------
> *From:* Raghu Angadi <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
> *Sent:* Monday, May 23, 2016 3:59 PM
> *Subject:* Re: Error reading field 'topic_metadata'
>
> Strange. may be the issue is similar to one here :
> https://github.com/apache/incubator-beam/pull/374
>
> Can you fix your kafka version to 0.9.0.1 as in above pull request?
>
> On Mon, May 23, 2016 at 3:32 PM, amir bahmanyari <[email protected]> wrote:
>
> Hi Colleagues,
> I have been successfully running KafkaIO in my Beam app thus far.
> I mvn clean rebuilt/packaged it and all of the sudden it now throws this
> exception below.
> *SchemaException: Error reading field 'topic_metadata': Error reading array of size 420978, only 34 bytes available*
> Any idea pls? Thanks for your attention.
>
> /opt/analytics/apache/flink-1.0.0/bin/flink run
> /opt/maven305/dataflow-test/target/dataflow-test-1.0.jar --topic lrdata
> --bootstrap.servers kafkahost:9092 --zookeeper.connect kafkahost:2181 --
> group.id myGroup
>
> .....................Completed method
> ...about to run pipeline
> ...Running thread threw: java.lang.RuntimeException: Error while translating UnboundedSource: org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource@6e5bfdfc
>     at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:283)
>     at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:244)
>     at org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:108)
>     at org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:89)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
>     at org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:132)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:108)
>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>     at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:61) * // this is p.run() statement*
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> *Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topic_metadata': Error reading array of size 420978, only 34 bytes available*
>     at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
>     at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
>     at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:178)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:205)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1272)
>     at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:574)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.<init>(UnboundedSourceWrapper.java:131)
>     at org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:280)
>     ... 25 more
