Hi Guozang,

Thanks for looking into this. I was using 0.11.0.0 version of the library
earlier when I was getting the CommitFailed exception and the tasks were
terminating. The application config then was Replication Factor = 2, Num
Stream Threads = 1, Consumer Max Poll Records = 1000 & Consumer Max Poll
Interval = 2147483647. The streams config code (*Streams Config While Using
0.11.0.0*) is given below and the logs of the application while using
0.11.0.0 can be downloaded from
https://www.dropbox.com/s/hx1e5mknf9gx5z0/commit_failed_error.log?dl=0

I have upgraded the libraries to 0.11.0.1 and ran into some other issues.
Though the CommitFailed error logs are still showing up with 0.11.0.1 the
tasks are not getting terminated, but the app quickly runs out of memory
(GC overhead limit exceeded) and the CPU is choked, which was not the case
earlier. The logs are available @
https://www.dropbox.com/s/x6oehtuoqrwjj0i/oom_gc_overhead.log?dl=0 and
application config is also given below (*Streams Config While Using
0.11.0.1*).  Since I am not sure what configuration helps reduce the
CommitFailed error, which I think could be one of the reasons for the CPU
choke and eventually cause an OOM, I have gone ahead and used all possible
configuration parameters, but still no luck.

It would be great if you could shed some light on this as to what could be
causing this problem.

*Streams Config While Using 0.11.0.0 *

val props = Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
1000)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
Int.MAX_VALUE)

streams = KafkaStreams(builder, StreamsConfig(props))
streams.start()


*Streams Config While Using 0.11.0.1*

val props = Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2)

props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
10000)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
10000)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
true)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
10000)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
Int.MAX_VALUE)
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
30000)

streams = KafkaStreams(builder, StreamsConfig(props))
streams.start()


Thanks,
Tony

On Thu, Nov 2, 2017 at 4:39 PM, Tony John <tonyjohnant...@gmail.com> wrote:

> Hi All,
>
> I am facing CommitFailedException in my streams application. As per the
> log I tried changing the max.poll.interval.ms and max.poll.records. But
> both didn't help. PFA the full stack trace of the exception and below is
> the streams configuration used. What else could be wrong?
>
> val props = Properties()
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, EngineConfig.APPLICATION_ID)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, EngineConfig.KAFKA_SERVERS)
> props.put(StreamsConfig.STATE_DIR_CONFIG, EngineConfig.STATE_STORE_DIR)
> props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2)
> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1)
> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "INFO")
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
>  1000)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
>  Int.MAX_VALUE)
>
> streams = KafkaStreams(builder, StreamsConfig(props))
> streams.start()
>
>
> Thanks,
> Tony
>

Reply via email to