@Vikram the folder /tmp/FromKafka is already created.
@Chaitanya yes, I am pushing messages into the Kafka topic using the console producer:
 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka2hdfs

@Ajay I will try setting the initial offset to EARLIEST.
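
Something like the following in the application's properties.xml is what I have
in mind (just a sketch: initialOffset is the property exposed by the 0.9 Malhar
Kafka input operator, so the exact property path may differ depending on which
Malhar Kafka operator version is actually in use):

<property>
  <!-- assumed property name, based on the 0.9 Malhar Kafka input operator;
       EARLIEST makes the operator read the topic from the beginning -->
  <name>dt.operator.kafkaIn.prop.initialOffset</name>
  <value>EARLIEST</value>
</property>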

Thank you
Sushil

On Fri, May 5, 2017 at 2:54 PM, AJAY GUPTA <ajaygit...@gmail.com> wrote:

> Also, you may need to set the initial offset to EARLIEST.
>
> Ajay
>
> On Fri, May 5, 2017 at 2:35 PM, vikram patil <patilvik...@gmail.com>
> wrote:
>
>> Hi Sushil,
>>
>> Have you provided the configuration specifying the HDFS directory and file
>> for the application?
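>>
>> For the directory and file configuration, something along these lines in
>> properties.xml is what I mean (a sketch: filePath is the output-directory
>> property inherited from AbstractFileOutputOperator; the property naming the
>> file itself depends on how your LineOutputOperator is written):
>>
>> <property>
>>   <!-- assumed value; point this at the HDFS directory the files should be written to -->
>>   <name>dt.operator.fileOut.prop.filePath</name>
>>   <value>/tmp/fromKafka</value>
>> </property>
>>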
>> You may also have to create the /tmp/fromKafka directory in HDFS.
>> Thanks & Regards,
>> Vikram
>>
>> On Fri, May 5, 2017 at 2:30 PM, Sushil Apex <sushil.aa...@gmail.com>
>> wrote:
>> >
>> > I am using Apex 3.5.0 and Kafka 0.9; the Malhar library version used is 3.6.0.
>> >
>> > I am following the example from https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/KafkaApp.java
>> >
>> > I am putting messages into the Kafka topic using the console producer
>> > provided by Kafka, but I am not able to read these messages in the Apex
>> > DAG (created based on the above link).
>> >
>> > I am running the Apex .apa file through apex-cli:
>> >
>> > Apex CLI 3.5.0 06.12.2016 @ 22:11:51 PST rev: 6de8828 branch: 6de8828e4f3d5734d0a6f9c1be0aa7057cb60ac8
>> > apex> launch --local /home/cloudera/myapexapp-1.0-SNAPSHOT.apa
>> >   1. Kafka2HDFS
>> >   2. MyFirstApplication
>> > Choose application: 1
>> >
>> > Nothing happens after this, although I can see the messages I put in with
>> > the console consumer and in the Kafka logs.
>> >
>> > The properties used are:
>> >
>> > <property>
>> >   <name>dt.operator.kafkaIn.prop.topics</name>
>> >   <value>kafka2hdfs</value>
>> > </property>
>> >
>> > <property>
>> >   <name>dt.operator.kafkaIn.prop.consumer.zookeeper</name>
>> >   <value>localhost:2181</value>
>> > </property>
>> > <property>
>> >   <name>dt.operator.kafkaIn.prop.clusters</name>
>> >   <value>localhost:9092</value>
>> > </property>
>> > <property>
>> >   <name>dt.operator.kafkaIn.prop.initialPartitionCount</name>
>> >   <value>1</value>
>> > </property>
>> >
>> >
>> > Application Code
>> >
>> > KafkaSinglePortByteArrayInputOperator in =
>> >     dag.addOperator("kafkaIn", new KafkaSinglePortByteArrayInputOperator());
>> >
>> > LineOutputOperator out =
>> >     dag.addOperator("fileOut", new LineOutputOperator());
>> >
>> > dag.addStream("dataf", in.outputPort, out.input);
>> >
>> >
>> > I am not able to figure out what config I am missing here.
>>
>
>
