[
https://issues.apache.org/jira/browse/KAFKA-7286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16608362#comment-16608362
]
ASF GitHub Bot commented on KAFKA-7286:
---------------------------------------
hachikuji closed pull request #5500: KAFKA-7286: Avoid getting stuck loading
large metadata records
URL: https://github.com/apache/kafka/pull/5500
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 6bd0a5a0d52..940ec716e36 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -510,7 +510,9 @@ class GroupMetadataManager(brokerId: Int,
case Some(log) =>
var currOffset = log.logStartOffset
- lazy val buffer = ByteBuffer.allocate(config.loadBufferSize)
+
+ // buffer may not be needed if records are read from memory
+ var buffer = ByteBuffer.allocate(0)
// loop breaks if leader changes at any time during the load, since
getHighWatermark is -1
val loadedOffsets = mutable.Map[GroupTopicPartition,
CommitRecordMetadataAndOffset]()
@@ -524,7 +526,20 @@ class GroupMetadataManager(brokerId: Int,
val memRecords = fetchDataInfo.records match {
case records: MemoryRecords => records
case fileRecords: FileRecords =>
- buffer.clear()
+ val sizeInBytes = fileRecords.sizeInBytes
+ val bytesNeeded = Math.max(config.loadBufferSize, sizeInBytes)
+
+ // minOneMessage = true in the above log.read means that the
buffer may need to be grown to ensure progress can be made
+ if (buffer.capacity < bytesNeeded) {
+ if (config.loadBufferSize < bytesNeeded)
+ warn(s"Loaded offsets and group metadata from
$topicPartition with buffer larger ($bytesNeeded bytes) than " +
+ s"configured offsets.load.buffer.size
(${config.loadBufferSize} bytes)")
+
+ buffer = ByteBuffer.allocate(bytesNeeded)
+ } else {
+ buffer.clear()
+ }
+
fileRecords.readInto(buffer, 0)
MemoryRecords.readableRecords(buffer)
}
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index a3585154451..50d96c30734 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -296,7 +296,8 @@ class TransactionStateManager(brokerId: Int,
warn(s"Attempted to load offsets and group metadata from
$topicPartition, but found no log")
case Some(log) =>
- lazy val buffer =
ByteBuffer.allocate(config.transactionLogLoadBufferSize)
+ // buffer may not be needed if records are read from memory
+ var buffer = ByteBuffer.allocate(0)
// loop breaks if leader changes at any time during the load, since
getHighWatermark is -1
var currOffset = log.logStartOffset
@@ -311,6 +312,19 @@ class TransactionStateManager(brokerId: Int,
val memRecords = fetchDataInfo.records match {
case records: MemoryRecords => records
case fileRecords: FileRecords =>
+ val sizeInBytes = fileRecords.sizeInBytes
+ val bytesNeeded =
Math.max(config.transactionLogLoadBufferSize, sizeInBytes)
+
+ // minOneMessage = true in the above log.read means that the
buffer may need to be grown to ensure progress can be made
+ if (buffer.capacity < bytesNeeded) {
+ if (config.transactionLogLoadBufferSize < bytesNeeded)
+ warn(s"Loaded offsets and group metadata from
$topicPartition with buffer larger ($bytesNeeded bytes) than " +
+ s"configured transaction.state.log.load.buffer.size
(${config.transactionLogLoadBufferSize} bytes)")
+
+ buffer = ByteBuffer.allocate(bytesNeeded)
+ } else {
+ buffer.clear()
+ }
buffer.clear()
fileRecords.readInto(buffer, 0)
MemoryRecords.readableRecords(buffer)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b651549f1e8..9aada23e868 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -656,7 +656,7 @@ object KafkaConfig {
val GroupInitialRebalanceDelayMsDoc = "The amount of time the group
coordinator will wait for more consumers to join a new group before performing
the first rebalance. A longer delay means potentially fewer rebalances, but
increases the time until processing begins."
/** ********* Offset management configuration ***********/
val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry
associated with an offset commit"
- val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets
segments when loading offsets into the cache."
+ val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets
segments when loading offsets into the cache (soft-limit, overridden if records
are too large)."
val OffsetsTopicReplicationFactorDoc = "The replication factor for the
offsets topic (set higher to ensure availability). " +
"Internal topic creation will fail until the cluster size meets this
replication factor requirement."
val OffsetsTopicPartitionsDoc = "The number of partitions for the offset
commit topic (should not change after deployment)"
@@ -673,7 +673,7 @@ object KafkaConfig {
val TransactionsMaxTimeoutMsDoc = "The maximum allowed timeout for
transactions. " +
"If a client’s requested transaction time exceed this, then the broker
will return an error in InitProducerIdRequest. This prevents a client from too
large of a timeout, which can stall consumers reading from topics included in
the transaction."
val TransactionsTopicMinISRDoc = "Overridden " + MinInSyncReplicasProp + "
config for the transaction topic."
- val TransactionsLoadBufferSizeDoc = "Batch size for reading from the
transaction log segments when loading producer ids and transactions into the
cache."
+ val TransactionsLoadBufferSizeDoc = "Batch size for reading from the
transaction log segments when loading producer ids and transactions into the
cache (soft-limit, overridden if records are too large)."
val TransactionsTopicReplicationFactorDoc = "The replication factor for the
transaction topic (set higher to ensure availability). " +
"Internal topic creation will fail until the cluster size meets this
replication factor requirement."
val TransactionsTopicPartitionsDoc = "The number of partitions for the
transaction topic (should not change after deployment)."
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 21c13658e79..bcc18a41620 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -645,6 +645,38 @@ class GroupMetadataManagerTest {
assertEquals(None, groupMetadataManager.getGroup(groupId))
}
+ @Test
+ def testLoadGroupWithLargeGroupMetadataRecord() {
+ val groupMetadataTopicPartition = groupTopicPartition
+ val startOffset = 15L
+ val committedOffsets = Map(
+ new TopicPartition("foo", 0) -> 23L,
+ new TopicPartition("foo", 1) -> 455L,
+ new TopicPartition("bar", 0) -> 8992L
+ )
+
+ // create a GroupMetadata record larger then offsets.load.buffer.size
(here at least 16 bytes larger)
+ val assignmentSize = OffsetConfig.DefaultLoadBufferSize + 16
+ val memberId = "98098230493"
+
+ val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+ val groupMetadataRecord = buildStableGroupRecordWithMember(generation = 15,
+ protocolType = "consumer", protocol = "range", memberId, assignmentSize)
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+ offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+ expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+
+ EasyMock.replay(replicaManager)
+
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _
=> ())
+
+ val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group
was not loaded into the cache"))
+ committedOffsets.foreach { case (topicPartition, offset) =>
+ assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+ }
+ }
+
@Test
def testOffsetWriteAfterGroupRemoved(): Unit = {
// this test case checks the following scenario:
@@ -1606,7 +1638,7 @@ class GroupMetadataManagerTest {
val apiVersion = KAFKA_1_1_IV0
val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets,
apiVersion = apiVersion, retentionTime = Some(100))
val memberId = "98098230493"
- val groupMetadataRecord = buildStableGroupRecordWithMember(generation,
protocolType, protocol, memberId, apiVersion)
+ val groupMetadataRecord = buildStableGroupRecordWithMember(generation,
protocolType, protocol, memberId, apiVersion = apiVersion)
val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
@@ -1711,13 +1743,14 @@ class GroupMetadataManagerTest {
protocolType: String,
protocol: String,
memberId: String,
+ assignmentSize: Int = 0,
apiVersion: ApiVersion =
ApiVersion.latestVersion): SimpleRecord = {
val memberProtocols = List((protocol, Array.emptyByteArray))
val member = new MemberMetadata(memberId, groupId, "clientId",
"clientHost", 30000, 10000, protocolType, memberProtocols)
val group = GroupMetadata.loadGroup(groupId, Stable, generation,
protocolType, protocol, memberId,
if (apiVersion >= KAFKA_2_1_IV0) Some(time.milliseconds()) else None,
Seq(member), time)
val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
- val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group,
Map(memberId -> Array.empty[Byte]), apiVersion)
+ val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group,
Map(memberId -> new Array[Byte](assignmentSize)), apiVersion)
new SimpleRecord(groupMetadataKey, groupMetadataValue)
}
@@ -1754,6 +1787,8 @@ class GroupMetadataManagerTest {
EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
.andReturn(FetchDataInfo(LogOffsetMetadata(startOffset),
fileRecordsMock))
+
EasyMock.expect(fileRecordsMock.sizeInBytes()).andStubReturn(records.sizeInBytes)
+
val bufferCapture = EasyMock.newCapture[ByteBuffer]
fileRecordsMock.readInto(EasyMock.capture(bufferCapture),
EasyMock.anyInt())
EasyMock.expectLastCall().andAnswer(new IAnswer[Unit] {
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index 873b88d1104..060e07e7327 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -259,6 +259,8 @@ class TransactionCoordinatorConcurrencyTest extends
AbstractCoordinatorConcurren
EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
.andReturn(FetchDataInfo(LogOffsetMetadata(startOffset),
fileRecordsMock))
+
EasyMock.expect(fileRecordsMock.sizeInBytes()).andStubReturn(records.sizeInBytes)
+
val bufferCapture = EasyMock.newCapture[ByteBuffer]
fileRecordsMock.readInto(EasyMock.capture(bufferCapture),
EasyMock.anyInt())
EasyMock.expectLastCall().andAnswer(new IAnswer[Unit] {
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 34b82d9ea55..74bbe336b3c 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -586,6 +586,8 @@ class TransactionStateManagerTest {
EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
.andReturn(FetchDataInfo(LogOffsetMetadata(startOffset),
fileRecordsMock))
+
EasyMock.expect(fileRecordsMock.sizeInBytes()).andStubReturn(records.sizeInBytes)
+
val bufferCapture = EasyMock.newCapture[ByteBuffer]
fileRecordsMock.readInto(EasyMock.capture(bufferCapture),
EasyMock.anyInt())
EasyMock.expectLastCall().andAnswer(new IAnswer[Unit] {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Loading offsets and group metadata hangs with large group metadata records
> --------------------------------------------------------------------------
>
> Key: KAFKA-7286
> URL: https://issues.apache.org/jira/browse/KAFKA-7286
> Project: Kafka
> Issue Type: Bug
> Reporter: Flavien Raynaud
> Assignee: Flavien Raynaud
> Priority: Minor
>
> When a (Kafka-based) consumer group contains many members, group metadata
> records (in the {{__consumer_offsets}} topic) can become quite large.
> Increasing {{message.max.bytes}} makes storing these records possible.
> When a broker restarts, these records are loaded via
> [doLoadGroupsAndOffsets|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L504].
> However, this method relies on the {{offsets.load.buffer.size}}
> configuration to create a
> [buffer|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L513]
> that will contain the records being loaded.
> If a group metadata record is too large for this buffer, the loading method
> will get stuck trying to load records (in a tight loop) into a buffer that
> cannot accommodate a single record.
> ----
> For example, if the {{__consumer_offsets-9}} partition contains a record
> smaller than {{message.max.bytes}} but larger than
> {{offsets.load.buffer.size}}, logs would indicate the following:
> {noformat}
> ...
> [2018-08-13 21:00:21,073] INFO [GroupMetadataManager brokerId=0] Scheduling
> loading of offsets and group metadata from __consumer_offsets-9
> (kafka.coordinator.group.GroupMetadataManager)
> ...
> {noformat}
> But logs will never contain the expected {{Finished loading offsets and group
> metadata from ...}} line.
> Consumers whose groups are assigned to this partition will see {{Marking the
> coordinator dead}} and will never be able to stabilize and make progress.
> ----
> From what I could gather in the code, it seems that:
> -
> [fetchDataInfo|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L522]
> returns at least one record (even if larger than
> {{offsets.load.buffer.size}}, thanks to {{minOneMessage = true}})
> - No fully-readable record is stored in the buffer with
> [fileRecords.readInto(buffer,
> 0)|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L528]
> (too large to fit in the buffer)
> -
> [memRecords.batches|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L532]
> returns an empty iterator
> -
> [currOffset|https://github.com/apache/kafka/blob/418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L590]
> never advances, hence loading the partition hangs forever.
> ----
> It would be great to let the partition load even if a record is larger than
> the configured {{offsets.load.buffer.size}} limit. The fact that
> {{minOneMessage = true}} when reading records seems to indicate it might be a
> good idea for the buffer to accommodate at least one record.
> If you think the limit should stay a hard limit, then it would help to at
> least add a log line indicating that {{offsets.load.buffer.size}} is not
> large enough and should be increased. Otherwise, one can only guess and dig
> through the code to figure out what is happening :)
> I will try to open a PR with the first idea (allowing large records to be
> read when needed) soon, but any feedback from anyone who also had the same
> issue in the past would be appreciated :)
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)