Thanks, Chaitanya. The issue is fixed: I had the wrong version of Kafka for the
operator I was using.
Thanks for the help.

From: Chaitanya Chebolu [mailto:[email protected]]
Sent: Tuesday, December 20, 2016 10:41 PM
To: [email protected]
Subject: Re: Property Name for Kafka config

John,

   It looks like the malhar-contrib and Kafka dependencies are missing.
   Please add the dependencies below to the pom:
<dependency>
  <groupId>org.apache.apex</groupId>
  <artifactId>malhar-contrib</artifactId>
  <version>${malhar.version}</version>
  <exclusions>
    <exclusion>
      <groupId>*</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>

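
The Kafka client dependency itself is not spelled out; the snippet above covers only malhar-contrib. A minimal sketch of what it might look like follows, assuming the 0.8-based operator in com.datatorrent.contrib.kafka, which, as far as I know, needs a Kafka 0.8.x client on the classpath. The kafka_2.10 artifact (Scala 2.10 build) and the ${kafka.version} property are assumptions, not taken from this thread; set the property to the 0.8.x release that matches your malhar-contrib version.

<dependency>
  <!-- Assumption: Scala 2.10 build of the Kafka client -->
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <!-- Hypothetical property; define it in <properties> as a 0.8.x release -->
  <version>${kafka.version}</version>
</dependency>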

Regards,
Chaitanya

On Wed, Dec 21, 2016 at 11:18 AM, JOHN, BIBIN <[email protected]> wrote:
Thanks for your response. I have attached the pom file.

From: Chaitanya Chebolu [mailto:[email protected]]
Sent: Tuesday, December 20, 2016 9:46 PM
To: [email protected]
Subject: Re: Property Name for Kafka config

John,

   This looks like a dependency issue.
   Could you please tell us which versions of Kafka and the Malhar library
you are using?

Regards,
Chaitanya

On Wed, Dec 21, 2016 at 2:39 AM, JOHN, BIBIN <[email protected]> wrote:
Thanks, Chaitanya. I was able to start the job, but it is failing with the
exception below.

2016-12-20 12:44:47,768 INFO  zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=hltd435.hydc.sbc.com:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@40dd3977
2016-12-20 12:44:47,782 INFO  zkclient.ZkClient (ZkClient.java:waitForKeeperState(934)) - Waiting for keeper state SyncConnected
2016-12-20 12:44:47,786 INFO  zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(1019)) - Opening socket connection to server localhost:2181. Will not attempt to authenticate using SASL (unknown error)
2016-12-20 12:44:47,787 INFO  zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(864)) - Socket connection established to localhost:2181, initiating session
2016-12-20 12:44:47,793 INFO  zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1279)) - Session establishment complete on server localhost:2181, sessionid = 0x158fcc42050001a, negotiated timeout = 30000
2016-12-20 12:44:47,795 INFO  zkclient.ZkClient (ZkClient.java:processStateChanged(711)) - zookeeper state changed (SyncConnected)
2016-12-20 12:44:47,797 ERROR stram.StreamingAppMaster (StreamingAppMaster.java:main(106)) - Exiting Application Master
java.lang.NoSuchMethodError: kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;
        at com.datatorrent.contrib.kafka.KafkaMetadataUtil.getBrokers(KafkaMetadataUtil.java:113)
        at com.datatorrent.contrib.kafka.KafkaConsumer.initBrokers(KafkaConsumer.java:131)
        at com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:488)
        at com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
        at com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
        at com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
        at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
        at com.datatorrent.stram.StreamingContainerManager.getInstance(StreamingContainerManager.java:3023)
        at com.datatorrent.stram.StreamingAppMasterService.serviceInit(StreamingAppMasterService.java:551)
        at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
        at com.datatorrent.stram.StreamingAppMaster.main(StreamingAppMaster.java:102)
End of LogType:dt.log

LogType:launch_container.sh
Log Upload Time:Tue Dec 20 12:44:49 -0800 2016
LogLength:20175
Log Contents:
#!/bin/bash

Thanks and Regards,
Bibin John| Data Movement Technology Development
20205 North Creek Pkwy , Bothell, WA 98011 USA
• Office: (770) 235 5614 | Cell: (469) 648-9858
Email: [email protected]

From: Chaitanya Chebolu [mailto:[email protected]]
Sent: Tuesday, December 20, 2016 2:51 AM
To: [email protected]
Subject: Re: Property Name for Kafka config

Hi John,

Please refer to the example application below:
https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once
and documentation of Kafka Input Operator at 
https://github.com/apache/apex-malhar/blob/master/docs/operators/kafkaInputOperator.md
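
For the 0.8-based operator, properties are usually set per operator in the application's properties.xml, following the dt.operator.<operatorName>.prop.<propertyName> pattern. A minimal sketch is below. The operator name kafkaInput and all values are placeholders, and the property names (topic, zookeeper, initialOffset) are given from memory of the operator documentation linked above, so please verify them against it.

<!-- "kafkaInput" is a hypothetical name; use the name passed to dag.addOperator(...) -->
<property>
  <name>dt.operator.kafkaInput.prop.topic</name>
  <value>my-topic</value>  <!-- placeholder topic name -->
</property>
<property>
  <name>dt.operator.kafkaInput.prop.zookeeper</name>
  <value>zkhost:2181</value>  <!-- placeholder ZooKeeper host:port -->
</property>
<property>
  <name>dt.operator.kafkaInput.prop.initialOffset</name>
  <value>earliest</value>  <!-- assumed to accept earliest or latest -->
</property>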

Regards,
Chaitanya

On Tue, Dec 20, 2016 at 3:47 PM, JOHN, BIBIN <[email protected]> wrote:
All,
Could you please let me know the names of the properties I must set for the
operator below?

KafkaSinglePortStringInputOperator


