Looks like version mismatch between Kafka client and server.
Can you check your command line again, may be you are contacting a
different cluster somehow.

On Mon, May 23, 2016 at 5:03 PM, amir bahmanyari <[email protected]>
wrote:

> Hi Raghu,
> Thanks for your reply.
> No, same issue with Kafka 0901.
> I rebuilt/rerun everything and still same issue.
> Thanks again.
>
> ------------------------------
> *From:* Raghu Angadi <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
>
> *Sent:* Monday, May 23, 2016 3:59 PM
> *Subject:* Re: Error reading field 'topic_metadata'
>
> Strange. may be the issue is similar to one here :
> https://github.com/apache/incubator-beam/pull/374
>
> Can you fix your kafka version to 0.9.0.1 as in above pull request?
>
> On Mon, May 23, 2016 at 3:32 PM, amir bahmanyari <[email protected]>
> wrote:
>
> Hi Colleagues,
> I have been successfully running KafkaIO in my Beam app thus far.
> I mvn clean rebuilt/packaged it and all of the sudden it now throws this
> exception below.
> *SchemaException: Error reading field 'topic_metadata': Error reading
> array of size 420978, only 34 bytes available*
> Any idea pls? Thanks for your attention.
>
> /opt/analytics/apache/flink-1.0.0/bin/flink run
> /opt/maven305/dataflow-test/target/dataflow-test-1.0.jar --topic lrdata
> --bootstrap.servers kafkahost:9092 --zookeeper.connect kafkahost:2181 --
> group.id myGroup
>
> .....................Completed method
> ...about to run pipeline
> ...Running thread  threw:  java.lang.RuntimeException: Error while
> translating UnboundedSource:
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource@6e5bfdfc
>         at
> org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:283)
>         at
> org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:244)
>         at
> org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:108)
>         at
> org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:89)
>         at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
>         at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>         at
> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>         at
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>         at
> org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)
>         at
> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>         at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:132)
>         at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:108)
>         at
> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>         at
> benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:61) * //
> this is p.run() statement*
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>         at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>         at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> *Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error
> reading field 'topic_metadata': Error reading array of size 420978, only 34
> bytes available*
>         at
> org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
>         at
> org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
>         at
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
>         at
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
>         at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>         at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>         at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:178)
>         at
> org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:205)
>         at
> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1272)
>         at
> org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:574)
>         at
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.<init>(UnboundedSourceWrapper.java:131)
>         at
> org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:280)
>         ... 25 more
>
>
>
>
>

Reply via email to