Jacob Gur created KAFKA-4799:
--------------------------------

             Summary: session timeout during event processing shuts down stream
                 Key: KAFKA-4799
                 URL: https://issues.apache.org/jira/browse/KAFKA-4799
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.1.1
         Environment: Kafka Streams client running on OS X, with Docker Machine running the broker
            Reporter: Jacob Gur
            Priority: Critical


I have a simple stream application like this:

{code:title=Part of my class|borderStyle=solid}
        private <T> IConsumerSubscription buildSubscriptionStream(
                        Class<T> clazz, Consumer<T> consumer, String group,
                        Function<KStreamBuilder, KStream<String, String>> topicStreamFunc)
        {
                KStreamBuilder builder = new KStreamBuilder();

                KStream<String, String> stream = topicStreamFunc.apply(builder);
                stream.foreach((k, v) -> {
                        try {
                                T value = _jsonObjectMapper.mapFromJsonString(v, clazz);
                                consumer.accept(value);

                                Logger.trace("Consumed message {}", value);
                        } catch (Throwable th) {
                                Logger.warn("Error while consuming message", th);
                        }
                });

                final KafkaStreams streams = new KafkaStreams(builder, constructProperties(group));
                streams.start();

                return streams::close;
        }
{code}
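For reference, {{constructProperties(group)}} is not shown in the class above; a minimal hypothetical shape (the broker address and property values are my assumptions about a typical 0.10.1.x Streams config, not taken from the actual code) would be:

{code:java}
import java.util.Properties;

public class StreamsProps {
    // Hypothetical sketch of the constructProperties helper referenced above;
    // only application.id and bootstrap.servers are strictly required by
    // StreamsConfig. Plain string keys keep the sketch dependency-free.
    static Properties constructProperties(String group) {
        Properties props = new Properties();
        props.put("application.id", group);               // Streams app id / consumer group
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        return props;
    }
}
{code}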

There is just one client running this application stream.

If I run the client in a debugger with a breakpoint in the event processor (i.e., inside the foreach lambda), with the debugger suspending all threads for perhaps more than 10 seconds, then when I resume the application:
Actual behavior: the stream shuts down.
Expected behavior: the stream should recover; its partitions may be temporarily revoked during the rebalance, but they should then be re-assigned and processing should resume.
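As a debugging workaround (my assumption, not an official recommendation), the consumer session timeout can be raised so the group coordinator does not evict a client whose threads are suspended; Streams passes consumer-level properties through its config. The values below are illustrative:

{code:java}
import java.util.Properties;

public class DebugTimeouts {
    // Sketch: consumer-level settings passed through the Streams config.
    // "session.timeout.ms" defaults to 10000 in 0.10.x, which matches the
    // ~10 second suspension threshold observed above.
    static Properties withLongSessionTimeout(Properties props) {
        props.put("session.timeout.ms", "120000");   // survive long debugger pauses
        props.put("max.poll.interval.ms", "300000"); // KIP-62 poll-interval bound (0.10.1+)
        return props;
    }
}
{code}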

It looks like what happens is this:
1) The Kafka consumer's session times out.
2) The partitions are revoked.
3) The Streams library's rebalance listener tries to commit offsets, but the commit fails with a rebalance exception.
4) The stream shuts down.

Steps 3 and 4 occur in StreamThread's rebalance listener.

It seems the stream should be more resilient and recover just like a regular
KafkaConsumer would: its partitions would be revoked, then re-assigned, and
processing would resume from the last committed offset.
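Until the library recovers on its own, an application can at least observe the shutdown via {{KafkaStreams#setUncaughtExceptionHandler}} (available in 0.10.1.x) and restart the topology itself. A minimal sketch using plain java.lang types; the simulated failure below is illustrative:

{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

public class StreamDeathWatch {
    // Simulates a stream thread dying with an unhandled exception and shows
    // the handler firing. The same handler object could be passed to
    // streams.setUncaughtExceptionHandler(...) before streams.start().
    static boolean observeDeath() throws InterruptedException {
        AtomicBoolean died = new AtomicBoolean(false);

        Thread.UncaughtExceptionHandler handler =
                (thread, throwable) -> died.set(true); // real code: log and schedule a restart

        Thread worker = new Thread(() -> {
            throw new IllegalStateException("commit failed due to rebalance");
        });
        worker.setUncaughtExceptionHandler(handler);
        worker.start();
        worker.join(); // the handler runs on the dying thread before join() returns

        return died.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stream died: " + observeDeath()); // prints "stream died: true"
    }
}
{code}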

Is the current behavior intended and I am misunderstanding the design, or is
this a bug?

Thanks!





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
