Tried with EARLIEST option also, no luck :( On Fri, May 5, 2017 at 3:49 PM, Sushil Apex <sushil.aa...@gmail.com> wrote:
> @Vikram the folder /tmp/FromKafka is already created. > @Chaitanya yes I am pusing messages in kafka topic using consoleProducer > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic > kafka2hdfs > > @Ajay > will try with the initial offset to EARLIEST. > > Thank you > Sushil > > On Fri, May 5, 2017 at 2:54 PM, AJAY GUPTA <ajaygit...@gmail.com> wrote: > >> Also, you may be required to specify the initial offset to EARLIEST. >> >> Ajay >> >> On Fri, May 5, 2017 at 2:35 PM, vikram patil <patilvik...@gmail.com> >> wrote: >> >>> Hi Sushil, >>> >>> Have you provided configuration specifying hdfs directory and file for >>> an application? >>> You may have to create /tmp/fromKafka directory in hdfs. >>> Thanks & Regards, >>> Vikram >>> >>> On Fri, May 5, 2017 at 2:30 PM, Sushil Apex <sushil.aa...@gmail.com> >>> wrote: >>> > >>> > I am using Apex 3.5.0 and kafka 0.9 >>> > malhar library used is 3.6.0 >>> > >>> > I am following the example from https://github.com/DataTorrent >>> /examples/blob/master/tutorials/kafka/src/main/java/com/ >>> example/myapexapp/KafkaApp.java >>> > >>> > I am putting messages in Kafka topic using consolekafkaProducer >>> provided by Kafka, but I am not able to read these messages in Apex >>> DAG(created based on above link). >>> > >>> > I am running the apex apa file through apex-cli >>> > >>> > Apex CLI 3.5.0 06.12.2016 @ 22:11:51 PST rev: 6de8828 branch: >>> 6de8828e4f3d5734d0a6f9c1be0aa7057cb60ac8 >>> > apex> launch --local /home/cloudera/myapexapp-1.0-SNAPSHOT.apa >>> > 1. Kafka2HDFS >>> > 2. MyFirstApplication >>> > Choose application: 1 >>> > >>> > and nothing happens after this, I can see the messages put in >>> consoleConsumer in kafka logs >>> > >>> > Properties used are >>> > >>> > <property> >>> > <name>dt.operator.kafkaIn.prop.topics</name> >>> > <value>kafka2hdfs</value> >>> > </property> >>> > >>> > <property> >>> > <name>dt.operator.kafkaIn.prop.consumer.zookeeper</name> >>> > <value>localhost:2181</value> >>> > </property> >>> > <property> >>> > <name>dt.operator.kafkaIn.prop.clusters</name> >>> > <value>localhost:9092</value> >>> > </property> >>> > <property> >>> > <name>dt.operator.kafkaIn.prop.initialPartitionCount</name> >>> > <value>1</value> >>> > </property> >>> > >>> > >>> > Application Code >>> > >>> > KafkaSinglePortByteArrayInputOperator in >>> > = dag.addOperator("kafkaIn", new >>> KafkaSinglePortByteArrayInputOperator()); >>> > >>> > LineOutputOperator out = dag.addOperator("fileOut", new >>> LineOutputOperator()); >>> > >>> > dag.addStream("dataf", in.outputPort, out.input); >>> > >>> > >>> > I am not able to understand what config I am missing here? >>> >> >> >