I am using Apex 3.5.0 and Kafka 0.9.
The Malhar library version is 3.6.0.

I am following the example from
https://github.com/DataTorrent/examples/blob/master/tutorials/kafka/src/main/java/com/example/myapexapp/KafkaApp.java

I am putting messages into a Kafka topic using the console producer provided
by Kafka, but I am not able to read these messages in the Apex DAG (created
based on the above link).

I am running the Apex .apa file through the Apex CLI:

Apex CLI 3.5.0 06.12.2016 @ 22:11:51 PST rev: 6de8828 branch:
6de8828e4f3d5734d0a6f9c1be0aa7057cb60ac8
apex> launch --local /home/cloudera/myapexapp-1.0-SNAPSHOT.apa
  1. Kafka2HDFS
  2. MyFirstApplication
Choose application: 1

Nothing happens after this, although I can see the messages in the Kafka
console consumer and in the Kafka logs.

The properties used are:

<property>
  <name>dt.operator.kafkaIn.prop.topics</name>
  <value>kafka2hdfs</value>
</property>

<property>
  <name>dt.operator.kafkaIn.prop.consumer.zookeeper</name>
  <value>localhost:2181</value>
</property>
<property>
  <name>dt.operator.kafkaIn.prop.clusters</name>
  <value>localhost:9092</value>
</property>
<property>
  <name>dt.operator.kafkaIn.prop.initialPartitionCount</name>
  <value>1</value>
</property>
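
For reference, this is how I read the property names above: each one follows the pattern dt.operator.<operatorName>.prop.<propertyPath>, so the operator name must match the name passed to dag.addOperator. The sketch below is plain Java, not Apex code; the class and method names are hypothetical, and it only splits the names, assuming Apex then applies each property path to the named operator through its bean setters.

```java
public class OperatorPropertyNames {
    // Prefix and separator exactly as they appear in the property names above.
    static final String PREFIX = "dt.operator.";
    static final String SEPARATOR = ".prop.";

    /**
     * Splits a property name into {operatorName, propertyPath}.
     * Returns null if the name does not follow the expected pattern.
     */
    static String[] split(String name) {
        if (!name.startsWith(PREFIX)) {
            return null;
        }
        int sep = name.indexOf(SEPARATOR, PREFIX.length());
        if (sep < 0) {
            return null;
        }
        String operator = name.substring(PREFIX.length(), sep);
        String property = name.substring(sep + SEPARATOR.length());
        if (operator.isEmpty() || property.isEmpty()) {
            return null;
        }
        return new String[] {operator, property};
    }

    public static void main(String[] args) {
        // The four property names from the configuration above.
        String[] names = {
            "dt.operator.kafkaIn.prop.topics",
            "dt.operator.kafkaIn.prop.consumer.zookeeper",
            "dt.operator.kafkaIn.prop.clusters",
            "dt.operator.kafkaIn.prop.initialPartitionCount"
        };
        for (String name : names) {
            String[] parts = split(name);
            if (parts == null) {
                System.out.println("unrecognized: " + name);
            } else {
                System.out.println("operator=" + parts[0] + " property=" + parts[1]);
            }
        }
    }
}
```

All four names resolve to the operator kafkaIn, which matches the name used in the application code, so the operator name itself does not look like the problem. Whether the operator class actually exposes each of these property paths is a separate question that this sketch does not check.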


Application code:

        KafkaSinglePortByteArrayInputOperator in
                = dag.addOperator("kafkaIn", new KafkaSinglePortByteArrayInputOperator());

        LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());

        dag.addStream("dataf", in.outputPort, out.input);


I cannot figure out what configuration I am missing here.
