I have made these changes and also set ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG to 180000. Now the producer sometimes fails after 3 minutes, whereas earlier it used to fail after 30 seconds (the default value).
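This is roughly what the configuration looks like now (only a sketch; the application id and bootstrap servers are placeholders, the rest of the properties are unchanged):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder brokers
    // producer: retry on temporary broker failures and allow up to 3 minutes per request
    props.put(ProducerConfig.RETRIES_CONFIG, 10);
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
    // consumer: avoid rebalances while state is being restored
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
            Integer.toString(Integer.MAX_VALUE));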
So I was wondering what the reason for this could be, and how high this value should go.

Thanks
Sachin

On Mon, Apr 10, 2017 at 5:00 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> Hi Sachin,
>
> In 0.10.2.1 we've changed the default value of max.poll.interval.ms (to
> avoid rebalancing during recovery) as well as the default value of the
> streams producer retries (to retry during a temporary broker failure). I
> think you are aware of the changes, but just double checking. You don't
> need to wait for 0.10.2.1, you can make the changes directly yourself:
>
> final Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, ID);
> ...
> props.put(ProducerConfig.RETRIES_CONFIG, 10);
> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>         Integer.toString(Integer.MAX_VALUE));
>
> This doesn't address the RocksDB issue though, still looking into that.
>
> Thanks
> Eno
>
> > On 9 Apr 2017, at 22:55, Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > Let me try to get the debug log when this error happens.
> >
> > Right now we have three instances, each with 4 threads, consuming from a
> > 12-partition topic, so one thread per partition.
> >
> > The application is running fine, much better than before. Now it usually
> > runs for a week, even during peak load.
> >
> > Sometimes, out of the blue, either RocksDB throws an exception with a
> > single character (which I guess is a known issue with RocksDB, fixed in
> > some next release), or the producer times out while committing a changelog
> > topic record. I had increased the timeout from 30 seconds to 180 seconds,
> > but it still throws an exception after that time as well.
> >
> > Not sure if these are due to VM issues or the network.
> >
> > But whenever something like this happens, the application goes into
> > rebalance and soon things take a turn for the worse. Some of the threads
> > go into deadlock with that same stack trace and the application ends up in
> > a perpetual rebalance state.
> >
> > The only way to resolve this is to kill all instances with -9 and restart
> > them one by one.
> >
> > So as long as we have a steady state of one thread per partition,
> > everything works fine. I am still working out a way to limit the changelog
> > topic size with more aggressive compaction; let me see if that makes
> > things better.
> >
> > I will try to get the logs when this happens next time.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Sun, Apr 9, 2017 at 6:05 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> >
> >> Hi Sachin,
> >>
> >> It's not necessarily a deadlock. Do you have any debug traces from those
> >> nodes? It would also be useful to know the config (e.g., how many
> >> partitions you have and how many app instances).
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 9 Apr 2017, at 04:45, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>> In my streams application cluster, one or more instances have some
> >>> threads that are always waiting with the following stack.
> >>>
> >>> Every time I check jstack I see the following trace.
> >>>
> >>> Is this some kind of new deadlock that we have failed to identify?
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>> here is the stack trace:
> >>> --------------------------------------------------------------------------------
> >>> "StreamThread-4" #20 prio=5 os_prio=0 tid=0x00007fb814be3000 nid=0x19bf
> >>> runnable [0x00007fb7cb4f6000]
> >>>    java.lang.Thread.State: RUNNABLE
> >>>         at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> >>>         at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> >>>         at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> >>>         at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> >>>         - locked <0x0000000701c50c98> (a sun.nio.ch.Util$3)
> >>>         - locked <0x0000000701c50c88> (a java.util.Collections$UnmodifiableSet)
> >>>         - locked <0x0000000701c4f6a8> (a sun.nio.ch.EPollSelectorImpl)
> >>>         at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> >>>         at org.apache.kafka.common.network.Selector.select(Selector.java:489)
> >>>         at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> >>>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> >>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> >>>         - locked <0x0000000701c5da48> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> >>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
> >>>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
> >>>         at org.apache.kafka.clients.consumer.internals.Fetcher.retrieveOffsetsByTimes(Fetcher.java:422)
> >>>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:370)
> >>>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:227)
> >>>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1592)
> >>>         at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1265)
> >>>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:213)
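P.S. On limiting the changelog topic size mentioned above: the idea I'm experimenting with is to pass more aggressive compaction settings through the state store supplier's logging config, roughly like this (only a sketch; the store name, serdes and the actual values are placeholders I would still need to tune):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.Stores;

    // topic-level overrides applied to this store's changelog topic
    Map<String, String> changelogConfig = new HashMap<>();
    changelogConfig.put("segment.ms", "3600000");            // roll segments hourly (placeholder)
    changelogConfig.put("min.cleanable.dirty.ratio", "0.1"); // compact sooner (placeholder)
    changelogConfig.put("delete.retention.ms", "3600000");   // placeholder

    StateStoreSupplier storeSupplier = Stores.create("my-store")  // placeholder store name
            .withKeys(Serdes.String())
            .withValues(Serdes.String())
            .persistent()
            .enableLogging(changelogConfig)
            .build();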