Re: Auto-commit and camel-kafka

2015-07-05 Thread Willem Jiang
Hi Michael,

I don’t think we took committing the offset during shutdown processing into
consideration.
Please feel free to create a JIRA[1] and send out a patch for it.

[1] https://issues.apache.org/jira/browse/CAMEL

--  
Willem Jiang


Blog: http://willemjiang.blogspot.com (English)  
http://jnn.iteye.com (Chinese)
Twitter: willemjiang  
Weibo: 姜宁willem



On July 5, 2015 at 6:42:31 AM, Michael J. Kitchin (mcoyote...@gmail.com) wrote:
> Hi there,
>  
> These are questions re: the official camel-kafka integration. Since the
> issues touch on both camel and kafka I'm sending to both users lists in
> search of answers.
>  
> - - - - -
>  
> I have a simple, InOnly, point-to-point, synchronous camel route (a)
> consuming from a kafka topic (consumer), (b) running the resulting exchanges
> (messages) through a processor chain and (c) forwarding the outcome on to
> somewhere else.
>  
> If the runtime dies while exchanges are in this pipeline, I want things to
> pick up where they left off when restarted. I'm not picky about seeing the
> same data more than once (as long as it's recent), I just can't lose
> anything.
>  
> In brief, everything's working great except this failure/recovery part --
> in-flight exchanges are getting lost and there is no apparent re-delivery
> on restart.
>  
> I think this has to do with consumer auto-commit, which is the default
> behavior. My reading of the kafka and camel-kafka docs suggests disabling
> auto-commit will give me what I want, but when I try it I'm not seeing
> re-delivery kick off when I restart.
>  
> After disabling auto-commit, I walked through this in the debugger, FWIW.
> If interested, you can see the code here:
> - https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
>   
>  
> ...Specifically:
> - Processing is underway at line 148: processor.process(exchange);
> - Camel triggers a shutdown and this returns as normal, without any
> exchange exception check
> - The barrier await hits at line
> 165: berrier.await(endpoint.getBarrierAwaitTimeoutMs(),
> TimeUnit.MILLISECONDS);
> - This aligns with the rest of the streams and triggers the barrier action,
> which in turn performs the commit at line 193: consumer.commitOffsets();
>  
> Since any exception from line 148 is suppressed and there's no subsequent
> interrupted() or exchange exception check, it looks like there's no way
> to signal not to commit and in-flight exchanges are a guaranteed loss.
>  
> Does this sound correct?
>  
> If so, barring a change from the maintainers, I figure I might fork this
> code and optionally bypass the consumer.commitOffsets() call during shutdown
> or when an exchange carries an exception after the
> processor.process(exchange) call.
>  
> Thoughts?
>  
> Please let me know if I may provide any additional information. Thanks.
>  
> -Regards,
> MjK
>  
> - - - - -
>  
> Michael J. Kitchin
> Senior Software Engineer
> Operational Systems, Inc.
> 4450 Arapahoe Avenue, Suite 100
> Boulder, CO 80303
>  
> Phone: 719-271-6476
> Email: michael.kitc...@opsysinc.com
> Web: www.opsysinc.com
>  



Auto-commit and camel-kafka

2015-07-04 Thread Michael J. Kitchin
Hi there,

These are questions re: the official camel-kafka integration. Since the
issues touch on both camel and kafka I'm sending to both users lists in
search of answers.

- - - - -

I have a simple, InOnly, point-to-point, synchronous camel route (a)
consuming from a kafka topic (consumer), (b) running the resulting exchanges
(messages) through a processor chain and (c) forwarding the outcome on to
somewhere else.
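For reference, the route is shaped roughly like this (the broker, topic,
group, processor, and endpoint names below are placeholders, not my real
configuration, and the kafka URI options vary across camel-kafka versions):

import org.apache.camel.builder.RouteBuilder;

public class KafkaPipelineRoute extends RouteBuilder {
    @Override
    public void configure() {
        // (a) consume from a kafka topic (all URI values are placeholders)
        from("kafka://localhost:9092?topic=events&groupId=pipeline&zookeeperConnect=localhost:2181")
            // (b) run the resulting exchange through the processor chain
            .process(new MyFirstProcessor())
            .process(new MySecondProcessor())
            // (c) forward the outcome on to somewhere else
            .to("direct:downstream");
    }
}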

If the runtime dies while exchanges are in this pipeline, I want things to
pick up where they left off when restarted. I'm not picky about seeing the
same data more than once (as long as it's recent), I just can't lose
anything.

In brief, everything's working great except this failure/recovery part --
in-flight exchanges are getting lost and there is no apparent re-delivery
on restart.

I think this has to do with consumer auto-commit, which is the default
behavior. My reading of the kafka and camel-kafka docs suggests disabling
auto-commit will give me what I want, but when I try it I'm not seeing
re-delivery kick off when I restart.
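For the record, this is how I'm turning it off (assuming the autoCommitEnable
URI option, which as I understand it maps to the kafka consumer's
auto.commit.enable property; host/topic/group values are placeholders again):

// Same consumer endpoint as before, with auto-commit disabled:
String uri = "kafka://localhost:9092?topic=events"
        + "&groupId=pipeline"
        + "&zookeeperConnect=localhost:2181"
        + "&autoCommitEnable=false";   // default is true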

After disabling auto-commit, I walked through this in the debugger, FWIW.
If interested, you can see the code here:
- https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java

...Specifically (condensed sketch below):
- Processing is underway at line 148: processor.process(exchange);
- Camel triggers a shutdown and this returns as normal, without any
exchange exception check
- The barrier await hits at line
165: berrier.await(endpoint.getBarrierAwaitTimeoutMs(),
TimeUnit.MILLISECONDS);
- This aligns with the rest of the streams and triggers the barrier action,
which in turn performs the commit at line 193: consumer.commitOffsets();
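To make the sequence concrete, here's a condensed paraphrase of that flow as
I read it (not verbatim source -- I've collapsed the batching and
error-handler details, and the helper names are mine):

// Condensed paraphrase of the per-stream consumer task:
while (hasMoreMessages()) {
    Exchange exchange = endpoint.createKafkaExchange(nextMessage());
    try {
        processor.process(exchange);     // line 148; shutdown makes this return normally
    } catch (Exception e) {
        getExceptionHandler().handleException(e);  // logged, then forgotten
    }
    // note: neither exchange.getException() nor Thread.interrupted() is consulted
}
berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);  // line 165
// ...barrier action, once all streams align:
consumer.commitOffsets();                // line 193; commits regardless of the above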

Since any exception from line 148 is suppressed and there's no subsequent
interrupted() or exchange exception check, it looks like there's no way
to signal not to commit and in-flight exchanges are a guaranteed loss.

Does this sound correct?

If so, barring a change from the maintainers, I figure I might fork this
code and optionally bypass the consumer.commitOffsets() call during shutdown
or when an exchange carries an exception after the
processor.process(exchange) call.
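Concretely, the fork might look something like this (entirely hypothetical --
the flag names and wiring are mine, not the component's; consumer is the
class's existing field):

import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard around the existing barrier action:
final AtomicBoolean shuttingDown = new AtomicBoolean();  // set when Camel stops the consumer
final AtomicBoolean batchFailed = new AtomicBoolean();   // set when process() leaves an exception

Runnable barrierAction = new Runnable() {
    public void run() {
        if (shuttingDown.get() || batchFailed.get()) {
            // leave offsets uncommitted so the whole batch is re-delivered on restart
            return;
        }
        consumer.commitOffsets();
    }
};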

Thoughts?

Please let me know if I may provide any additional information. Thanks.

-Regards,
MjK

- - - - -

Michael J. Kitchin
Senior Software Engineer
Operational Systems, Inc.
4450 Arapahoe Avenue, Suite 100
Boulder, CO 80303

Phone: 719-271-6476
Email: michael.kitc...@opsysinc.com
Web: www.opsysinc.com


Questions re: auto-commit and camel-kafka

2015-07-02 Thread mkitchin
Hi there,

These are questions re: the official camel-kafka integration. Since the
issues touch on both camel and kafka I'm sending to both users lists in
search of answers.

- - - -

I have a simple, InOnly, point-to-point, synchronous camel route (a)
consuming from a kafka topic (consumer), (b) running the resulting exchanges
(messages) through a processor chain and (c) forwarding the outcome on to
somewhere else.

If the runtime dies while exchanges are in this pipeline, I want things to
pick up where they left off when restarted. I'm not picky about seeing the
same data more than once (as long as it's recent), I just can't lose
anything.

In brief, everything's working great except this failure/recovery part --
in-flight exchanges are getting lost and there is no apparent re-delivery
on restart. My reading of the JMX data suggests the kafka logs are intact.

I think this has to do with consumer auto-commit, which is the default
behavior. My reading of the kafka and camel-kafka docs suggests disabling
auto-commit will give me what I want, but when I try it I'm not seeing
re-delivery kick off when I restart.

So, first question:
(1) Is turning auto-commit off the key to getting what I want, and/or what
else might I need to do?

- - - - -

Meanwhile, on the producer side I'm seeing the first (and only the first)
message apparently get eaten. It's possible it's being buffered, but it
never seems to time out. There are no error messages on startup and the
camel context, routes, etc. appear to have started successfully. The second
message and everything that follows is golden.

The payloads are ~70-character byte arrays, if it makes a difference.

Second question, then:
(2) Is there a batching setting, or something else I might be overlooking,
behind this behavior?
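For what it's worth, these are the 0.8 producer settings I'd expect to govern
that kind of buffering (property names from kafka.producer.ProducerConfig;
values are illustrative, and whether camel-kafka passes them through is part
of what I'm asking):

import java.util.Properties;

Properties props = new Properties();
props.put("producer.type", "async");          // async mode buffers messages before sending
props.put("queue.buffering.max.ms", "5000");  // max time an async producer holds messages
props.put("batch.num.messages", "200");       // messages per batch in async mode
// setting producer.type=sync would rule buffering out entirely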

- - - - -

Thanks in advance for your time and consideration. We've been impressed
with kafka so far and are looking forward to employing it in production.


Please let me know if I may provide any additional information. Thanks.


-Regards,
MjK



