Hi Sachin,

In 0.10.2.1 we've changed the default value of max.poll.interval.ms (to avoid rebalancing during state recovery) as well as the default number of retries for the Streams producer (to retry through a temporary broker failure). I think you are already aware of these changes, but just double-checking. You don't need to wait for 0.10.2.1; you can make the changes directly yourself:
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, ID);
...
props.put(ProducerConfig.RETRIES_CONFIG, 10);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
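For reference, here is a more complete, self-contained sketch of where those two overrides fit. The application id, bootstrap servers, serdes and topic names below are just placeholders (not from this thread), and it assumes the 0.10.2-era API (KStreamBuilder and the key/value serde config names):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class StreamsOverridesExample {

    public static void main(String[] args) {
        final Properties props = new Properties();
        // Placeholder application id and broker list -- replace with your own values.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Let the embedded producer retry through a temporary broker failure instead of failing the task.
        props.put(ProducerConfig.RETRIES_CONFIG, 10);
        // Effectively disable the poll-interval-based rebalance while state stores are being restored.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));

        // Trivial pass-through topology, just so the sketch is runnable; topic names are hypothetical.
        final KStreamBuilder builder = new KStreamBuilder();
        builder.stream("input-topic").to("output-topic");

        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Once you are on 0.10.2.1 these two values become the defaults, so the explicit overrides can be dropped again.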
This doesn't address the RocksDB issue though; still looking into that.

Thanks
Eno

> On 9 Apr 2017, at 22:55, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> Let me try to get the debug log when this error happens.
>
> Right now we have three instances, each with 4 threads, consuming from a 12-partition topic, so one thread per partition.
>
> The application is running much better than before; now it usually runs for a week even during peak load.
>
> Sometimes, out of the blue, either RocksDB throws an exception with a single-character message (which I guess is a known RocksDB issue fixed in an upcoming release), or the producer times out while committing a changelog topic record. I had increased that timeout from 30 seconds to 180 seconds, but it still throws an exception even with that value.
>
> Not sure if these are due to a VM issue or the network.
>
> But whenever something like this happens, the application goes into a rebalance and things soon take a turn for the worse: some of the threads go into deadlock with the above stack trace and the application ends up in a perpetual rebalance state.
>
> The only way to resolve this is to kill all instances with -9 and restart them one by one.
>
> So as long as we have a steady state of one thread per partition, everything works fine. I am still working out a way to limit the changelog topic size through more aggressive compaction; let me see if that makes things better.
>
> I will try to get the logs when this happens next time.
>
> Thanks
> Sachin
>
>
> On Sun, Apr 9, 2017 at 6:05 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>
>> Hi Sachin,
>>
>> It's not necessarily a deadlock. Do you have any debug traces from those nodes? It would also be useful to know the config (e.g., how many partitions you have and how many app instances).
>>
>> Thanks
>> Eno
>>
>>> On 9 Apr 2017, at 04:45, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>
>>> Hi,
>>> In my Streams application cluster, in one or more instances I see some threads always waiting with the following stack.
>>>
>>> Every time I check with jstack I see the following trace.
>>>
>>> Is this some kind of new deadlock that we have failed to identify?
>>>
>>> Thanks
>>> Sachin
>>>
>>> Here is the stack trace:
>>> ----------------------------------------------------------------------
>>> "StreamThread-4" #20 prio=5 os_prio=0 tid=0x00007fb814be3000 nid=0x19bf runnable [0x00007fb7cb4f6000]
>>>    java.lang.Thread.State: RUNNABLE
>>>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>>>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>>>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>>>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>>>         - locked <0x0000000701c50c98> (a sun.nio.ch.Util$3)
>>>         - locked <0x0000000701c50c88> (a java.util.Collections$UnmodifiableSet)
>>>         - locked <0x0000000701c4f6a8> (a sun.nio.ch.EPollSelectorImpl)
>>>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>>>         at org.apache.kafka.common.network.Selector.select(Selector.java:489)
>>>         at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
>>>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
>>>         - locked <0x0000000701c5da48> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
>>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
>>>         at org.apache.kafka.clients.consumer.internals.Fetcher.retrieveOffsetsByTimes(Fetcher.java:422)
>>>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
>>>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:227)
>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1592)
>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265)
>>>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:213)
>>
>>