Thanks Chitanya. I was able to start job, but failing with below exception.
2016-12-20 12:44:47,768 INFO zookeeper.ZooKeeper (ZooKeeper.java:<init>(438))
- Initiating client connection, connectString=hltd435.hydc.sbc.com:2181
sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@40dd3977
2016-12-20 12:44:47,782 INFO zkclient.ZkClient
(ZkClient.java:waitForKeeperState(934)) - Waiting for keeper state SyncConnected
2016-12-20 12:44:47,786 INFO zookeeper.ClientCnxn
(ClientCnxn.java:logStartConnect(1019)) - Opening socket connection to server
localhost:2181. Will not attempt to authenticate using SASL (unknown error)
2016-12-20 12:44:47,787 INFO zookeeper.ClientCnxn
(ClientCnxn.java:primeConnection(864)) - Socket connection established to
localhost:2181, initiating session
2016-12-20 12:44:47,793 INFO zookeeper.ClientCnxn
(ClientCnxn.java:onConnected(1279)) - Session establishment complete on server
localhost:2181, sessionid = 0x158fcc42050001a, negotiated timeout = 30000
2016-12-20 12:44:47,795 INFO zkclient.ZkClient
(ZkClient.java:processStateChanged(711)) - zookeeper state changed
(SyncConnected)
2016-12-20 12:44:47,797 ERROR stram.StreamingAppMaster
(StreamingAppMaster.java:main(106)) - Exiting Application Master
java.lang.NoSuchMethodError:
kafka.utils.ZkUtils.getAllBrokersInCluster(Lorg/I0Itec/zkclient/ZkClient;)Lscala/collection/Seq;
at
com.datatorrent.contrib.kafka.KafkaMetadataUtil.getBrokers(KafkaMetadataUtil.java:113)
at
com.datatorrent.contrib.kafka.KafkaConsumer.initBrokers(KafkaConsumer.java:131)
at
com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.definePartitions(AbstractKafkaInputOperator.java:488)
at
com.datatorrent.stram.plan.physical.PhysicalPlan.initPartitioning(PhysicalPlan.java:752)
at
com.datatorrent.stram.plan.physical.PhysicalPlan.addLogicalOperator(PhysicalPlan.java:1676)
at
com.datatorrent.stram.plan.physical.PhysicalPlan.<init>(PhysicalPlan.java:378)
at
com.datatorrent.stram.StreamingContainerManager.<init>(StreamingContainerManager.java:418)
at
com.datatorrent.stram.StreamingContainerManager.getInstance(StreamingContainerManager.java:3023)
at
com.datatorrent.stram.StreamingAppMasterService.serviceInit(StreamingAppMasterService.java:551)
at
org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at
com.datatorrent.stram.StreamingAppMaster.main(StreamingAppMaster.java:102)
End of LogType:dt.log
LogType:launch_container.sh
Log Upload Time:Tue Dec 20 12:44:49 -0800 2016
LogLength:20175
Log Contents:
#!/bin/bash
Thanks and Regards,
Bibin John| Data Movement Technology Development
20205 North Creek Pkwy , Bothell, WA 98011 USA
• Office: (770) 235 5614 | Cell: (469) 648-9858
Email: [email protected]<mailto:[email protected]>
From: Chaitanya Chebolu [mailto:[email protected]]
Sent: Tuesday, December 20, 2016 2:51 AM
To: [email protected]
Subject: Re: Property Name for Kafka config
Hi John,
Please refer the below example application:
https://github.com/DataTorrent/examples/tree/master/tutorials/exactly-once
and documentation of Kafka Input Operator at
https://github.com/apache/apex-malhar/blob/master/docs/operators/kafkaInputOperator.md
Regards,
Chaitanya
On Tue, Dec 20, 2016 at 3:47 PM, JOHN, BIBIN
<[email protected]<mailto:[email protected]>> wrote:
All,
Could you please let me know name of the properties which I must use for below
operator?
KafkaSinglePortStringInputOperator