Hi All:
I'm trying camel-kafka component and i have some issues when trying to
consume messages from kakfa server.

1. Is header "kafka.CONTENT_TYPE" required? In normal java app i've tried to
get messages like this:
from(endpoint).to("stream:out?");

My endpoint configured like this:

final KafkaEndpoint endpoint = new KafkaEndpoint("kafka://?", "",
component);
endpoint.setTopic("page_visits");
endpoint.setZookeeperHost(host);
endpoint.setZookeeperPort(port);
endpoint.setGroupId("myGroup");

And without defining this header whe i produce messages i've got NPE.
Looking at the source code i guess it happens here:

public Exchange createKafkaExchange(MessageAndMetadata<byte[], byte[]> mm) {
        Exchange exchange = new DefaultExchange(getCamelContext(),
getExchangePattern());

        Message message = new DefaultMessage();
        ...
        // NPE occurs here
        message.setHeader(KafkaConstants.KEY, new String(mm.key()));
        ...
}

2. I'm not able to get message from my kafka server. After setting header
kafka.CONTENT_TYPE and avoiding NPE body of returned exchange is always
null.
Looking in the onsumer sample described at  Consumer Group Example
<https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example>  
we can see that message obtainde like this:

ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
//This is message from kafka server
it.next().message();

And again looking at source code of  KafkaEndpoint
<https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java>
  
class message that came from server is ignored.

So are this two points bugs, or any suggestions how to resolve this issue
will be appreciated.

Thanks.



--
View this message in context: 
http://camel.465427.n5.nabble.com/Issue-with-consumer-in-camel-kafka-component-tp5748957.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to