Tried with EARLIEST option also, no luck :(

On Fri, May 5, 2017 at 3:49 PM, Sushil Apex <sushil.aa...@gmail.com> wrote:

> @Vikram the folder /tmp/FromKafka is already created.
> @Chaitanya yes, I am pushing messages to the Kafka topic using the console producer:
>   bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka2hdfs
>
> @Ajay
> I will try setting the initial offset to EARLIEST.
>
> Thank you
> Sushil
>
> On Fri, May 5, 2017 at 2:54 PM, AJAY GUPTA <ajaygit...@gmail.com> wrote:
>
>> Also, you may need to set the initial offset to EARLIEST.
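>>
>> For example, assuming the operator is named kafkaIn as in your
>> properties and that it exposes an initialOffset property (the Malhar
>> Kafka 0.9 input operator does, but I have not checked the operator
>> class you are using), a sketch of the setting would be:
>>
>> ```xml
>> <!-- Assumption: the kafkaIn operator exposes an initialOffset property -->
>> <property>
>>   <name>dt.operator.kafkaIn.prop.initialOffset</name>
>>   <value>EARLIEST</value>
>> </property>
>> ```
>>
>> With EARLIEST the operator should start from the oldest available
>> message in the topic instead of picking up only new ones.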
>>
>> Ajay
>>
>> On Fri, May 5, 2017 at 2:35 PM, vikram patil <patilvik...@gmail.com>
>> wrote:
>>
>>> Hi Sushil,
>>>
>>> Have you provided the configuration specifying the HDFS directory and
>>> file name for the application?
>>> You may have to create the /tmp/fromKafka directory in HDFS.
>>> Thanks & Regards,
>>> Vikram
>>>
>>> On Fri, May 5, 2017 at 2:30 PM, Sushil Apex <sushil.aa...@gmail.com>
>>> wrote:
>>> >
>>> > I am using Apex 3.5.0 and Kafka 0.9.
>>> > The Malhar library version is 3.6.0.
>>> >
>>> > I am following the example from
>>> > https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
>>> >
>>> > I am putting messages into the Kafka topic using the console producer
>>> > provided by Kafka, but I am not able to read these messages in the Apex
>>> > DAG (created based on the link above).
>>> >
>>> > I am running the Apex .apa file through apex-cli:
>>> >
>>> > Apex CLI 3.5.0 06.12.2016 @ 22:11:51 PST rev: 6de8828 branch: 6de8828e4f3d5734d0a6f9c1be0aa7057cb60ac8
>>> > apex> launch --local /home/cloudera/myapexapp-1.0-SNAPSHOT.apa
>>> >   1. Kafka2HDFS
>>> >   2. MyFirstApplication
>>> > Choose application: 1
>>> >
>>> > Nothing happens after this, although I can see the messages in the
>>> > console consumer and in the Kafka logs.
>>> >
>>> > The properties used are:
>>> >
>>> > <property>
>>> >   <name>dt.operator.kafkaIn.prop.topics</name>
>>> >   <value>kafka2hdfs</value>
>>> > </property>
>>> >
>>> > <property>
>>> >   <name>dt.operator.kafkaIn.prop.consumer.zookeeper</name>
>>> >   <value>localhost:2181</value>
>>> > </property>
>>> > <property>
>>> >   <name>dt.operator.kafkaIn.prop.clusters</name>
>>> >   <value>localhost:9092</value>
>>> > </property>
>>> > <property>
>>> >   <name>dt.operator.kafkaIn.prop.initialPartitionCount</name>
>>> >   <value>1</value>
>>> > </property>
>>> >
>>> >
>>> > Application Code
>>> >
>>> > KafkaSinglePortByteArrayInputOperator in
>>> >         = dag.addOperator("kafkaIn", new KafkaSinglePortByteArrayInputOperator());
>>> >
>>> > LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
>>> >
>>> > dag.addStream("dataf", in.outputPort, out.input);
>>> >
>>> >
>>> > I am not able to figure out which config I am missing here.
>>>
>>
>>
>
