- installed kafka 0901- bounced the VMs-restarted everything (hadoop, flink, 
kafka.....)- rebuilt Beam app making sure it includes kafka client 0901All of 
the sudden it works...My best guess is, like you suspected, another 0900 thread 
was running somewhere and recycling the servers killed it.Thanks Raghu.

      From: Raghu Angadi <[email protected]>
 To: amir bahmanyari <[email protected]> 
Cc: "[email protected]" <[email protected]>
 Sent: Monday, May 23, 2016 5:46 PM
 Subject: Re: Error reading field 'topic_metadata'
   
Looks like version mismatch between Kafka client and server. Can you check your 
command line again, may be you are contacting a different cluster somehow.
On Mon, May 23, 2016 at 5:03 PM, amir bahmanyari <[email protected]> wrote:

Hi Raghu,Thanks for your reply.No, same issue with Kafka 0901.I rebuilt/rerun 
everything and still same issue.Thanks again.
      From: Raghu Angadi <[email protected]>
 To: [email protected]; amir bahmanyari <[email protected]> 
 Sent: Monday, May 23, 2016 3:59 PM
 Subject: Re: Error reading field 'topic_metadata'
  
Strange. may be the issue is similar to one here : 
https://github.com/apache/incubator-beam/pull/374
Can you fix your kafka version to 0.9.0.1 as in above pull request?
On Mon, May 23, 2016 at 3:32 PM, amir bahmanyari <[email protected]> wrote:

Hi Colleagues,I have been successfully running KafkaIO in my Beam app thus 
far.I mvn clean rebuilt/packaged it and all of the sudden it now throws this 
exception below.SchemaException: Error reading field 'topic_metadata': Error 
reading array of size 420978, only 34 bytes available
Any idea pls? Thanks for your attention.
/opt/analytics/apache/flink-1.0.0/bin/flink run 
/opt/maven305/dataflow-test/target/dataflow-test-1.0.jar --topic lrdata 
--bootstrap.servers kafkahost:9092 --zookeeper.connect kafkahost:2181 
--group.id myGroup
.....................Completed method...about to run pipeline...Running thread  
threw:  java.lang.RuntimeException: Error while translating UnboundedSource: 
org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource@6e5bfdfc        at 
org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:283)
        at 
org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:244)
        at 
org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:108)
        at 
org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:89)
        at 
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225) 
       at 
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220) 
       at 
org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220) 
       at 
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
        at 
org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:292)        at 
org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
        at 
org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:132)
        at 
org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:108)
        at 
org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:49)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)        at 
benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:61)  // this 
is p.run() statement        at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)   
     at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)  
      at 
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)        
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)   
     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)Caused 
by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
'topic_metadata': Error reading array of size 420978, only 34 bytes available   
     at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)      
  at 
org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)    
    at 
org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)  
      at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:178)
        at 
org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:205)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1272)
        at 
org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaSource.generateInitialSplits(KafkaIO.java:574)
        at 
org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.<init>(UnboundedSourceWrapper.java:131)
        at 
org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:280)
        ... 25 more



   



  

Reply via email to