John,

This looks like a dependency issue. Could you please tell us which versions of Kafka and the Malhar library you are using?
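
In case it helps narrow this down: a NoSuchMethodError at runtime generally means the class was found, but the jar it came from does not contain the method with the exact signature the caller was compiled against. Below is a minimal sketch, using only standard Java reflection, that reports which jar a class was loaded from and whether a given method signature is present. The MethodProbe class and its two helpers are hypothetical names made up for this illustration; they are not part of Kafka, Malhar or Apex. The class and method names passed in main are taken from the stack trace in your log.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

/** Hypothetical helper: probes the classpath for a class and one of its method signatures. */
final class MethodProbe {

    /**
     * Returns true if className can be loaded and exposes a public method
     * (static or instance) with the given name and exact parameter types.
     * Parameter types are given as fully qualified class names.
     */
    static boolean hasMethod(String className, String methodName, String... paramTypeNames) {
        try {
            ClassLoader loader = MethodProbe.class.getClassLoader();
            // 'false' avoids running static initializers of the probed classes.
            Class<?> target = Class.forName(className, false, loader);
            Class<?>[] params = new Class<?>[paramTypeNames.length];
            for (int i = 0; i < params.length; i++) {
                params[i] = Class.forName(paramTypeNames[i], false, loader);
            }
            Method found = target.getMethod(methodName, params);
            return found != null;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    /** Returns the jar or directory the class was loaded from, or a placeholder string. */
    static String loadedFrom(String className) {
        try {
            Class<?> target = Class.forName(className, false, MethodProbe.class.getClassLoader());
            CodeSource source = target.getProtectionDomain().getCodeSource();
            return source == null ? "(bootstrap or unknown)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Names copied from the NoSuchMethodError in the log.
        String cls = "kafka.utils.ZkUtils";
        System.out.println(cls + " loaded from: " + loadedFrom(cls));
        System.out.println("getAllBrokersInCluster(ZkClient) present: "
                + hasMethod(cls, "getAllBrokersInCluster", "org.I0Itec.zkclient.ZkClient"));
    }
}
```

Run with the same jars the application package bundles on the classpath, this should show which Kafka jar is actually being picked up. If the method is reported as absent, the bundled Kafka version probably differs from the one the Malhar Kafka operator expects.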
Regards,
Chaitanya

On Wed, Dec 21, 2016 at 2:39 AM, JOHN, BIBIN <[email protected]> wrote:

> Thanks Chitanya. I was able to start job, but failing with below exception.
>
> 2016-12-20 12:44:47,768 INFO zookeeper.ZooKeeper (ZooKeeper.java:<init>(438)) - Initiating client connection, connectString=hltd435.hydc.sbc.com:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@40dd3977
> 2016-12-20 12:44:47,782 INFO zkclient.ZkClient (ZkClient.java:waitForKeeperState(934)) - Waiting for keeper state SyncConnected
> 2016-12-20 12:44:47,786 INFO zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(1019)) - Opening socket connection to server localhost:2181. Will not attempt to authenticate using SASL (unknown error)
> 2016-12-20 12:44:47,787 INFO zookeeper.ClientCnxn (ClientCnxn.java:primeConnection(864)) - Socket connection established to localhost:2181, initiating session
> 2016-12-20 12:44:47,793 INFO zookeeper.ClientCnxn (ClientCnxn.java:onConnected(1279)) - Session establishment complete on server localhost:2181, sessionid = 0x158fcc42050001a, negotiated timeout = 30000
> 2016-12-20 12:44:47,795 INFO zkclient.ZkClient (ZkClient.java:processStateChanged(711)) - zookeeper state changed (SyncConnected)
> 2016-12-20 12:44:47,797 ERROR stram.StreamingAppMaster (StreamingAppMaster.java:main(106)) - Exiting Application Master
> java.lang.NoSuchMethodError: kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;
>     at com.datatorrent.contrib.kafka.KafkaMetadataUtil.getBrokers(KafkaMetadataUtil.java:113)
>     at com.datatorrent.contrib.kafka.KafkaConsumer.initBrokers(KafkaConsumer.java:131)
>     at com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:488)
>     at com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
>     at com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
>     at com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
>     at com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
>     at com.datatorrent.stram.StreamingContainerManager.getInstance(StreamingContainerManager.java:3023)
>     at com.datatorrent.stram.StreamingAppMasterService.serviceInit(StreamingAppMasterService.java:551)
>     at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
>     at com.datatorrent.stram.StreamingAppMaster.main(StreamingAppMaster.java:102)
> End of LogType:dt.log
>
> LogType:launch_container.sh
> Log Upload Time:Tue Dec 20 12:44:49 -0800 2016
> LogLength:20175
> Log Contents:
> #!/bin/bash
>
> *Thanks and Regards,*
> *Bibin John| Data Movement Technology Development*
> *20205 North Creek Pkwy , Bothell, WA 98011 USA*
> *Office: (770) 235 5614 | Cell: (469) 648-9858*
> *Email: *[email protected]
>
> *From:* Chaitanya Chebolu [mailto:[email protected]]
> *Sent:* Tuesday, December 20, 2016 2:51 AM
> *To:* [email protected]
> *Subject:* Re: Property Name for Kafka config
>
> Hi John,
>
> Please refer the below example application:
> https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once
> and documentation of Kafka Input Operator at
> https://github.com/apache/apex-malhar/blob/master/docs/operators/kafkaInputOperator.md
>
> Regards,
> Chaitanya
>
> On Tue, Dec 20, 2016 at 3:47 PM, JOHN, BIBIN <[email protected]> wrote:
>
> All,
>
> Could you please let me know name of the properties which I must use for below operator?
>
> KafkaSinglePortStringInputOperator
