Hi Ted, I will try out the 0.11.0.1 version. Is there any upgrade that needs to be done on the Kafka broker side as well ?
On Sat, Sep 16, 2017 at 5:18 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Hi,
> Were you using 0.11.0.0 ?
>
> I ask this because some related fixes, KAFKA-5167 and KAFKA-5152, are only
> in 0.11.0.1.
>
> Mind trying out the 0.11.0.1 release and seeing whether the problem persists ?
>
> On Fri, Sep 15, 2017 at 12:52 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
> > bq. 1) Reduced MAX_POLL_RECORDS_CONFIG to 5000 (previously 50000)
> >
> > Was there a typo ? (the two parameters in your email had the same name)
> >
> > Is it possible that you were hitting KAFKA-5397 ?
> >
> > On Fri, Sep 15, 2017 at 12:40 PM, dev loper <spark...@gmail.com> wrote:
> >
> >> Hi Damian,
> >>
> >> I have repeated my tests with a slight configuration change. The current
> >> logs captured for the "StreamThread" keyword contain more relevant
> >> entries than the logs I shared previously. I started the application on
> >> instances 100, 101 and 102 simultaneously with the configuration below:
> >>
> >> 1) Reduced MAX_POLL_RECORDS_CONFIG to 5000 (previously 50000)
> >> 2) Reduced MAX_POLL_RECORDS_CONFIG to 60000 (previously Integer.MAX_VALUE)
> >>
> >> When the application started, all three instances began processing and
> >> everything went well for the first few minutes. After that I could see
> >> the "StreamThread100" consumer going for a toss: it kept closing and
> >> re-creating consumers for a while, exactly with the pattern of logs I
> >> mentioned in my previous email. After some time "StreamThread100"
> >> stopped processing messages with the exception below, while the other
> >> two instances continued processing without any issues.
> >>
> >> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
> >> be completed since the group has already rebalanced and assigned the
> >> partitions to another member. This means that the time between subsequent
> >> calls to poll() was longer than the configured max.poll.interval.ms,
> >> which typically implies that the poll loop is spending too much time
> >> message processing. You can address this either by increasing the session
> >> timeout or by reducing the maximum size of batches returned in poll() with
> >> max.poll.records.
> >>
> >> I think that since the consumers were repeatedly starting and stopping,
> >> no poll was made from the system. Since I reduced MAX_POLL_RECORDS_CONFIG
> >> to 60000 while the processors were being closed and restarted, this might
> >> have resulted in the CommitFailedException due to the non-availability of
> >> processing processors.
> >>
> >> After some time the issue got propagated to the other servers as well.
> >> I have attached the relevant logs with this mail; kindly go through them
> >> and let me know how I can solve this issue. The poll settings I intend
> >> to try next are sketched below.
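> >>
> >> Following the advice in the exception message, a minimal sketch of the
> >> relaxed poll settings I intend to try; the concrete values here are
> >> assumptions to start from, not verified numbers:
> >>
> >> import java.util.Properties;
> >>
> >> import org.apache.kafka.clients.consumer.ConsumerConfig;
> >> import org.apache.kafka.streams.StreamsConfig;
> >>
> >> Properties settings = new Properties();
> >> settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "mykafkastreamsapplication");
> >> // Smaller batches (value is a guess), so one pass through the poll
> >> // loop finishes well inside the poll interval.
> >> settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
> >> // A finite bound instead of Integer.MAX_VALUE, so a genuinely stuck
> >> // thread is evicted rather than holding its partitions forever.
> >> settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
> >> // Keep session.timeout.ms at least 3 * heartbeat.interval.ms.
> >> settings.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
> >> settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");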
> >>
> >> On Fri, Sep 15, 2017 at 10:33 PM, dev loper <spark...@gmail.com> wrote:
> >>
> >>> Hi Ted,
> >>>
> >>> What should I be looking for in the broker logs ? I haven't looked at
> >>> the broker side, since my Spark application consuming from the same
> >>> topic with a different group id is able to process well.
> >>>
> >>> On Fri, Sep 15, 2017 at 3:30 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> >>>
> >>>> Is there some clue in broker logs ?
> >>>>
> >>>> Thanks
> >>>>
> >>>> On Thu, Sep 14, 2017 at 11:19 PM, dev loper <spark...@gmail.com> wrote:
> >>>>
> >>>>> Dear Kafka Users,
> >>>>>
> >>>>> I am fairly new to Kafka Streams. I have deployed two Kafka 0.11
> >>>>> broker instances on AWS M3.xlarge instances. I have created a topic
> >>>>> with 36 partitions, and a separate application writes to this topic
> >>>>> at a rate of 10000 messages per second. My Kafka Streams application,
> >>>>> which consumes the messages produced by the other application, runs
> >>>>> on three AWS M4.xlarge instances. The application starts up fine and
> >>>>> processes messages on the first instance, but when I start the same
> >>>>> application on the other instances it does not process anything even
> >>>>> though the process is alive. The other instances also take a long
> >>>>> time to start.
> >>>>>
> >>>>> Apart from the first instance, on the other instances I could see the
> >>>>> consumer getting added and removed repeatedly, and I couldn't see any
> >>>>> message processing at all. I have attached the detailed logs where
> >>>>> this behavior is observed.
> >>>>>
> >>>>> The consumer is started with the log below on these instances and
> >>>>> then stopped again (*detailed logs attached*):
> >>>>>
> >>>>> INFO | 21:59:30 | consumer.ConsumerConfig (AbstractConfig.java:223) -
> >>>>> ConsumerConfig values:
> >>>>>     auto.commit.interval.ms = 5000
> >>>>>     auto.offset.reset = latest
> >>>>>     bootstrap.servers = [l-mykafkainstancekafka5101:9092, l-mykafkainstancekafka5102:9092]
> >>>>>     check.crcs = true
> >>>>>     client.id =
> >>>>>     connections.max.idle.ms = 540000
> >>>>>     enable.auto.commit = false
> >>>>>     exclude.internal.topics = true
> >>>>>     fetch.max.bytes = 52428800
> >>>>>     fetch.max.wait.ms = 500
> >>>>>     fetch.min.bytes = 1
> >>>>>     group.id = myKafka-kafkareplica101Sept08
> >>>>>     heartbeat.interval.ms = 3000
> >>>>>     interceptor.classes = null
> >>>>>     internal.leave.group.on.close = true
> >>>>>     isolation.level = read_uncommitted
> >>>>>     key.deserializer = class mx.july.jmx.proximity.kafka.KafkaKryoCodec
> >>>>>     max.partition.fetch.bytes = 1048576
> >>>>>     max.poll.interval.ms = 300000
> >>>>>     max.poll.records = 500
> >>>>>     metadata.max.age.ms = 300000
> >>>>>     metric.reporters = []
> >>>>>     metrics.num.samples = 2
> >>>>>     metrics.recording.level = INFO
> >>>>>     metrics.sample.window.ms = 30000
> >>>>>     partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> >>>>>     receive.buffer.bytes = 65536
> >>>>>     reconnect.backoff.max.ms = 1000
> >>>>>     reconnect.backoff.ms = 50
> >>>>>     request.timeout.ms = 305000
> >>>>>     retry.backoff.ms = 100
> >>>>>     sasl.jaas.config = null
> >>>>>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >>>>>     sasl.kerberos.min.time.before.relogin = 60000
> >>>>>     sasl.kerberos.service.name = null
> >>>>>     sasl.kerberos.ticket.renew.jitter = 0.05
> >>>>>     sasl.kerberos.ticket.renew.window.factor = 0.8
> >>>>>     sasl.mechanism = GSSAPI
> >>>>>     security.protocol = PLAINTEXT
> >>>>>     send.buffer.bytes = 131072
> >>>>>     session.timeout.ms = 10000
> >>>>>     ssl.cipher.suites = null
> >>>>>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >>>>>     ssl.endpoint.identification.algorithm = null
> >>>>>     ssl.key.password = null
> >>>>>     ssl.keymanager.algorithm = SunX509
> >>>>>     ssl.keystore.location = null
> >>>>>     ssl.keystore.password = null
> >>>>>     ssl.keystore.type = JKS
> >>>>>     ssl.protocol = TLS
> >>>>>     ssl.provider = null
> >>>>>     ssl.secure.random.implementation = null
> >>>>>     ssl.trustmanager.algorithm = PKIX
> >>>>>     ssl.truststore.location = null
> >>>>>     ssl.truststore.password = null
> >>>>>     ssl.truststore.type = JKS
> >>>>>     value.deserializer = class my.dev.MessageUpdateCodec
> >>>>>
> >>>>> DEBUG | 21:59:30 | consumer.KafkaConsumer (KafkaConsumer.java:1617) -
> >>>>> The Kafka consumer has closed.
> >>>>>
> >>>>> Then the whole process repeats.
> >>>>>
> >>>>> Below you can find my startup code for Kafka Streams and the
> >>>>> parameters I have configured for starting the application (a
> >>>>> filled-in sketch of the topology wiring follows the code):
> >>>>>
> >>>>> private static Properties settings = new Properties();
> >>>>> settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "mykafkastreamsapplication");
> >>>>> settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
> >>>>> settings.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
> >>>>> settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
> >>>>> settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
> >>>>> settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
> >>>>> settings.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "60000");
> >>>>>
> >>>>> KStreamBuilder builder = new KStreamBuilder();
> >>>>> KafkaStreams streams = new KafkaStreams(builder, settings);
> >>>>> builder.addSource(.....
> >>>>>        .addProcessor(.............
> >>>>>        .addProcessor(........
> >>>>>        .addStateStore(...................persistent().build(), "my processor")
> >>>>>        .addSink(..............
> >>>>>        .addSink(..............
> >>>>> streams.start();
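> >>>>>
> >>>>> Since the wiring above is elided, here is a filled-in sketch of the
> >>>>> same shape; the topic names, the "Source"/"Sink" node names and the
> >>>>> String serdes are placeholders standing in for my real topics and
> >>>>> custom codecs:
> >>>>>
> >>>>> import org.apache.kafka.common.serialization.Serdes;
> >>>>> import org.apache.kafka.streams.KafkaStreams;
> >>>>> import org.apache.kafka.streams.processor.TopologyBuilder;
> >>>>> import org.apache.kafka.streams.state.Stores;
> >>>>>
> >>>>> TopologyBuilder builder = new TopologyBuilder();
> >>>>> // The source node reads with the key/value deserializers from the config.
> >>>>> builder.addSource("Source", "updates-topic")  // hypothetical topic name
> >>>>>        .addProcessor("my processor", InfoProcessor::new, "Source")
> >>>>>        // Persistent key-value store the processor looks up as "InfoStore";
> >>>>>        // String serdes here are placeholders for the real codecs.
> >>>>>        .addStateStore(Stores.create("InfoStore")
> >>>>>                             .withKeys(Serdes.String())
> >>>>>                             .withValues(Serdes.String())
> >>>>>                             .persistent()
> >>>>>                             .build(), "my processor")
> >>>>>        .addSink("Sink", "output-topic", "my processor");  // hypothetical topic name
> >>>>>
> >>>>> // Finish wiring the topology before handing the builder to KafkaStreams.
> >>>>> KafkaStreams streams = new KafkaStreams(builder, settings);
> >>>>> streams.start();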
> >>>>> I am using a simple Processor to implement my logic:
> >>>>>
> >>>>> public class InfoProcessor extends AbstractProcessor<Key, Update> {
> >>>>>     private static final Logger logger = Logger.getLogger(InfoProcessor.class);
> >>>>>     private ProcessorContext context;
> >>>>>     private KeyValueStore<Key, Info> infoStore;
> >>>>>
> >>>>>     @Override
> >>>>>     @SuppressWarnings("unchecked")
> >>>>>     public void init(ProcessorContext context) {
> >>>>>         this.context = context;
> >>>>>         // Schedule punctuate() every batch duration.
> >>>>>         this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
> >>>>>         infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
> >>>>>     }
> >>>>>
> >>>>>     @Override
> >>>>>     public void process(Key key, Update update) {
> >>>>>         try {
> >>>>>             if (key != null && update != null) {
> >>>>>                 Info info = infoStore.get(key);
> >>>>>                 // merge logic
> >>>>>                 infoStore.put(key, info);
> >>>>>             }
> >>>>>         } catch (Exception e) {
> >>>>>             logger.error(e.getMessage(), e);
> >>>>>         }
> >>>>>         context.commit();
> >>>>>     }
> >>>>>
> >>>>>     @Override
> >>>>>     public void punctuate(long timestamp) {
> >>>>>         try {
> >>>>>             KeyValueIterator<Key, Info> iter = this.infoStore.all();
> >>>>>             try {
> >>>>>                 while (iter.hasNext()) {
> >>>>>                     // processing logic
> >>>>>                 }
> >>>>>             } finally {
> >>>>>                 // Always release the store iterator, even on failure.
> >>>>>                 iter.close();
> >>>>>             }
> >>>>>             context.commit();
> >>>>>         } catch (Exception e) {
> >>>>>             logger.error(e.getMessage(), e);
> >>>>>         }
> >>>>>     }
> >>>>> }
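> >>>>>
> >>>>> To watch the consumer churn directly rather than grepping the logs
> >>>>> for "StreamThread", one idea I am considering is attaching a state
> >>>>> listener before streams.start(); a small sketch, reusing the streams
> >>>>> and logger objects from above:
> >>>>>
> >>>>> // Logs every lifecycle transition, e.g. RUNNING -> REBALANCING.
> >>>>> streams.setStateListener((newState, oldState) ->
> >>>>>         logger.info("Streams state changed from " + oldState + " to " + newState));
> >>>>> streams.start();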