Hello All,

I am trying to create a Consumer using Apache Camel for a topic in Apache Kafka.
I am using Camel 2.17.0 and Kafka 0.10  and JDK 1.8.
I have attached a file, KafkaCamelTestConsumer.java which is a standalone 
application trying to read from a topic  "test1"created in Apache Kafka
I am producing messages from the console and also was successful to produce 
messages using a Camel program in the topic "test1", but not able to consume 
messages. Ideally, it should get printed, but nothing seems to happen. The log 
says that the route has started but does not process any message.

Please help to confirm if there is anything wrong with the below syntax:
from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest"
 +
                
"&consumersCount=1&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
                + 
"valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                + 
"&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true").split()
                .body()
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange)
                            throws Exception {
                        String messageKey = "";
                        if (exchange.getIn() != null) {
                            Message message = exchange.getIn();
                            Integer partitionId = (Integer) message
                                    .getHeader(KafkaConstants.PARTITION);
                            String topicName = (String) message
                                    .getHeader(KafkaConstants.TOPIC);
                            if (message.getHeader(KafkaConstants.KEY) != null)
                                messageKey = (String) message
                                        .getHeader(KafkaConstants.KEY);
                            Object data = message.getBody();


                            System.out.println("topicName :: "
                                    + topicName + " partitionId :: "
                                    + partitionId + " messageKey :: "
                                    + messageKey + " message :: "
                                    + data + "\n");
                        }
                    }
                
}).to("file://C:/swati/?fileName=MyOutputFile.txt&charset=utf-8");
    }
});

I have also tried with the basic parameters as below and it still fails to read 
messages.
from("kafka:localhost:9092?topic=test1&groupId=testingGroupNew&autoOffsetReset=earliest")

Any help on this will be greatly appreciated.

Thanks in advance

Thanks & Regards
Swati


This e-mail and any attachments to it (the "Communication") is, unless 
otherwise stated, confidential, may contain copyright material and is for the 
use only of the intended recipient. If you receive the Communication in error, 
please notify the sender immediately by return e-mail, delete the Communication 
and the return e-mail, and do not read, copy, retransmit or otherwise deal with 
it. Any views expressed in the Communication are those of the individual sender 
only, unless expressly stated to be those of Australia and New Zealand Banking 
Group Limited ABN 11 005 357 522, or any of its related entities including ANZ 
Bank New Zealand Limited (together "ANZ"). ANZ does not accept liability in 
connection with the integrity of or errors in the Communication, computer 
virus, data corruption, interference or delay arising from or in respect of the 
Communication.

Reply via email to