Hi All,

Sometimes I see the exception below just before the consumer gets disconnected. I am not sure whether it has any relevance to the issue I am facing.
DEBUG | 05:16:24 | clients.NetworkClient (NetworkClient.java:702) - Node 0 disconnected.
DEBUG | 05:16:24 | clients.NetworkClient (NetworkClient.java:702) - Node 1 disconnected.
DEBUG | 05:16:24 | network.Selector (Selector.java:399) - Connection with mykafkainstancekafka5101.mydomain.com/34.220.36.96 disconnected
java.io.EOFException
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:87) ~[kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75) ~[kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203) ~[kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167) ~[kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:379) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.common.network.Selector.poll(Selector.java:326) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:432) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:314) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.Fetcher.getAllTopicMetadata(Fetcher.java:294) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.listTopics(KafkaConsumer.java:1410) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:67) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:177) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:57) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) [kafka-clients-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [kafka-streams-0.11.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) [kafka-streams-0.11.0.0.jar:?]
DEBUG | 05:16:24 | network.Selector (Selector.java:399) - Connection with l-mykafkainstancekafka5101/172.31.7.209 disconnected
java.io.EOFException

On Sat, Sep 16, 2017 at 1:04 PM, dev loper <spark...@gmail.com> wrote:

> Thank you Ted for your inputs. I have also observed the similarity in the
> logs, but I am not sure how to resolve the issue.
>
> @Bill, I repeated my tests after reducing MAX_POLL_RECORDS to 500. I
> could see the consumer getting started with 500, but the same problem exists
> even after the configuration change. The consumer on one of the
> instances goes down and the whole process I described in my previous mail
> repeats.
>
> On Sat, Sep 16, 2017 at 9:08 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Though there was no LockException in your log, there is still enough
>> similarity between your log and the one posted by johnchou:
>>
>> https://issues.apache.org/jira/browse/KAFKA-5397?focusedCommentId=16162689&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16162689
>>
>> Even the line numbers match: sendOffsetCommitRequest (ConsumerCoordinator.java:725)
>>
>> Reducing MAX_POLL_RECORDS_CONFIG may lessen the symptom but that would
>> not meet your throughput goal.
>>
>> On Fri, Sep 15, 2017 at 8:09 PM, dev loper <spark...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> @Bill,
>>>
>>> I will reduce MAX_POLL_RECORDS to 500/1000 and I will share the
>>> results shortly.
>>>
>>> @Ted,
>>>
>>> Yes, I reduced MAX_POLL_RECORDS_CONFIG from 50000 to 5000. It was not a
>>> typo. Do you think 50000 is way too high for a Kafka Streams application?
>>> The Spark application which I am trying to replace with Kafka Streams
>>> was processing 250000 messages per 5-second batch; that was the reason I
>>> set MAX_POLL_RECORDS_CONFIG to 50000 records.
>>>
>>> I don't think I have hit KAFKA-5397, since I couldn't find any instance of
>>> "org.apache.kafka.streams.errors.LockException" in my logs. I could see the
>>> below exception, but these exceptions are triggered long after the
>>> application stopped consuming any messages.
>>>
>>> StreamThread100.log:org.apache.kafka.streams.errors.StreamsException:
>>> stream-thread failed to suspend stream tasks
>>> User provided listener org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
>>> for group myKafka-kafkareplica101Sept08 failed on partition revocation
>>>
>>> @Damian, I figured out the pattern below; I don't know whether it helps.
>>> Did the StreamThread logs which I shared help?
>>>
>>> I couldn't figure out why the consumers are getting closed while they are
>>> being allocated, and why they suddenly stop without any reason. If you
>>> look at the pattern:
>>>
>>> 1) Consumer is getting created with the config values supplied by the
>>> application.
>>> 2) Adding sensors.
>>> 3) Fetching API version and initiating connections to the Kafka brokers
>>> (NetworkClient.java).
>>> 4) Sending metadata request and recorded API version from broker
>>> (NetworkClient.java).
>>> 5) Updated cluster metadata version (some incremental version) to cluster.
>>> 6) Discovered coordinator (one of the Kafka brokers).
>>> 7) Initiating connection to coordinator (one of the Kafka brokers).
>>> 8) Fetching committed offsets for partitions (ConsumerCoordinator.java:826).
>>> 9) Recorded API versions for node.
>>> 10) Resetting offset for partition (for all the partitions).
>>> 11) Handling ListOffsetResponse (for all the partitions).
>>> 12) Removing sensors (I couldn't see any exceptions though).
>>> 13) consumer.KafkaConsumer (KafkaConsumer.java:1617) - The Kafka consumer
>>> has closed. (Again I couldn't see any exceptions, and I couldn't figure out
>>> why the KafkaConsumer closed; no reason is provided in the logs.)
>>> 14) The Kafka Streams application goes back to step 1 and creates the
>>> consumer again (the whole process is repeated, but 90% of the time the
>>> application never recovers).
>>> 15) Since I have introduced a limit of MAX_POLL_INTERVAL_MS_CONFIG = 60000
>>> (previously Integer.MAX_VALUE), I could see the below exception after 60
>>> seconds of consumer retry:
>>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>>> be completed since the group has already rebalanced
>>>
>>> I am not sure what could be wrong on my side and how I can resolve this
>>> issue so that my application starts processing messages consistently.
>>>
>>> Thanks
>>> Dev
>>>
>>> On Sat, Sep 16, 2017 at 2:02 AM, Bill Bejeck <b...@confluent.io> wrote:
>>>
>>>> Hi,
>>>>
>>>> Could you set MAX_POLL_RECORDS to something lower like 500 or 1000 and
>>>> try again?
>>>>
>>>> Thanks,
>>>> Bill
>>>>
>>>> On Fri, Sep 15, 2017 at 3:40 PM, dev loper <spark...@gmail.com> wrote:
>>>>
>>>> > Hi Damian,
>>>> >
>>>> > I have repeated my tests with a slight configuration change. The current
>>>> > logs captured for the "StreamThread" keyword have more relevant entries
>>>> > compared to the logs which I shared previously.
>>>> > I started the application on
>>>> > instances 100, 101 and 102 simultaneously with the below configuration:
>>>> >
>>>> > 1) Reduced MAX_POLL_RECORDS_CONFIG to 5000 (previously 50000)
>>>> > 2) Reduced MAX_POLL_INTERVAL_MS_CONFIG to 60000 (previously Integer.MAX_VALUE)
>>>> >
>>>> > When the application started, all three instances started processing, and
>>>> > for the first few minutes everything went well. After that I could see the
>>>> > "StreamThread100" consumer going for a toss: it kept closing and recreating
>>>> > consumers for a while, exactly with the pattern of logs I mentioned in my
>>>> > previous email, and after some time "StreamThread100" stopped processing
>>>> > messages with the below exception while the other two continued processing
>>>> > messages without any issues.
>>>> >
>>>> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
>>>> > completed since the group has already rebalanced and assigned the
>>>> > partitions to another member. This means that the time between subsequent
>>>> > calls to poll() was longer than the configured max.poll.interval.ms,
>>>> > which typically implies that the poll loop is spending too much time
>>>> > message processing. You can address this either by increasing the session
>>>> > timeout or by reducing the maximum size of batches returned in poll() with
>>>> > max.poll.records.
>>>> >
>>>> > I think that since the consumers were starting and stopping, no poll was
>>>> > made from the system. Since I reduced MAX_POLL_INTERVAL_MS_CONFIG to 60000
>>>> > while the processors were getting closed and restarted, that might have
>>>> > resulted in the CommitFailedException due to non-availability of processors
>>>> > to do the processing.
>>>> >
>>>> > After some time the issue propagated to the other servers. I have attached
>>>> > the relevant logs with this mail; kindly go through them and let me know
>>>> > how I can solve this issue.
>>>> >
>>>> > On Fri, Sep 15, 2017 at 10:33 PM, dev loper <spark...@gmail.com> wrote:
>>>> >
>>>> >> Hi Ted,
>>>> >>
>>>> >> What should I be looking for in the broker logs? I haven't looked at the
>>>> >> broker side, since my Spark application processing from the same topic
>>>> >> with a different group id is able to process well.
>>>> >>
>>>> >> On Fri, Sep 15, 2017 at 3:30 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>> >>
>>>> >>> Is there some clue in the broker logs?
>>>> >>>
>>>> >>> Thanks
>>>> >>>
>>>> >>> On Thu, Sep 14, 2017 at 11:19 PM, dev loper <spark...@gmail.com> wrote:
>>>> >>>
>>>> >>>> Dear Kafka Users,
>>>> >>>>
>>>> >>>> I am fairly new to Kafka Streams. I have deployed two instances of
>>>> >>>> Kafka 0.11 brokers on AWS M3.xlarge instances. I have created a topic
>>>> >>>> with 36 partitions, and a separate application writes to this topic,
>>>> >>>> producing records at the rate of 10000 messages per second. I have three
>>>> >>>> AWS M4.xlarge instances where my Kafka Streams application is running,
>>>> >>>> consuming the messages produced by the other application.
>>>> >>>> The application starts up fine and processes messages on the first
>>>> >>>> instance, but when I start the same application on the other instances
>>>> >>>> it does not start: even though the process is alive, it is not
>>>> >>>> processing messages. Also, the other instances take a long time to start.
>>>> >>>>
>>>> >>>> Apart from the first instance, on the other instances I could see the
>>>> >>>> consumer getting added and removed repeatedly, and I couldn't see any
>>>> >>>> message processing at all. I have attached the detailed logs where this
>>>> >>>> behavior is observed.
>>>> >>>>
>>>> >>>> The consumer is getting started with the below log on these instances
>>>> >>>> and getting stopped with the below log (* detailed logs attached *).
>>>> >>>>
>>>> >>>> INFO | 21:59:30 | consumer.ConsumerConfig (AbstractConfig.java:223) -
>>>> >>>> ConsumerConfig values:
>>>> >>>>         auto.commit.interval.ms = 5000
>>>> >>>>         auto.offset.reset = latest
>>>> >>>>         bootstrap.servers = [l-mykafkainstancekafka5101:9092, l-mykafkainstancekafka5102:9092]
>>>> >>>>         check.crcs = true
>>>> >>>>         client.id =
>>>> >>>>         connections.max.idle.ms = 540000
>>>> >>>>         enable.auto.commit = false
>>>> >>>>         exclude.internal.topics = true
>>>> >>>>         fetch.max.bytes = 52428800
>>>> >>>>         fetch.max.wait.ms = 500
>>>> >>>>         fetch.min.bytes = 1
>>>> >>>>         group.id = myKafka-kafkareplica101Sept08
>>>> >>>>         heartbeat.interval.ms = 3000
>>>> >>>>         interceptor.classes = null
>>>> >>>>         internal.leave.group.on.close = true
>>>> >>>>         isolation.level = read_uncommitted
>>>> >>>>         key.deserializer = class mx.july.jmx.proximity.kafka.KafkaKryoCodec
>>>> >>>>         max.partition.fetch.bytes = 1048576
>>>> >>>>         max.poll.interval.ms = 300000
>>>> >>>>         max.poll.records = 500
>>>> >>>>         metadata.max.age.ms = 300000
>>>> >>>>         metric.reporters = []
>>>> >>>>         metrics.num.samples = 2
>>>> >>>>         metrics.recording.level = INFO
>>>> >>>>         metrics.sample.window.ms = 30000
>>>> >>>>         partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>>>> >>>>         receive.buffer.bytes = 65536
>>>> >>>>         reconnect.backoff.max.ms = 1000
>>>> >>>>         reconnect.backoff.ms = 50
>>>> >>>>         request.timeout.ms = 305000
>>>> >>>>         retry.backoff.ms = 100
>>>> >>>>         sasl.jaas.config = null
>>>> >>>>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>> >>>>         sasl.kerberos.min.time.before.relogin = 60000
>>>> >>>>         sasl.kerberos.service.name = null
>>>> >>>>         sasl.kerberos.ticket.renew.jitter = 0.05
>>>> >>>>         sasl.kerberos.ticket.renew.window.factor = 0.8
>>>> >>>>         sasl.mechanism = GSSAPI
>>>> >>>>         security.protocol = PLAINTEXT
>>>> >>>>         send.buffer.bytes = 131072
>>>> >>>>         session.timeout.ms = 10000
>>>> >>>>         ssl.cipher.suites = null
>>>> >>>>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>> >>>>         ssl.endpoint.identification.algorithm = null
>>>> >>>>         ssl.key.password = null
>>>> >>>>         ssl.keymanager.algorithm = SunX509
>>>> >>>>         ssl.keystore.location = null
>>>> >>>>         ssl.keystore.password = null
>>>> >>>>         ssl.keystore.type = JKS
>>>> >>>>         ssl.protocol = TLS
>>>> >>>>         ssl.provider = null
>>>> >>>>         ssl.secure.random.implementation = null
>>>> >>>>         ssl.trustmanager.algorithm = PKIX
>>>> >>>>         ssl.truststore.location = null
>>>> >>>>         ssl.truststore.password = null
>>>> >>>>         ssl.truststore.type = JKS
>>>> >>>>         value.deserializer = class my.dev.MessageUpdateCodec
>>>> >>>>
>>>> >>>> DEBUG | 21:59:30 | consumer.KafkaConsumer (KafkaConsumer.java:1617) -
>>>> >>>> The Kafka consumer has closed.
>>>> >>>> And the whole process repeats.
>>>> >>>>
>>>> >>>> Below you can find my startup code for Kafka Streams and the parameters
>>>> >>>> which I have configured for starting the Kafka Streams application.
>>>> >>>>
>>>> >>>> private static Properties settings = new Properties();
>>>> >>>> settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "mykafkastreamsapplication");
>>>> >>>> settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
>>>> >>>> settings.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
>>>> >>>> settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
>>>> >>>> settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
>>>> >>>> settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
>>>> >>>> settings.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "60000");
>>>> >>>>
>>>> >>>> KStreamBuilder builder = new KStreamBuilder();
>>>> >>>> KafkaStreams streams = new KafkaStreams(builder, settings);
>>>> >>>> builder.addSource(.....
>>>> >>>>        .addProcessor .............
>>>> >>>>        .addProcessor ........
>>>> >>>>        .addStateStore(...................).persistent().build(), "my processor")
>>>> >>>>        .addSink ..............
>>>> >>>>        .addSink ..............
>>>> >>>> streams.start();
>>>> >>>>
>>>> >>>> and I am using a simple processor to process my logic:
>>>> >>>>
>>>> >>>> public class InfoProcessor extends AbstractProcessor<Key, Update> {
>>>> >>>>     private static Logger logger = Logger.getLogger(InfoProcessor.class);
>>>> >>>>     private ProcessorContext context;
>>>> >>>>     private KeyValueStore<Key, Info> infoStore;
>>>> >>>>
>>>> >>>>     @Override
>>>> >>>>     @SuppressWarnings("unchecked")
>>>> >>>>     public void init(ProcessorContext context) {
>>>> >>>>         this.context = context;
>>>> >>>>         this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
>>>> >>>>         infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
>>>> >>>>     }
>>>> >>>>
>>>> >>>>     @Override
>>>> >>>>     public void process(Key key, Update update) {
>>>> >>>>         try {
>>>> >>>>             if (key != null && update != null) {
>>>> >>>>                 Info info = infoStore.get(key);
>>>> >>>>                 // merge logic
>>>> >>>>                 infoStore.put(key, info);
>>>> >>>>             }
>>>> >>>>         } catch (Exception e) {
>>>> >>>>             logger.error(e.getMessage(), e);
>>>> >>>>         }
>>>> >>>>         context.commit();
>>>> >>>>     }
>>>> >>>>
>>>> >>>>     @Override
>>>> >>>>     public void punctuate(long timestamp) {
>>>> >>>>         try {
>>>> >>>>             KeyValueIterator<Key, Info> iter = this.infoStore.all();
>>>> >>>>             while (iter.hasNext()) {
>>>> >>>>                 // processing logic
>>>> >>>>             }
>>>> >>>>             iter.close();
>>>> >>>>             context.commit();
>>>> >>>>         } catch (Exception e) {
>>>> >>>>             logger.error(e.getMessage(), e);
>>>> >>>>         }
>>>> >>>>     }
>>>> >>>> }
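For reference, a minimal sketch of the poll-loop tuning discussed in this thread, written against the 0.11-era APIs used above. The class name, broker list and the specific values are illustrative assumptions, not the configuration the thread settled on; the point is only that max.poll.records and max.poll.interval.ms are chosen together so that processing one batch (plus a punctuate() pass) comfortably fits inside the allowed poll interval:

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class PollTuningSketch {

        public static Properties pollTunedSettings() {
            Properties settings = new Properties();
            settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "mykafkastreamsapplication");
            // Hypothetical broker list for the sketch; replace with the real bootstrap servers.
            settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "l-mykafkainstancekafka5101:9092,l-mykafkainstancekafka5102:9092");

            // Keep batches small enough that one poll()'s worth of records,
            // plus any punctuate() work, finishes well within max.poll.interval.ms.
            settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");       // illustrative value
            settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000"); // 5 minutes, illustrative value

            // Since 0.10.1 the session timeout only governs heartbeat-based liveness,
            // not processing time, so it can stay small.
            settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
            settings.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
            return settings;
        }
    }

With values in this spirit, a rebalance is triggered only when an instance really stops calling poll() for the full interval, rather than whenever one large batch happens to run long.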
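On the processor side, one thing that can quietly stretch the time between poll() calls is punctuate() scanning the whole store on the stream thread. Below is a hedged variation of the InfoProcessor shown above (Key, Update, Info and the "InfoStore" name are the application types and store name from that snippet, so this is not compilable on its own; the merge and per-entry logic are deliberately left as placeholders) that closes the store iterator via try-with-resources and relies on Streams' own commit interval instead of committing per record:

    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Sketch only: Key, Update and Info are the application classes from the original snippet.
    public class InfoProcessorSketch extends AbstractProcessor<Key, Update> {

        private ProcessorContext context;
        private KeyValueStore<Key, Info> infoStore;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.context.schedule(60_000L); // 0.11 API: punctuate() is driven by stream time
            this.infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
        }

        @Override
        public void process(Key key, Update update) {
            if (key != null && update != null) {
                Info current = infoStore.get(key);
                // merge 'update' into 'current' here (application-specific)
                infoStore.put(key, current);
            }
            // No per-record context.commit(): Streams already commits every commit.interval.ms,
            // and commit() only requests a commit at the next opportunity anyway.
        }

        @Override
        public void punctuate(long timestamp) {
            // try-with-resources guarantees the store iterator is closed even if a pass fails.
            try (KeyValueIterator<Key, Info> iter = infoStore.all()) {
                while (iter.hasNext()) {
                    iter.next(); // per-entry processing (application-specific)
                }
            }
            context.commit();
        }
    }

If a full pass over the store can still take longer than the poll interval, breaking the scan into bounded chunks per punctuation (or raising max.poll.interval.ms) keeps the stream thread returning to poll() often enough to avoid the CommitFailedException discussed above.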