@Vikram the folder /tmp/FromKafka is already created.
@Chaitanya yes, I am pushing messages to the Kafka topic using the console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka2hdfs
@Ajay will try with the initial offset to EARLIEST.

Thank you
Sushil

On Fri, May 5, 2017 at 2:54 PM, AJAY GUPTA <ajaygit...@gmail.com> wrote:

> Also, you may be required to specify the initial offset to EARLIEST.
>
> Ajay
>
> On Fri, May 5, 2017 at 2:35 PM, vikram patil <patilvik...@gmail.com> wrote:
>
>> Hi Sushil,
>>
>> Have you provided configuration specifying hdfs directory and file for an application?
>> You may have to create /tmp/fromKafka directory in hdfs.
>> Thanks & Regards,
>> Vikram
>>
>> On Fri, May 5, 2017 at 2:30 PM, Sushil Apex <sushil.aa...@gmail.com> wrote:
>> >
>> > I am using Apex 3.5.0 and kafka 0.9
>> > malhar library used is 3.6.0
>> >
>> > I am following the example from
>> > https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
>> >
>> > I am putting messages in Kafka topic using consolekafkaProducer provided by Kafka, but I am not able to read these messages in Apex DAG(created based on above link).
>> >
>> > I am running the apex apa file through apex-cli
>> >
>> > Apex CLI 3.5.0 06.12.2016 @ 22:11:51 PST rev: 6de8828 branch: 6de8828e4f3d5734d0a6f9c1be0aa7057cb60ac8
>> > apex> launch --local /home/cloudera/myapexapp-1.0-SNAPSHOT.apa
>> > 1. Kafka2HDFS
>> > 2. MyFirstApplication
>> > Choose application: 1
>> >
>> > and nothing happens after this, I can see the messages put in consoleConsumer in kafka logs
>> >
>> > Properties used are
>> >
>> > <property>
>> >   <name>dt.operator.kafkaIn.prop.topics</name>
>> >   <value>kafka2hdfs</value>
>> > </property>
>> >
>> > <property>
>> >   <name>dt.operator.kafkaIn.prop.consumer.zookeeper</name>
>> >   <value>localhost:2181</value>
>> > </property>
>> > <property>
>> >   <name>dt.operator.kafkaIn.prop.clusters</name>
>> >   <value>localhost:9092</value>
>> > </property>
>> > <property>
>> >   <name>dt.operator.kafkaIn.prop.initialPartitionCount</name>
>> >   <value>1</value>
>> > </property>
>> >
>> >
>> > Application Code
>> >
>> > KafkaSinglePortByteArrayInputOperator in
>> >     = dag.addOperator("kafkaIn", new KafkaSinglePortByteArrayInputOperator());
>> >
>> > LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
>> >
>> > dag.addStream("dataf", in.outputPort, out.input);
>> >
>> >
>> > I am not able to understand what config I am missing here?
>> >
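
A minimal sketch of the EARLIEST suggestion from the thread, written in the same style as the properties quoted above. The property name `initialOffset` is an assumption that follows the naming pattern of the other `dt.operator.kafkaIn.prop.*` entries; it is not confirmed anywhere in this thread. The exact spelling of the value (`EARLIEST` or `earliest`) may also depend on which Malhar Kafka operator is in use, so check it against the operator's documentation.

```xml
<!-- Assumed property name: asks the kafkaIn operator to start reading from
     the oldest available offset instead of only newly produced messages. -->
<property>
  <name>dt.operator.kafkaIn.prop.initialOffset</name>
  <value>EARLIEST</value>
</property>
```

If the operator does start from the latest offset by default, messages produced before the application launched would not be read, which could look like "nothing happens" even though the console consumer shows them.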