Yeah that's a good point, I'm not taking action then. Eno
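For anyone following along, here is a rough sketch of what that suggestion (lowering `max.poll.interval.ms` again, see Matthias's mail below) could look like in the application's configuration, in the same snippet style as the config João posted further down the thread. The concrete value is only a placeholder, not a recommendation.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
// ... the existing Streams settings (application.id, bootstrap.servers, serdes, ...) ...

// Streams 0.10.2 raises the embedded consumer's max.poll.interval.ms to a very large
// default so that long state-restoration phases do not trigger a rebalance. Lowering it
// again means a poll-loop iteration that blocks too long (e.g. a stuck punctuate) makes
// the instance drop out of the consumer group and fail visibly instead of hanging silently.
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000); // placeholder: 5 minutes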
On Mon, May 8, 2017 at 10:38 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Hey,
>
> I am not against opening a JIRA, but I am wondering what we should
> describe/report there. If I understand the scenario correctly, João uses
> a custom RocksDB store and calls seek() in user code land. As it is a
> bug in RocksDB that seek takes so long, I am not sure what we could
> improve within Streams to prevent this. The only thing I am seeing
> right now is that we could reduce `max.poll.interval.ms`, which we just
> increased to guard against failures during long state recreation phases.
>
> Any thoughts?
>
> -Matthias
>
> On 5/3/17 8:48 AM, João Peixoto wrote:
>> That'd be great, as I'm not familiar with the protocol there.
>>
>> On Wed, May 3, 2017 at 8:41 AM Eno Thereska <eno.there...@gmail.com> wrote:
>>> Cool, thanks, shall we open a JIRA?
>>>
>>> Eno
>>>
>>> On 3 May 2017, at 16:16, João Peixoto <joao.harti...@gmail.com> wrote:
>>>> Actually I need to apologize, I pasted the wrong issue; I meant to paste
>>>> https://github.com/facebook/rocksdb/issues/261.
>>>>
>>>> RocksDB did not produce a crash report since it didn't actually crash. I
>>>> performed thread dumps on stale and not-stale instances, which revealed the
>>>> common behavior, and I collect and plot several Kafka metrics, including
>>>> "punctuate" durations, so I know it took a long time and eventually finished.
>>>>
>>>> Joao
>>>>
>>>> On Wed, May 3, 2017 at 6:22 AM Eno Thereska <eno.there...@gmail.com> wrote:
>>>>> Hi there,
>>>>>
>>>>> Thanks for double checking. Does RocksDB actually crash or produce a crash
>>>>> dump? I'm curious how you know that the issue is
>>>>> https://github.com/facebook/rocksdb/issues/1121, so just double checking
>>>>> with you.
>>>>>
>>>>> If that's indeed the case, do you mind opening a JIRA (a copy-paste of the
>>>>> below should suffice)? Alternatively let us know and we'll open it. Sounds
>>>>> like we should handle this better.
>>>>>
>>>>> Thanks,
>>>>> Eno
>>>>>
>>>>> On May 3, 2017, at 5:49 AM, João Peixoto <joao.harti...@gmail.com> wrote:
>>>>>> I believe I found the root cause of my problem. I seem to have hit this
>>>>>> RocksDB bug: https://github.com/facebook/rocksdb/issues/1121
>>>>>>
>>>>>> In my stream configuration I have a custom transformer used for
>>>>>> deduplicating records, heavily inspired by the
>>>>>> EventDeduplicationLambdaIntegrationTest
>>>>>> <https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java#L161>
>>>>>> but adjusted to my use case, with special emphasis on the "punctuate" method.
>>>>>>
>>>>>> All the stale instances had the main stream thread "RUNNING" the
>>>>>> "punctuate" method of this transformer, which in turn was running RocksDB
>>>>>> "seekToFirst".
>>>>>>
>>>>>> Also, during my debugging one such instance finished the "punctuate"
>>>>>> method, which took ~11h, exactly the time the instance was stuck for.
>>>>>> Changing the backing state store from "persistent" to "inMemory" solved my
>>>>>> issue; after several days running there are no stuck instances.
>>>>>>
>>>>>> This leads me to ask: shouldn't Kafka detect such a situation fairly
>>>>>> quickly, instead of just stopping polling? My guess is that the heartbeat
>>>>>> thread, which is now separate, continues working fine; since by definition
>>>>>> the stream runs a message through the whole pipeline, this step probably
>>>>>> just looked like it was VERY slow. Not sure what the best approach here
>>>>>> would be.
>>>>>>
>>>>>> PS The linked code clearly states "This code is for demonstration purposes
>>>>>> and was not tested for production usage", so that's on me.
>>>>>>
>>>>>> On Tue, May 2, 2017 at 11:20 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>> Did you check the logs? Maybe you need to increase the log level to DEBUG
>>>>>>> to get some more information.
>>>>>>>
>>>>>>> Did you double check committed offsets via bin/kafka-consumer-groups.sh?
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 4/28/17 9:22 AM, João Peixoto wrote:
>>>>>>>> My stream gets stale after a while and it simply does not receive any new
>>>>>>>> messages, aka does not poll.
>>>>>>>>
>>>>>>>> I'm using Kafka Streams 0.10.2.1 (the same happens with 0.10.2.0) and the
>>>>>>>> brokers are running 0.10.1.1.
>>>>>>>>
>>>>>>>> The stream state is RUNNING and there are no exceptions in the logs.
>>>>>>>>
>>>>>>>> Looking at the JMX metrics, the threads are there and running, just not
>>>>>>>> doing anything. The metric "consumer-coordinator-metrics >
>>>>>>>> heartbeat-response-time-max" (the max time taken to receive a response to
>>>>>>>> a heartbeat request) reads 43,361 seconds (almost 12 hours), which is
>>>>>>>> consistent with the duration of the hang. Shouldn't this trigger a
>>>>>>>> failure somehow?
>>>>>>>>
>>>>>>>> The stream configuration looks something like this:
>>>>>>>>
>>>>>>>> Properties props = new Properties();
>>>>>>>> props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>>>>>>>>     CustomTimestampExtractor.class.getName());
>>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
>>>>>>>> props.put(StreamsConfig.CLIENT_ID_CONFIG, streamName);
>>>>>>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, myConfig.getBrokerList());
>>>>>>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>>>>>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
>>>>>>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, myConfig.getCommitIntervalMs()); // 5000
>>>>>>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
>>>>>>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, myConfig.getStreamThreadsCount()); // 1
>>>>>>>> props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, myConfig.getMaxCacheBytes()); // 524_288_000L
>>>>>>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>>>>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
>>>>>>>>
>>>>>>>> The stream LEFT JOINs 2 topics, one of them being a KTable, and outputs to
>>>>>>>> another topic.
>>>>>>>>
>>>>>>>> Thanks in advance for the help!
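For reference, here is a minimal sketch of the kind of punctuate() that can produce this behavior. This is not João's actual code and only loosely follows the shape of the Confluent deduplication example; the store name, types, and intervals below are made up for illustration. The relevant detail is that all() on a persistent (RocksDB-backed) store seeks to the first entry, and the whole scan runs on the stream thread, so a misbehaving seek keeps the thread in RUNNING without ever getting back to poll().

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Hypothetical deduplication transformer; "dedup-store" is an illustrative store name
// that would be registered as a persistent KeyValueStore elsewhere in the topology.
public class DedupTransformer implements Transformer<String, byte[], KeyValue<String, byte[]>> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> seenStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.seenStore = (KeyValueStore<String, Long>) context.getStateStore("dedup-store");
        context.schedule(60_000L); // request punctuate() roughly once per minute (0.10.2 API)
    }

    @Override
    public KeyValue<String, byte[]> transform(String key, byte[] value) {
        if (seenStore.get(key) != null) {
            return null; // already seen: drop the duplicate
        }
        seenStore.put(key, context.timestamp()); // remember when we first saw this key
        return KeyValue.pair(key, value);
    }

    @Override
    public KeyValue<String, byte[]> punctuate(long timestamp) {
        // all() on a RocksDB-backed store does a seekToFirst() under the hood; this scan
        // runs on the stream thread, so a stalled seek keeps the thread away from poll().
        List<String> expired = new ArrayList<>();
        try (KeyValueIterator<String, Long> iter = seenStore.all()) {
            while (iter.hasNext()) {
                KeyValue<String, Long> entry = iter.next();
                if (timestamp - entry.value > 3_600_000L) { // forget keys older than one hour
                    expired.add(entry.key);
                }
            }
        }
        for (String key : expired) {
            seenStore.delete(key);
        }
        return null; // nothing to forward from punctuate
    }

    @Override
    public void close() {}
}

Wired up it would look roughly like builder.addStateStore(...) plus stream.transform(DedupTransformer::new, "dedup-store"), but the part that matters for this thread is the scan inside punctuate().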