Hi Sachin,

In 0.10.2.1 we've changed the default value of max.poll.interval.ms (to avoid 
rebalancing during recovery) as well as the default value of the streams 
producer retries (to retry during a temporary broker failure). I think you are 
aware of these changes, but I'm just double-checking. You don't need to wait 
for 0.10.2.1; you can make the changes directly yourself:

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, ID);
...
props.put(ProducerConfig.RETRIES_CONFIG, 10);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
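
For completeness, here's a minimal, self-contained sketch of the same overrides wired into a streams app. The class name, application id, bootstrap servers, and topology below are placeholders rather than anything from your setup; the point is just that the two overrides are ordinary consumer/producer configs passed through the streams properties:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class StreamsOverridesExample {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder brokers

        // Retry the streams producer during a temporary broker failure.
        props.put(ProducerConfig.RETRIES_CONFIG, 10);
        // Avoid rebalancing while state stores are being restored.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
                  Integer.toString(Integer.MAX_VALUE));

        final KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");                    // placeholder topology

        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Streams forwards these settings to its internal consumer and producer, so no other code changes are needed.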

This doesn't address the RocksDB issue, though; we're still looking into that.

Thanks
Eno

> On 9 Apr 2017, at 22:55, Sachin Mittal <sjmit...@gmail.com> wrote:
> 
> Let me try to get the debug log when this error happens.
> 
> Right now we have three instances, each with 4 threads, consuming from a
> 12-partition topic, so that's one thread per partition.
> 
> The application is running fine, much better than before. It now usually
> runs for a week even during peak load.
> 
> Sometimes, out of the blue, either RocksDB throws an exception whose message
> is a single character (which I guess is a known RocksDB issue fixed in a
> later release), or the producer times out while committing a record to some
> changelog topic. I had increased the timeout from 30 seconds to 180 seconds,
> but it still throws the exception even with that value.
> 
> Not sure if these are due to a VM issue or the network.
> 
> But whenever something like this happens, the application goes into a
> rebalance and things soon take a turn for the worse. Some of the threads go
> into a deadlock with the above stack trace, and the application is then in a
> perpetual rebalance state.
> 
> The only way to resolve this is to kill all instances with -9 and restart
> them one by one.
> 
> So as long as we have a steady state of one thread per partition, everything
> works fine. I am still working out a way to limit the changelog topic size
> through more aggressive compaction; let me see if that makes things better.
> 
> I will try to get the logs when this happens next time.
> 
> Thanks
> Sachin
> 
> 
> 
> On Sun, Apr 9, 2017 at 6:05 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> 
>> Hi Sachin,
>> 
>> It's not necessarily a deadlock. Do you have any debug traces from those
>> nodes? It would also be useful to know the config (e.g., how many partitions
>> you have and how many app instances).
>> 
>> Thanks
>> Eno
>> 
>>> On 9 Apr 2017, at 04:45, Sachin Mittal <sjmit...@gmail.com> wrote:
>>> 
>>> Hi,
>>> In my streams application cluster, one or more instances have threads that
>>> are always waiting with the following stack.
>>> 
>>> Every time I check jstack I see the following trace.
>>> 
>>> Is this some kind of new deadlock that we have failed to identify?
>>> 
>>> Thanks
>>> Sachin
>>> 
>>> here is the stack trace:
>>> ------------------------------------------------------------------------
>>> "StreamThread-4" #20 prio=5 os_prio=0 tid=0x00007fb814be3000 nid=0x19bf runnable [0x00007fb7cb4f6000]
>>>   java.lang.Thread.State: RUNNABLE
>>>       at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>       at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>       at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>       at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>       - locked <0x0000000701c50c98> (a sun.nio.ch.Util$3)
>>>       - locked <0x0000000701c50c88> (a java.util.Collections$UnmodifiableSet)
>>>       - locked <0x0000000701c4f6a8> (a sun.nio.ch.EPollSelectorImpl)
>>>       at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>       at org.apache.kafka.common.network.Selector.select(Selector.java:489)
>>>       at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
>>>       at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>>>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>>>       - locked <0x0000000701c5da48> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
>>>       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
>>>       at org.apache.kafka.clients.consumer.internals.Fetcher.retrieveOffsetsByTimes(Fetcher.java:422)
>>>       at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
>>>       at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:227)
>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1592)
>>>       at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265)
>>>       at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:213)
