Michal Turek created KAFKA-6558:
-----------------------------------

             Summary: Pre-load topics metadata in Mirror maker
                 Key: KAFKA-6558
                 URL: https://issues.apache.org/jira/browse/KAFKA-6558
             Project: Kafka
          Issue Type: Improvement
          Components: tools
            Reporter: Michal Turek

Mirror maker starts consuming before topic metadata has been loaded from the destination Kafka. In the meantime it buffers messages internally in the producer, which may result in heavy GC activity, application slowdown, timeouts and {{OutOfMemoryError}}. Today we were forced to increase the JVM heap from 1 GB to 4 GB just to survive the first few seconds after startup, although Mirror maker typically needs only about 400 MB in the long term.

The probable cause is that the producer loads topic metadata lazily as part of {{producer.send()}}, so the buffering happens in the sending phase.

Many messages like the ones below appeared in the logs.

{noformat}
2018-02-13 10:30:42,524 INFO kafka.tools.MirrorMaker$ [twork-thread | producer-1]: Closing producer due to send failure.
2018-02-13 10:30:42,524 INFO che.kafka.clients.producer.KafkaProducer [twork-thread | producer-1]: Closing the Kafka producer with timeoutMillis = 0 ms.
2018-02-13 10:30:42,524 INFO che.kafka.clients.producer.KafkaProducer [twork-thread | producer-1]: Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
2018-02-13 10:30:42,524 ERROR .producer.internals.ErrorLoggingCallback [twork-thread | producer-1]: Error when sending message to topic AA_MASTER_018 with key: 4 bytes, value: 109 bytes with error:
java.lang.IllegalStateException: Producer is closed forcefully.
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:522)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:502)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
    at java.lang.Thread.run(Thread.java:748)
{noformat}

h4. Proposed change

- Build Kafka producer client.
- *Warm it up by loading topic metadata from the destination Kafka.* We use this approach quite successfully in our custom high-throughput application that is based on the Kafka producer client. The metadata loading takes about 4-5 seconds.
- Build Kafka consumer clients and start consumption.

{noformat}
// Warm up producer
Set<String> topicNames = consumer.listTopics().keySet();
topicNames.forEach(topic -> producer.partitionsFor(topic));
{noformat}

h4. Our context

- Kafka 0.10.2.1
- About 1500 messages/s is typically consumed per Mirror maker process.
- Mirror maker was stopped for about 1 hour, all the messages were buffering in the source Kafka.
- Mirror maker was unable to start until we increased its JVM heap.

{noformat}
bootstrap.servers=...
group.id=...
security.protocol=SSL
ssl.truststore.location=.../truststore.jks
ssl.truststore.password=...
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
auto.offset.reset=latest
enable.auto.commit=false
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
fetch.min.bytes=40960
session.timeout.ms=60000
{noformat}

{noformat}
bootstrap.servers=...
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
acks=1
retries=1
linger.ms=20
buffer.memory=134217728
batch.size=65536
compression.type=lz4
{noformat}

{noformat}
kafka.tools.MirrorMaker --new.consumer --num.streams 25 --consumer.config ... --producer.config ... --offset.commit.interval.ms=60000 --whitelist=...
{noformat}
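
The warm-up step from the proposed change could be wrapped in a small helper along the following lines. This is only a sketch under stated assumptions, not Mirror maker code: the class {{ProducerWarmUp}} and the method {{warmUp}} are hypothetical names, and the lookup is passed in as a plain {{Function}} so that the snippet compiles without the Kafka client on the classpath. With a real producer, {{producer::partitionsFor}} would be passed as the lookup. The sketch assumes that a failed metadata lookup surfaces as a runtime exception, and it collects the affected topics instead of aborting startup.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, not part of Kafka or Mirror maker.
class ProducerWarmUp {

    /**
     * Calls metadataLookup once per topic, in iteration order, and returns
     * the topics for which the lookup threw a RuntimeException.
     * Assumption: with a real producer, producer::partitionsFor is passed in,
     * so each call forces the producer to fetch that topic's metadata.
     */
    static <T> List<String> warmUp(Collection<String> topics,
                                   Function<String, T> metadataLookup) {
        List<String> failed = new ArrayList<>();
        for (String topic : topics) {
            try {
                metadataLookup.apply(topic);
            } catch (RuntimeException e) {
                // Keep going so one unavailable topic does not block the rest.
                failed.add(topic);
            }
        }
        return failed;
    }
}
```

In Mirror maker this would presumably be called once, before the consumer threads start, for example as {{ProducerWarmUp.warmUp(consumer.listTopics().keySet(), producer::partitionsFor)}}. Whether a non-empty result should delay startup or only be logged is a design choice left open here.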