artemlivshits commented on code in PR #14774:
URL: https://github.com/apache/kafka/pull/14774#discussion_r1419832287
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -976,6 +975,231 @@ class ReplicaManager(val config: KafkaConfig,
}
}
+ /**
+ * Append messages to offsets topic, and wait for them to be replicated to other replicas;
+ * the callback function will be triggered either when timeout or the required acks are satisfied;
+ * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
+ *
+ * Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
+ * are expected to call ActionQueue.tryCompleteActions for all affected partitions, without holding any conflicting
+ * locks.
+ *
+ * @param timeout                       maximum time we will wait to append before returning
+ * @param requiredAcks                  number of replicas who must acknowledge the append before sending the response
+ * @param internalTopicsAllowed         boolean indicating whether internal topics can be appended to
+ * @param origin                        source of the append request (ie, client, replication, coordinator)
+ * @param entriesPerPartition           the records per partition to be appended
+ * @param responseCallback              callback for sending the response
+ * @param delayedProduceLock            lock for the delayed actions
+ * @param recordValidationStatsCallback callback for updating stats on record conversions
+ * @param requestLocal                  container for the stateful instances scoped to this request -- this must correspond to the
+ *                                      thread calling this method
+ * @param actionQueue                   the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+ * @param verificationGuards            the mapping from topic partition to verification guards if transaction verification is used
+ * @param preAppendErrors               the mapping from topic partition to LogAppendResult for errors that occurred before appending
+ */
+ def appendForGroup(timeout: Long,
Review Comment:
We should also add a comment that would reflect 2 points:
1. (For the maintainer of this code) -- this code must not return until the local write is done; that is an important invariant the callers rely upon. Otherwise it looks like a generic async call that can return and continue asynchronously at any point. This way, if an additional async stage is ever needed in this function before the local write completes, the maintainer would know to hunt down all usages of this function and figure out the correct action.
2. (For the caller of this code) -- a quick example of the full workflow for using this method: call maybeStartTransactionVerificationForPartition with a callback that then calls this method (a rough sketch of this workflow follows below).
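To make point 2 concrete, here is a rough, self-contained sketch of the suggested workflow. The trait, type aliases, and the callback shape of `maybeStartTransactionVerificationForPartition` below are simplified stand-ins, not the actual ReplicaManager signatures in this PR; only the ordering (verify first, then append from the verification callback) and the documented blocking invariant are the point.

```scala
// Sketch only: simplified stand-in types and signatures, not the real ReplicaManager API.
object AppendForGroupWorkflowSketch {

  type TopicPartition = String
  type MemoryRecords = Array[Byte]
  type PartitionResponse = Either[Throwable, Long] // Left = error, Right = base offset
  final case class VerificationGuard(id: Long)

  trait ReplicaManagerLike {
    /**
     * Maintainer note (point 1): this method must not return until the local
     * write has completed. Callers rely on that invariant to keep validation
     * they performed under the group lock atomic with the append. If an extra
     * asynchronous stage is ever added before the local write, audit every
     * caller of this method first.
     */
    def appendForGroup(entries: Map[TopicPartition, MemoryRecords],
                       verificationGuards: Map[TopicPartition, VerificationGuard],
                       responseCallback: Map[TopicPartition, PartitionResponse] => Unit): Unit

    /** Starts transaction verification and invokes `onComplete` when it finishes. */
    def maybeStartTransactionVerificationForPartition(
      partition: TopicPartition,
      transactionalId: String,
      onComplete: Either[Throwable, VerificationGuard] => Unit): Unit
  }

  /** Caller workflow (point 2): verify first, append only from the verification callback. */
  def commitTxnOffsets(rm: ReplicaManagerLike,
                       offsetTopicPartition: TopicPartition,
                       transactionalId: String,
                       entries: Map[TopicPartition, MemoryRecords],
                       respond: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
    rm.maybeStartTransactionVerificationForPartition(offsetTopicPartition, transactionalId, {
      case Left(error) =>
        // Verification failed: answer the caller without touching the log.
        respond(entries.map { case (tp, _) => tp -> (Left(error): PartitionResponse) })
      case Right(guard) =>
        // appendForGroup does not return until the local append is done (see note above).
        rm.appendForGroup(entries, Map(offsetTopicPartition -> guard), respond)
    })
  }
}
```

The property the comment should capture is that `appendForGroup` behaves synchronously with respect to the local write, even though the response callback may fire later.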
##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -339,156 +340,183 @@ class GroupMetadataManager(brokerId: Int,
delayedProduceLock = Some(group.lock),
responseCallback = callback,
requestLocal = requestLocal,
- transactionalId = transactionalId)
+ verificationGuards = verificationGuards,
+ preAppendErrors = preAppendErrors)
+ }
+
+ def generateOffsetRecords(magicValue: Byte,
+ isTxnOffsetCommit: Boolean,
+ groupId: String,
+ offsetTopicPartition: TopicPartition,
+ filteredOffsetMetadata: Map[TopicIdPartition, OffsetAndMetadata],
+ producerId: Long,
+ producerEpoch: Short): Map[TopicPartition, MemoryRecords] = {
+ // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
+ val timestampType = TimestampType.CREATE_TIME
+ val timestamp = time.milliseconds()
+
+ val records = filteredOffsetMetadata.map { case (topicIdPartition, offsetAndMetadata) =>
+ val key = GroupMetadataManager.offsetCommitKey(groupId, topicIdPartition.topicPartition)
+ val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
+ new SimpleRecord(timestamp, key, value)
+ }
+ val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
+
+ if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
+ throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue)
+
+ val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(),
+ producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH)
+
+ records.foreach(builder.append)
+ Map(offsetTopicPartition -> builder.build())
+ }
+
+ def createPutCacheCallback(isTxnOffsetCommit: Boolean,
+ group: GroupMetadata,
+ consumerId: String,
+ offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
+ filteredOffsetMetadata: Map[TopicIdPartition, OffsetAndMetadata],
+ responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
+ producerId: Long = RecordBatch.NO_PRODUCER_ID,
+ records: Map[TopicPartition, MemoryRecords],
+ preAppendErrors: Map[TopicPartition, LogAppendResult] = Map.empty): Map[TopicPartition, PartitionResponse] => Unit = {
+ val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
+ // set the callback function to insert offsets into cache after log append completed
+ def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
+ // the append response should only contain the topics partition
+ if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
+ throw new IllegalStateException("Append status %s should only have one partition %s"
+ .format(responseStatus, offsetTopicPartition))
+
+ // construct the commit response status and insert
+ // the offset and metadata to cache if the append status has no error
+ val status = responseStatus(offsetTopicPartition)
+
+ val responseError = group.inLock {
+ if (status.error == Errors.NONE) {
+ if (!group.is(Dead)) {
+ filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
+ if (isTxnOffsetCommit)
+ group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
+ else
+ group.onOffsetCommitAppend(topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
+ }
+ }
+
+ // Record the number of offsets committed to the log
+ offsetCommitsSensor.record(records.size)
+
+ Errors.NONE
+ } else {
+ if (!group.is(Dead)) {
+ if (!group.hasPendingOffsetCommitsFromProducer(producerId))
+ removeProducerGroup(producerId, group.groupId)
+ filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
+ if (isTxnOffsetCommit)
+ group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
+ else
+ group.failPendingOffsetWrite(topicIdPartition, offsetAndMetadata)
+ }
+ }
+
+ debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
+ s"with generation ${group.generationId} failed when appending to log due to ${status.error.exceptionName}")
+
+ // transform the log append error code to the corresponding the commit status error code
+ status.error match {
+ case Errors.UNKNOWN_TOPIC_OR_PARTITION
+ | Errors.NOT_ENOUGH_REPLICAS
+ | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
+ Errors.COORDINATOR_NOT_AVAILABLE
+
+ case Errors.NOT_LEADER_OR_FOLLOWER
+ | Errors.KAFKA_STORAGE_ERROR =>
+ Errors.NOT_COORDINATOR
+
+ case Errors.MESSAGE_TOO_LARGE
+ | Errors.RECORD_LIST_TOO_LARGE
+ | Errors.INVALID_FETCH_SIZE =>
+ Errors.INVALID_COMMIT_OFFSET_SIZE
+
+ case other => other
+ }
+ }
+ }
+
+ // compute the final error codes for the commit response
+ val commitStatus = offsetMetadata.map { case (topicIdPartition, offsetAndMetadata) =>
+ if (!validateOffsetMetadataLength(offsetAndMetadata.metadata))
+ (topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE)
+ else if (preAppendErrors.contains(topicIdPartition.topicPartition))
+ (topicIdPartition, preAppendErrors(topicIdPartition.topicPartition).error)
+ else
+ (topicIdPartition, responseError)
+ }
+
+ // finally trigger the callback logic passed from the API layer
+ responseCallback(commitStatus)
+ }
+ putCacheCallback
}
/**
* Store offsets by appending it to the replicated log and then inserting to cache
*/
def storeOffsets(group: GroupMetadata,
consumerId: String,
+ offsetTopicPartition: TopicPartition,
offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit,
- transactionalId: String = null,
producerId: Long = RecordBatch.NO_PRODUCER_ID,
producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
- requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
- // first filter out partitions with offset metadata size exceeding limit
- val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
- validateOffsetMetadataLength(offsetAndMetadata.metadata)
- }
-
+ requestLocal: RequestLocal = RequestLocal.NoCaching,
+ verificationGuard: Option[VerificationGuard]): Unit = {
group.inLock {
if (!group.hasReceivedConsistentOffsetCommits)
warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has received offset commits from consumers as well " +
s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " +
s"should be avoided.")
}
- val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
- // construct the message set to append
+ val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
+ validateOffsetMetadataLength(offsetAndMetadata.metadata)
+ }
if (filteredOffsetMetadata.isEmpty) {
// compute the final error codes for the commit response
val commitStatus = offsetMetadata.map { case (k, _) => k -> Errors.OFFSET_METADATA_TOO_LARGE }
responseCallback(commitStatus)
- } else {
- getMagic(partitionFor(group.groupId)) match {
- case Some(magicValue) =>
- // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
- val timestampType = TimestampType.CREATE_TIME
- val timestamp = time.milliseconds()
-
- val records = filteredOffsetMetadata.map { case (topicIdPartition, offsetAndMetadata) =>
- val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicIdPartition.topicPartition)
- val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
- new SimpleRecord(timestamp, key, value)
- }
- val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
- val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava))
-
- if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2)
- throw Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.exception("Attempting to make a transaction offset commit with an invalid magic: " + magicValue)
-
- val builder = MemoryRecords.builder(buffer, magicValue, compressionType, timestampType, 0L, time.milliseconds(),
- producerId, producerEpoch, 0, isTxnOffsetCommit, RecordBatch.NO_PARTITION_LEADER_EPOCH)
-
- records.foreach(builder.append)
- val entries = Map(offsetTopicPartition -> builder.build())
-
- // set the callback function to insert offsets into cache after log append completed
- def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
- // the append response should only contain the topics partition
- if (responseStatus.size != 1 || !responseStatus.contains(offsetTopicPartition))
- throw new IllegalStateException("Append status %s should only have one partition %s"
- .format(responseStatus, offsetTopicPartition))
-
- // construct the commit response status and insert
- // the offset and metadata to cache if the append status has no error
- val status = responseStatus(offsetTopicPartition)
-
- val responseError = group.inLock {
- if (status.error == Errors.NONE) {
- if (!group.is(Dead)) {
- filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
- if (isTxnOffsetCommit)
- group.onTxnOffsetCommitAppend(producerId, topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
- else
- group.onOffsetCommitAppend(topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
- }
- }
-
- // Record the number of offsets committed to the log
- offsetCommitsSensor.record(records.size)
-
- Errors.NONE
- } else {
- if (!group.is(Dead)) {
- if (!group.hasPendingOffsetCommitsFromProducer(producerId))
- removeProducerGroup(producerId, group.groupId)
- filteredOffsetMetadata.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
- if (isTxnOffsetCommit)
- group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
- else
- group.failPendingOffsetWrite(topicIdPartition, offsetAndMetadata)
- }
- }
-
- debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
- s"with generation ${group.generationId} failed when appending to log due to ${status.error.exceptionName}")
-
- // transform the log append error code to the corresponding the commit status error code
- status.error match {
- case Errors.UNKNOWN_TOPIC_OR_PARTITION
- | Errors.NOT_ENOUGH_REPLICAS
- | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
- Errors.COORDINATOR_NOT_AVAILABLE
-
- case Errors.NOT_LEADER_OR_FOLLOWER
- | Errors.KAFKA_STORAGE_ERROR =>
- Errors.NOT_COORDINATOR
-
- case Errors.MESSAGE_TOO_LARGE
- | Errors.RECORD_LIST_TOO_LARGE
- | Errors.INVALID_FETCH_SIZE =>
- Errors.INVALID_COMMIT_OFFSET_SIZE
-
- case other => other
- }
- }
- }
-
- // compute the final error codes for the commit response
- val commitStatus = offsetMetadata.map { case (topicIdPartition, offsetAndMetadata) =>
- if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
- (topicIdPartition, responseError)
- else
- (topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE)
- }
+ return
+ }
- // finally trigger the callback logic passed from the API layer
- responseCallback(commitStatus)
- }
+ val magicOpt = getMagic(partitionFor(group.groupId))
+ if (magicOpt.isEmpty) {
+ val commitStatus = offsetMetadata.map { case (topicIdPartition, _) =>
+ (topicIdPartition, Errors.NOT_COORDINATOR)
+ }
+ responseCallback(commitStatus)
+ return
+ }
- if (isTxnOffsetCommit) {
- group.inLock {
- addProducerGroup(producerId, group.groupId)
- group.prepareTxnOffsetCommit(producerId, offsetMetadata)
- }
- } else {
- group.inLock {
- group.prepareOffsetCommit(offsetMetadata)
- }
- }
+ val isTxnOffsetCommit = producerId != RecordBatch.NO_PRODUCER_ID
+ val records = generateOffsetRecords(magicOpt.get, isTxnOffsetCommit, group.groupId, offsetTopicPartition, filteredOffsetMetadata, producerId, producerEpoch)
+ val putCacheCallback = createPutCacheCallback(isTxnOffsetCommit, group, consumerId, offsetMetadata, filteredOffsetMetadata, responseCallback, producerId, records)
- appendForGroup(group, entries, requestLocal, putCacheCallback, transactionalId)
+ val verificationGuards = verificationGuard match {
+ case Some(guard) => Map(offsetTopicPartition -> guard)
+ case None => Map.empty[TopicPartition, VerificationGuard]
+ }
- case None =>
- val commitStatus = offsetMetadata.map { case (topicIdPartition, _) =>
- (topicIdPartition, Errors.NOT_COORDINATOR)
- }
- responseCallback(commitStatus)
+ group.inLock {
Review Comment:
I don't think we can just wrap this method in the lock to show the proper intent -- the intent is that the lock must already be held by the caller, because the caller does some validation under the lock as well, and the atomicity of that validation needs to be preserved across the local write. Note this is not a correctness issue (the lock is already taken outside, so any locking pattern here works); it's a comment about making the code easier to maintain. The atomicity requirement is absolutely non-obvious and took a lot of effort from multiple people to figure out, and I don't think that effort is reflected in this change in any way -- the code got rearranged into a form that makes it work, but the underlying issue (unclear and confusing atomicity invariants) is not addressed.
So I'd do 3 things:
1. Remove the explicit locking.
2. Add a comment in the Javadoc stating the requirement that this function must be called with the lock held.
3. Add a comment near the appendForGroup call noting that we rely on it not returning until the local append is done, to preserve the atomicity protected by the lock.
If this weren't soon-to-be-dead code I'd probably do more with naming conventions and asserts, but in this situation adding proper comments should be good and easy (a rough sketch of what that could look like follows below).
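For illustration only, a minimal sketch of what points 1-3 might look like around `storeOffsets`. The types and signatures are simplified stand-ins rather than the real GroupMetadataManager/ReplicaManager code, and the comment wording is just a suggestion.

```scala
// Sketch only: simplified stand-ins, not the actual GroupMetadataManager/ReplicaManager code.
import java.util.concurrent.locks.ReentrantLock

object StoreOffsetsLockingSketch {

  type TopicPartition = String
  type MemoryRecords = Array[Byte]

  final class GroupLock {
    private val lock = new ReentrantLock()
    def inLock[T](body: => T): T = { lock.lock(); try body finally lock.unlock() }
  }

  /**
   * Appends the given offset records to the log.
   *
   * Point 2: this method MUST be called with the group lock already held.
   * Validation done by the caller under that lock has to stay atomic with the
   * local append performed here.
   */
  def storeOffsets(entries: Map[TopicPartition, MemoryRecords],
                   appendForGroup: Map[TopicPartition, MemoryRecords] => Unit): Unit = {
    // Point 1: no explicit group.inLock { ... } wrapper here -- the doc comment
    // above states that the caller is responsible for holding the lock.

    // Point 3: we rely on appendForGroup not returning until the local append
    // is complete, so the atomicity protected by the group lock is preserved
    // across the caller's validation and the write.
    appendForGroup(entries)
  }

  /** Caller side: validation and storeOffsets happen under one lock acquisition. */
  def commitOffsets(groupLock: GroupLock,
                    entries: Map[TopicPartition, MemoryRecords],
                    appendForGroup: Map[TopicPartition, MemoryRecords] => Unit): Unit =
    groupLock.inLock {
      // ... validation that must stay atomic with the append would go here ...
      storeOffsets(entries, appendForGroup)
    }
}
```

The point of the sketch is that the lock acquisition lives in exactly one place (the caller), and the two comments record the invariants that make that safe.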