Re: how to browse / search message in topic by consumertemplate

2018-11-14 Thread Willem Jiang
You cannot browse the messages in a JMS topic [1].

[1]http://activemq.apache.org/can-you-browse-a-topic.html

Willem Jiang

Twitter: willemjiang
Weibo: 姜宁willem
On Wed, Nov 7, 2018 at 11:09 PM Wang Yan  wrote:
>
> Hey All,
>
> I need to use ConsumerTemplate to search for a message.
>
> I did something like the code below, but I have two concerns:
>
> 1) It cannot find a particular message based on its message ID.
>
> 2) I just want to search or browse the messages; I don't want to consume them.
>
> Any suggestions or hints are more than welcome!
>
> @Override
> public void process(Exchange exchange) throws Exception {
>     Result result = new Result();
>     try {
>         result.setMessageBody("MESSAGE NOT FOUND");
>         if (exchange != null && exchange.getIn() != null) {
>             String operationName =
>                 exchange.getIn().getHeader(CxfConstants.OPERATION_NAME, String.class);
>             MessageContentsList msgList = (MessageContentsList)
>                 exchange.getIn().getBody();
>             // Constant-first comparison avoids a NullPointerException when the header is absent.
>             if ("getMessageFromTopic".equalsIgnoreCase(operationName)) {
>                 String topic = (String) msgList.get(0);
>                 // Requested ID; it is not yet used to select a message (concern 1).
>                 String requestedMessageId = (String) msgList.get(1);
>                 // receive() consumes whichever message arrives next on the topic (concern 2).
>                 Exchange ex = consumerTemplate.receive("activemq:topic:" + topic);
>                 String messageId = (String) ex.getIn().getHeader("JMSMessageID");
>                 String messageBody = ex.getIn().getBody(String.class);
>                 result.setMessageId(messageId);
>                 result.setMessageBody(messageBody);
>             }
>         }
>     } catch (Exception e) {
>         LOG.error("error happened ", e);
>         throw e;
>     }
>     exchange.getOut().setBody(result);
> }
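
Since a topic cannot be browsed, one possible workaround for the lookup described above is to have an ordinary topic subscriber record each message it receives in a small in-memory index keyed by JMSMessageID, and to answer lookups from that index instead of from the broker. This is only a sketch and is not something proposed in the thread: RecentMessageIndex, record and find are hypothetical names, and the Camel wiring is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: remembers the most recent topic messages by JMSMessageID. */
final class RecentMessageIndex {
    private final Map<String, String> bodiesById;

    RecentMessageIndex(final int capacity) {
        // Insertion-ordered map that evicts the oldest entry once capacity is exceeded.
        this.bodiesById = new LinkedHashMap<String, String>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Intended to be called by the topic subscriber for every message it receives. */
    synchronized void record(String messageId, String body) {
        bodiesById.put(messageId, body);
    }

    /** Looks a message up by ID without removing it from the index. */
    synchronized Optional<String> find(String messageId) {
        return Optional.ofNullable(bodiesById.get(messageId));
    }
}
```

In a route, a processor behind the topic consumer could call record() with the JMSMessageID header and the body, and the CXF processor could call find() with the requested ID. Only messages published while the subscriber is connected would be captured, and only the most recent ones up to the chosen capacity.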


Camel Kafka Consumer KafkaException ignored

2018-11-14 Thread Webster Homer
I have an application that uses Camel 2.21.1 and Kafka 1.1

I had a bug where the consumer's deserializer threw an exception while
deserializing a message. I could see the exception in the log, but the
consumer never handled it, and the same message threw the exception again
each time, forever. Why didn't Camel see the exception, so that the
application code could commit that offset?

This is my consumer endpoint:
@EndpointInject(uri =
"kafka:{{deep.send.order.email}}?brokers={{deep.notify.kafka.server}}=orders-enrichment-grp&" +
"autoCommitEnable=false=true&" +
"maxPollRecords={{deep.kafka.poll.count}}&" +
"keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&" +
"valueDeserializer=com.sial.notifications.kafka.utilities.MessageJSONFormatter")

The problem was in MessageJSONFormatter, and I have fixed it. However, I
expected the exception to be caught by Camel's Kafka consumer and propagated
to the global handler, where the offset could be committed.

How do I address an issue like this in the future?
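
One common way to keep a single bad record from stalling a consumer like this is to make the deserializer itself never throw: catch the failure, log it, and hand back an "empty" value that the route can recognize and skip, so the offset can then be committed as usual. The sketch below shows only that idea in plain Java. SafeDeserializer is a hypothetical name, and adapting it to Kafka's Deserializer interface (as in MessageJSONFormatter) is assumed rather than shown.

```java
import java.util.Optional;
import java.util.function.Function;

/** Hypothetical wrapper: reports a failed parse as an empty Optional instead of throwing. */
final class SafeDeserializer<T> {
    private final Function<byte[], T> delegate;

    SafeDeserializer(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    Optional<T> deserialize(byte[] data) {
        try {
            return Optional.ofNullable(delegate.apply(data));
        } catch (RuntimeException e) {
            // Log and swallow, so the caller sees an empty value rather than an exception.
            System.err.println("Skipping record that could not be deserialized: " + e);
            return Optional.empty();
        }
    }
}
```

With this approach the route decides what to do with an empty value, for example sending the raw record to a dead-letter destination before committing the offset.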