Hi Guozang,

Thanks for looking into this. I was using version 0.11.0.0 of the library when I was getting the CommitFailedException and the tasks were terminating. The application config at that time was Replication Factor = 2, Num Stream Threads = 1, Consumer Max Poll Records = 1000 and Consumer Max Poll Interval = 2147483647. The streams config code (*Streams Config While Using 0.11.0.0*) is given below, and the application logs from 0.11.0.0 can be downloaded from https://www.dropbox.com/s/hx1e5mknf9gx5z0/commit_failed_error.log?dl=0

I have since upgraded the libraries to 0.11.0.1 and ran into other issues. The CommitFailed error logs still show up with 0.11.0.1, but the tasks no longer get terminated. Instead, the app quickly runs out of memory (GC overhead limit exceeded) and the CPU is choked, which was not the case earlier. The logs are available at https://www.dropbox.com/s/x6oehtuoqrwjj0i/oom_gc_overhead.log?dl=0 and the application config is given below (*Streams Config While Using 0.11.0.1*).

I am not sure which configuration helps reduce the CommitFailed error, which I think could be one of the reasons for the CPU choke and, eventually, the OOM. So I have gone ahead and set all the configuration parameters I could think of, but still no luck. It would be great if you could shed some light on what could be causing this problem.

*Streams Config While Using 0.11.0.0*

val props = Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)

streams = KafkaStreams(builder, StreamsConfig(props))
streams.start()

*Streams Config While Using 0.11.0.1*

val props = Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 10000)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), 10000)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), true)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10000)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000)

streams = KafkaStreams(builder, StreamsConfig(props))
streams.start()

Thanks,
Tony

On Thu, Nov 2, 2017 at 4:39 PM, Tony John <tonyjohnant...@gmail.com> wrote:

> Hi All,
>
> I am facing CommitFailedException in my streams application. As per the
> log I tried changing the max.poll.interval.ms and max.poll.records. But
> both didn't help. PFA the full stack trace of the exception and below is
> the streams configuration used. What else could be wrong?
>
> val props = Properties()
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
> props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), Int.MAX_VALUE)
>
> streams = KafkaStreams(builder, StreamsConfig(props))
> streams.start()
>
> Thanks,
> Tony