lucasbru commented on code in PR #20762:
URL: https://github.com/apache/kafka/pull/20762#discussion_r2463850367


##########
core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala:
##########
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
+import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData, 
StreamsGroupHeartbeatResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{StreamsGroupHeartbeatRequest, 
StreamsGroupHeartbeatResponse}
+import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.common.config.ConfigResource
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertNull, assertTrue}
+
+import java.util.Collections
+import scala.jdk.CollectionConverters._
+
+@ClusterTestDefaults(
+  types = Array(Type.KRAFT),
+  serverProperties = Array(
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic,consumer,streams")
+  )
+)
+class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+
+  @ClusterTest
+  def testInternalTopicsCreation(): Unit = {
+    val admin = cluster.admin()
+    val memberId = "test-member-1"
+    val groupId = "test-group"
+    val inputTopicName = "input-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create input topic
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = inputTopicName,
+        numPartitions = 2
+      )
+
+      // Wait for topics to be available
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(inputTopicName)
+      }, msg = s"Input topic $inputTopicName is not available")
+
+      // Create topology with internal topics (changelog and repartition 
topics)
+      val topology = createTopologyWithInternalTopics(inputTopicName, groupId)
+
+      // Send heartbeat with topology containing internal topics
+      var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {

Review Comment:
   In the base class there is a helper method `streamsGroupHeartbeat` (I think you may have defined it yourself). Couldn't we reuse it here instead of building the request and the retry loop by hand?
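   A rough sketch of how the first heartbeat could look with that helper (the parameter names are my assumption about its signature, not necessarily the real one):
   
   ```
   // Sketch only: parameter names and return type are assumptions about the
   // base-class helper, not its actual signature.
   val responseData = streamsGroupHeartbeat(
     groupId = groupId,
     memberId = memberId,
     rebalanceTimeoutMs = 1000,
     activeTasks = Collections.emptyList(),
     standbyTasks = Collections.emptyList(),
     warmupTasks = Collections.emptyList(),
     topology = topology
   )
   assertEquals(memberId, responseData.memberId())
   assertEquals(1, responseData.memberEpoch())
   ```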



##########
core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala:
##########
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
+import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData, 
StreamsGroupHeartbeatResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{StreamsGroupHeartbeatRequest, 
StreamsGroupHeartbeatResponse}
+import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.common.config.ConfigResource
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertNull, assertTrue}
+
+import java.util.Collections
+import scala.jdk.CollectionConverters._
+
+@ClusterTestDefaults(
+  types = Array(Type.KRAFT),
+  serverProperties = Array(
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic,consumer,streams")
+  )
+)
+class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+
+  @ClusterTest
+  def testInternalTopicsCreation(): Unit = {
+    val admin = cluster.admin()
+    val memberId = "test-member-1"
+    val groupId = "test-group"
+    val inputTopicName = "input-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create input topic
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = inputTopicName,
+        numPartitions = 2
+      )
+
+      // Wait for topics to be available
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(inputTopicName)
+      }, msg = s"Input topic $inputTopicName is not available")
+
+      // Create topology with internal topics (changelog and repartition 
topics)
+      val topology = createTopologyWithInternalTopics(inputTopicName, groupId)
+
+      // Send heartbeat with topology containing internal topics
+      var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(0)
+            .setRebalanceTimeoutMs(1000)
+            .setActiveTasks(Option(streamsGroupHeartbeatResponse)
+              .map(r => convertTaskIds(r.data().activeTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setStandbyTasks(Option(streamsGroupHeartbeatResponse)
+              .map(r => convertTaskIds(r.data().standbyTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setWarmupTasks(Option(streamsGroupHeartbeatResponse)
+              .map(r => convertTaskIds(r.data().warmupTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest)
+        streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code()
+      }, "StreamsGroupHeartbeatRequest did not succeed within the timeout 
period.")
+
+      // Verify the heartbeat was successful
+      assert(streamsGroupHeartbeatResponse != null, 
"StreamsGroupHeartbeatResponse should not be null")
+      assertEquals(memberId, streamsGroupHeartbeatResponse.data.memberId())
+      assertEquals(1, streamsGroupHeartbeatResponse.data.memberEpoch())
+
+      // Wait for internal topics to be created
+      val expectedChangelogTopic = s"$groupId-subtopology-1-changelog"
+      val expectedRepartitionTopic = s"$groupId-subtopology-1-repartition"
+
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(expectedChangelogTopic) && 
topicNames.contains(expectedRepartitionTopic)
+      }, msg = s"Internal topics $expectedChangelogTopic and 
$expectedRepartitionTopic were not created")
+
+      // Verify the internal topics exist and have correct properties
+      val changelogTopicDescription = 
admin.describeTopics(java.util.Collections.singletonList(expectedChangelogTopic)).allTopicNames().get()
+      val repartitionTopicDescription = 
admin.describeTopics(java.util.Collections.singletonList(expectedRepartitionTopic)).allTopicNames().get()
+
+      assertTrue(changelogTopicDescription.containsKey(expectedChangelogTopic),
+        s"Changelog topic $expectedChangelogTopic should exist")
+      
assertTrue(repartitionTopicDescription.containsKey(expectedRepartitionTopic),
+        s"Repartition topic $expectedRepartitionTopic should exist")
+
+      // Verify topic configurations
+      val changelogTopic = 
changelogTopicDescription.get(expectedChangelogTopic)
+      val repartitionTopic = 
repartitionTopicDescription.get(expectedRepartitionTopic)
+
+      // Both topics should have 2 partitions (matching the input topic)
+      assertEquals(2, changelogTopic.partitions().size(),
+        s"Changelog topic should have 2 partitions, but has 
${changelogTopic.partitions().size()}")
+      assertEquals(2, repartitionTopic.partitions().size(),
+        s"Repartition topic should have 2 partitions, but has 
${repartitionTopic.partitions().size()}")
+
+      // Verify replication factor
+      assertEquals(1, changelogTopic.partitions().get(0).replicas().size(),
+        s"Changelog topic should have replication factor 1")
+      assertEquals(1, repartitionTopic.partitions().get(0).replicas().size(),
+        s"Repartition topic should have replication factor 1")
+    } finally {
+      admin.close()
+    }
+  }
+
+  @ClusterTest
+  def testDynamicGroupConfig(): Unit = {
+    val admin = cluster.admin()
+    val memberId1 = "test-member-1"
+    val memberId2 = "test-member-2"
+    val groupId = "test-group"
+    val topicName = "test-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create topic
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = topicName,
+        numPartitions = 2
+      )
+      // Wait for topic to be available
+      TestUtils.waitUntilTrue(() => {
+        admin.listTopics().names().get().contains(topicName)
+      }, msg = s"Topic $topicName is not available to the group coordinator")
+
+      val topology = createTopologyWithInternalTopics(topicName, groupId)
+
+      // First member joins the group
+      var streamsGroupHeartbeatResponse1: StreamsGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest1 = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(0)
+            .setRebalanceTimeoutMs(1000)
+            .setProcessId("process-1")
+            .setActiveTasks(Option(streamsGroupHeartbeatResponse1)
+              .map(r => convertTaskIds(r.data().activeTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setStandbyTasks(Option(streamsGroupHeartbeatResponse1)
+              .map(r => convertTaskIds(r.data().standbyTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setWarmupTasks(Option(streamsGroupHeartbeatResponse1)
+              .map(r => convertTaskIds(r.data().warmupTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse1 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1)
+        streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code()
+      }, "First StreamsGroupHeartbeatRequest did not succeed within the 
timeout period.")
+
+      val expectedChangelogTopic = s"$groupId-subtopology-1-changelog"
+      val expectedRepartitionTopic = s"$groupId-subtopology-1-repartition"
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(expectedChangelogTopic) && 
topicNames.contains(expectedRepartitionTopic)
+      }, msg = s"Internal topics $expectedChangelogTopic or 
$expectedRepartitionTopic were not created")
+
+    // Second member joins the group
+    var streamsGroupHeartbeatResponse2: StreamsGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      val streamsGroupHeartbeatRequest2 = new 
StreamsGroupHeartbeatRequest.Builder(
+        new StreamsGroupHeartbeatRequestData()
+          .setGroupId(groupId)
+          .setMemberId(memberId2)
+          .setMemberEpoch(0)
+          .setRebalanceTimeoutMs(1000)
+          .setProcessId("process-2")
+          .setActiveTasks(Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.data().activeTasks()))
+            .getOrElse(Collections.emptyList()))
+          .setStandbyTasks(Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.data().standbyTasks()))
+            .getOrElse(Collections.emptyList()))
+          .setWarmupTasks(Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.data().warmupTasks()))
+            .getOrElse(Collections.emptyList()))
+          .setTopology(topology)
+      ).build(0)
+
+      streamsGroupHeartbeatResponse2 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2)
+      streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code()
+    }, "Second StreamsGroupHeartbeatRequest did not succeed within the timeout 
period.")
+
+      // Both members continue to send heartbeats with their assigned tasks
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest1 = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(streamsGroupHeartbeatResponse1.data.memberEpoch())
+            .setRebalanceTimeoutMs(1000)
+            
.setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.activeTasks()))
+            
.setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.standbyTasks()))
+            
.setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.warmupTasks()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse1 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1)
+        streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code()
+      }, "First member rebalance heartbeat did not succeed within the timeout 
period.")
+
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest2 = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setMemberEpoch(streamsGroupHeartbeatResponse2.data.memberEpoch())
+            .setRebalanceTimeoutMs(1000)
+            
.setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.activeTasks()))
+            
.setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.standbyTasks()))
+            
.setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.warmupTasks()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse2 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2)
+        streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code()
+      }, "Second member rebalance heartbeat did not succeed within the timeout 
period.")
+
+    // Verify initial state with no standby tasks
+    assertEquals(0, streamsGroupHeartbeatResponse1.data.standbyTasks().size(), 
"Member 1 should have no standby tasks initially")
+    assertEquals(0, streamsGroupHeartbeatResponse2.data.standbyTasks().size(), 
"Member 2 should have no standby tasks initially")
+
+      // Change streams.num.standby.replicas = 1
+      val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP, 
groupId)
+      val alterConfigOp = new AlterConfigOp(
+        new ConfigEntry("streams.num.standby.replicas", "1"),
+        AlterConfigOp.OpType.SET
+      )
+      val configChanges = Map(groupConfigResource -> 
List(alterConfigOp).asJavaCollection).asJava
+      val options = new org.apache.kafka.clients.admin.AlterConfigsOptions()
+      admin.incrementalAlterConfigs(configChanges, options).all().get()
+
+      // Send heartbeats to trigger rebalance after config change
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest1 = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(streamsGroupHeartbeatResponse1.data.memberEpoch())
+            .setRebalanceTimeoutMs(1000)
+            .setActiveTasks(Collections.emptyList())
+            .setStandbyTasks(Collections.emptyList())
+            .setWarmupTasks(Collections.emptyList())
+        ).build(0)
+
+        streamsGroupHeartbeatResponse1 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1)
+        streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code() &&
+          streamsGroupHeartbeatResponse1.data.standbyTasks() != null
+      }, "First member heartbeat after config change did not succeed within 
the timeout period.")
+
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest2 = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setMemberEpoch(streamsGroupHeartbeatResponse2.data.memberEpoch())
+            .setRebalanceTimeoutMs(1000)
+            .setActiveTasks(Collections.emptyList())
+            .setStandbyTasks(Collections.emptyList())
+            .setWarmupTasks(Collections.emptyList())
+        ).build(0)
+
+        streamsGroupHeartbeatResponse2 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2)
+        streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code() &&
+          streamsGroupHeartbeatResponse2.data.standbyTasks() != null
+      }, "Second member heartbeat after config change did not succeed within 
the timeout period.")
+
+      // Verify that at least one member has active tasks
+      val member1HasActive = 
streamsGroupHeartbeatResponse1.data.activeTasks().size() > 0
+      val member2HasActive = 
streamsGroupHeartbeatResponse2.data.activeTasks().size() > 0
+      assertTrue(member1HasActive || member2HasActive, "At least one member 
should have active tasks after config change")
+
+      // Verify that at least one member has standby tasks
+      val member1HasStandby = 
streamsGroupHeartbeatResponse1.data.standbyTasks().size() > 0
+      val member2HasStandby = 
streamsGroupHeartbeatResponse2.data.standbyTasks().size() > 0
+      assertTrue(member1HasStandby || member2HasStandby, "At least one member 
should have standby tasks after config change")
+

Review Comment:
   Could add this to be more precise:
   
   ```
   // With 2 members and streams.num.standby.replicas=1, each active task should have 1 standby.
   // (member1HasActive etc. are Booleans, so count the tasks from the responses instead;
   // the TaskIds entries group task ids by subtopology, hence the sum over partitions().)
   val totalActiveTasks = streamsGroupHeartbeatResponse1.data.activeTasks().asScala.map(_.partitions().size()).sum +
     streamsGroupHeartbeatResponse2.data.activeTasks().asScala.map(_.partitions().size()).sum
   val totalStandbyTasks = streamsGroupHeartbeatResponse1.data.standbyTasks().asScala.map(_.partitions().size()).sum +
     streamsGroupHeartbeatResponse2.data.standbyTasks().asScala.map(_.partitions().size()).sum
   assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task should have one standby")
   ```



##########
core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala:
##########
@@ -0,0 +1,579 @@
+package kafka.server
+
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
+import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData, 
StreamsGroupHeartbeatResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{StreamsGroupHeartbeatRequest, 
StreamsGroupHeartbeatResponse}
+import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.common.config.ConfigResource
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertNull, assertTrue}
+
+import java.util.Collections
+import scala.jdk.CollectionConverters._
+
+@ClusterTestDefaults(
+  types = Array(Type.KRAFT),
+  serverProperties = Array(
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic,consumer,streams")
+  )
+)
+class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+
+  @ClusterTest
+  def testInternalTopicsCreation(): Unit = {
+    val admin = cluster.admin()
+    val memberId = "test-member-1"
+    val groupId = "test-group"
+    val inputTopicName = "input-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create input topic
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = inputTopicName,
+        numPartitions = 2
+      )
+
+      // Wait for topics to be available
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(inputTopicName)
+      }, msg = s"Input topic $inputTopicName is not available")
+
+      // Create topology with internal topics (changelog and repartition 
topics)
+      val topology = createTopologyWithInternalTopics(inputTopicName, groupId)
+
+      // Send heartbeat with topology containing internal topics
+      var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(0)
+            .setRebalanceTimeoutMs(1000)
+            .setActiveTasks(Option(streamsGroupHeartbeatResponse)
+              .map(r => convertTaskIds(r.data().activeTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setStandbyTasks(Option(streamsGroupHeartbeatResponse)
+              .map(r => convertTaskIds(r.data().standbyTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setWarmupTasks(Option(streamsGroupHeartbeatResponse)
+              .map(r => convertTaskIds(r.data().warmupTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest)
+        streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code()
+      }, "StreamsGroupHeartbeatRequest did not succeed within the timeout 
period.")
+
+      // Verify the heartbeat was successful
+      assert(streamsGroupHeartbeatResponse != null, 
"StreamsGroupHeartbeatResponse should not be null")
+      assertEquals(memberId, streamsGroupHeartbeatResponse.data.memberId())
+      assertEquals(1, streamsGroupHeartbeatResponse.data.memberEpoch())
+
+      // Wait for internal topics to be created
+      val expectedChangelogTopic = s"$groupId-subtopology-1-changelog"
+      val expectedRepartitionTopic = s"$groupId-subtopology-1-repartition"
+
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(expectedChangelogTopic) && 
topicNames.contains(expectedRepartitionTopic)
+      }, msg = s"Internal topics $expectedChangelogTopic and 
$expectedRepartitionTopic were not created")
+
+      // Verify the internal topics exist and have correct properties
+      val changelogTopicDescription = 
admin.describeTopics(java.util.Collections.singletonList(expectedChangelogTopic)).allTopicNames().get()
+      val repartitionTopicDescription = 
admin.describeTopics(java.util.Collections.singletonList(expectedRepartitionTopic)).allTopicNames().get()
+
+      assertTrue(changelogTopicDescription.containsKey(expectedChangelogTopic),
+        s"Changelog topic $expectedChangelogTopic should exist")
+      
assertTrue(repartitionTopicDescription.containsKey(expectedRepartitionTopic),
+        s"Repartition topic $expectedRepartitionTopic should exist")
+
+      // Verify topic configurations
+      val changelogTopic = 
changelogTopicDescription.get(expectedChangelogTopic)
+      val repartitionTopic = 
repartitionTopicDescription.get(expectedRepartitionTopic)
+
+      // Both topics should have 2 partitions (matching the input topic)
+      assertEquals(2, changelogTopic.partitions().size(),
+        s"Changelog topic should have 2 partitions, but has 
${changelogTopic.partitions().size()}")
+      assertEquals(2, repartitionTopic.partitions().size(),
+        s"Repartition topic should have 2 partitions, but has 
${repartitionTopic.partitions().size()}")
+
+      // Verify replication factor
+      assertEquals(1, changelogTopic.partitions().get(0).replicas().size(),
+        s"Changelog topic should have replication factor 1")
+      assertEquals(1, repartitionTopic.partitions().get(0).replicas().size(),
+        s"Repartition topic should have replication factor 1")
+    } finally {
+      admin.close()
+    }
+  }
+
+  @ClusterTest
+  def testDynamicGroupConfig(): Unit = {
+    val admin = cluster.admin()
+    val memberId1 = "test-member-1"
+    val memberId2 = "test-member-2"
+    val groupId = "test-group"
+    val topicName = "test-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create topic
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = topicName,
+        numPartitions = 2
+      )
+      // Wait for topic to be available
+      TestUtils.waitUntilTrue(() => {
+        admin.listTopics().names().get().contains(topicName)
+      }, msg = s"Topic $topicName is not available to the group coordinator")
+
+      val topology = createTopologyWithInternalTopics(topicName, groupId)
+
+      // First member joins the group
+      var streamsGroupHeartbeatResponse1: StreamsGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest1 = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(0)
+            .setRebalanceTimeoutMs(1000)
+            .setProcessId("process-1")
+            .setActiveTasks(Option(streamsGroupHeartbeatResponse1)
+              .map(r => convertTaskIds(r.data().activeTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setStandbyTasks(Option(streamsGroupHeartbeatResponse1)
+              .map(r => convertTaskIds(r.data().standbyTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setWarmupTasks(Option(streamsGroupHeartbeatResponse1)
+              .map(r => convertTaskIds(r.data().warmupTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse1 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1)
+        streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code()
+      }, "First StreamsGroupHeartbeatRequest did not succeed within the 
timeout period.")
+
+      val expectedChangelogTopic = s"$groupId-subtopology-1-changelog"
+      val expectedRepartitionTopic = s"$groupId-subtopology-1-repartition"
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(expectedChangelogTopic) && 
topicNames.contains(expectedRepartitionTopic)
+      }, msg = s"Internal topics $expectedChangelogTopic or 
$expectedRepartitionTopic were not created")
+
+    // Second member joins the group
+    var streamsGroupHeartbeatResponse2: StreamsGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      val streamsGroupHeartbeatRequest2 = new 
StreamsGroupHeartbeatRequest.Builder(
+        new StreamsGroupHeartbeatRequestData()
+          .setGroupId(groupId)
+          .setMemberId(memberId2)
+          .setMemberEpoch(0)
+          .setRebalanceTimeoutMs(1000)
+          .setProcessId("process-2")
+          .setActiveTasks(Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.data().activeTasks()))
+            .getOrElse(Collections.emptyList()))
+          .setStandbyTasks(Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.data().standbyTasks()))
+            .getOrElse(Collections.emptyList()))
+          .setWarmupTasks(Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.data().warmupTasks()))
+            .getOrElse(Collections.emptyList()))
+          .setTopology(topology)
+      ).build(0)
+
+      streamsGroupHeartbeatResponse2 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2)
+      streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code()
+    }, "Second StreamsGroupHeartbeatRequest did not succeed within the timeout 
period.")
+
+      // Both members continue to send heartbeats with their assigned tasks
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest1 = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(streamsGroupHeartbeatResponse1.data.memberEpoch())
+            .setRebalanceTimeoutMs(1000)
+            
.setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.activeTasks()))
+            
.setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.standbyTasks()))
+            
.setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.warmupTasks()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse1 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1)
+        streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code()
+      }, "First member rebalance heartbeat did not succeed within the timeout 
period.")
+
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest2 = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setMemberEpoch(streamsGroupHeartbeatResponse2.data.memberEpoch())
+            .setRebalanceTimeoutMs(1000)
+            
.setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.activeTasks()))
+            
.setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.standbyTasks()))
+            
.setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.warmupTasks()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse2 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2)
+        streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code()
+      }, "Second member rebalance heartbeat did not succeed within the timeout 
period.")
+
+    // Verify initial state with no standby tasks
+    assertEquals(0, streamsGroupHeartbeatResponse1.data.standbyTasks().size(), 
"Member 1 should have no standby tasks initially")

Review Comment:
   indentation seems off
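   E.g. these assertions probably want two more spaces so they line up with the rest of the `try` body:
   
   ```
         // Verify initial state with no standby tasks
         assertEquals(0, streamsGroupHeartbeatResponse1.data.standbyTasks().size(), "Member 1 should have no standby tasks initially")
         assertEquals(0, streamsGroupHeartbeatResponse2.data.standbyTasks().size(), "Member 2 should have no standby tasks initially")
   ```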



##########
core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala:
##########
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
+import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData, 
StreamsGroupHeartbeatResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{StreamsGroupHeartbeatRequest, 
StreamsGroupHeartbeatResponse}
+import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.common.config.ConfigResource
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertNull, assertTrue}
+
+import java.util.Collections
+import scala.jdk.CollectionConverters._
+
+@ClusterTestDefaults(
+  types = Array(Type.KRAFT),
+  serverProperties = Array(
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic,consumer,streams")
+  )
+)
+class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+
+  @ClusterTest
+  def testInternalTopicsCreation(): Unit = {
+    val admin = cluster.admin()
+    val memberId = "test-member-1"
+    val groupId = "test-group"
+    val inputTopicName = "input-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create input topic
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = inputTopicName,
+        numPartitions = 2
+      )
+
+      // Wait for topics to be available
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(inputTopicName)
+      }, msg = s"Input topic $inputTopicName is not available")
+
+      // Create topology with internal topics (changelog and repartition 
topics)
+      val topology = createTopologyWithInternalTopics(inputTopicName, groupId)
+
+      // Send heartbeat with topology containing internal topics
+      var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(0)
+            .setRebalanceTimeoutMs(1000)
+            .setActiveTasks(Option(streamsGroupHeartbeatResponse)
+              .map(r => convertTaskIds(r.data().activeTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setStandbyTasks(Option(streamsGroupHeartbeatResponse)
+              .map(r => convertTaskIds(r.data().standbyTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setWarmupTasks(Option(streamsGroupHeartbeatResponse)
+              .map(r => convertTaskIds(r.data().warmupTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest)
+        streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code()
+      }, "StreamsGroupHeartbeatRequest did not succeed within the timeout 
period.")
+
+      // Verify the heartbeat was successful
+      assert(streamsGroupHeartbeatResponse != null, 
"StreamsGroupHeartbeatResponse should not be null")
+      assertEquals(memberId, streamsGroupHeartbeatResponse.data.memberId())
+      assertEquals(1, streamsGroupHeartbeatResponse.data.memberEpoch())
+
+      // Wait for internal topics to be created
+      val expectedChangelogTopic = s"$groupId-subtopology-1-changelog"
+      val expectedRepartitionTopic = s"$groupId-subtopology-1-repartition"
+
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(expectedChangelogTopic) && 
topicNames.contains(expectedRepartitionTopic)
+      }, msg = s"Internal topics $expectedChangelogTopic and 
$expectedRepartitionTopic were not created")
+
+      // Verify the internal topics exist and have correct properties
+      val changelogTopicDescription = 
admin.describeTopics(java.util.Collections.singletonList(expectedChangelogTopic)).allTopicNames().get()
+      val repartitionTopicDescription = 
admin.describeTopics(java.util.Collections.singletonList(expectedRepartitionTopic)).allTopicNames().get()
+
+      assertTrue(changelogTopicDescription.containsKey(expectedChangelogTopic),
+        s"Changelog topic $expectedChangelogTopic should exist")
+      
assertTrue(repartitionTopicDescription.containsKey(expectedRepartitionTopic),
+        s"Repartition topic $expectedRepartitionTopic should exist")
+
+      // Verify topic configurations
+      val changelogTopic = 
changelogTopicDescription.get(expectedChangelogTopic)
+      val repartitionTopic = 
repartitionTopicDescription.get(expectedRepartitionTopic)
+
+      // Both topics should have 2 partitions (matching the input topic)
+      assertEquals(2, changelogTopic.partitions().size(),
+        s"Changelog topic should have 2 partitions, but has 
${changelogTopic.partitions().size()}")
+      assertEquals(2, repartitionTopic.partitions().size(),
+        s"Repartition topic should have 2 partitions, but has 
${repartitionTopic.partitions().size()}")
+
+      // Verify replication factor
+      assertEquals(1, changelogTopic.partitions().get(0).replicas().size(),
+        s"Changelog topic should have replication factor 1")
+      assertEquals(1, repartitionTopic.partitions().get(0).replicas().size(),
+        s"Repartition topic should have replication factor 1")
+    } finally {
+      admin.close()
+    }
+  }
+
+  @ClusterTest
+  def testDynamicGroupConfig(): Unit = {
+    val admin = cluster.admin()
+    val memberId1 = "test-member-1"
+    val memberId2 = "test-member-2"
+    val groupId = "test-group"
+    val topicName = "test-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create topic
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = topicName,
+        numPartitions = 2
+      )
+      // Wait for topic to be available
+      TestUtils.waitUntilTrue(() => {
+        admin.listTopics().names().get().contains(topicName)
+      }, msg = s"Topic $topicName is not available to the group coordinator")
+
+      val topology = createTopologyWithInternalTopics(topicName, groupId)
+
+      // First member joins the group
+      var streamsGroupHeartbeatResponse1: StreamsGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest1 = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(0)
+            .setRebalanceTimeoutMs(1000)
+            .setProcessId("process-1")
+            .setActiveTasks(Option(streamsGroupHeartbeatResponse1)
+              .map(r => convertTaskIds(r.data().activeTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setStandbyTasks(Option(streamsGroupHeartbeatResponse1)
+              .map(r => convertTaskIds(r.data().standbyTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setWarmupTasks(Option(streamsGroupHeartbeatResponse1)
+              .map(r => convertTaskIds(r.data().warmupTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse1 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1)
+        streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code()
+      }, "First StreamsGroupHeartbeatRequest did not succeed within the 
timeout period.")
+
+      val expectedChangelogTopic = s"$groupId-subtopology-1-changelog"
+      val expectedRepartitionTopic = s"$groupId-subtopology-1-repartition"
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(expectedChangelogTopic) && 
topicNames.contains(expectedRepartitionTopic)
+      }, msg = s"Internal topics $expectedChangelogTopic or 
$expectedRepartitionTopic were not created")
+
+    // Second member joins the group
+    var streamsGroupHeartbeatResponse2: StreamsGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      val streamsGroupHeartbeatRequest2 = new 
StreamsGroupHeartbeatRequest.Builder(
+        new StreamsGroupHeartbeatRequestData()
+          .setGroupId(groupId)
+          .setMemberId(memberId2)
+          .setMemberEpoch(0)
+          .setRebalanceTimeoutMs(1000)
+          .setProcessId("process-2")
+          .setActiveTasks(Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.data().activeTasks()))
+            .getOrElse(Collections.emptyList()))
+          .setStandbyTasks(Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.data().standbyTasks()))
+            .getOrElse(Collections.emptyList()))
+          .setWarmupTasks(Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.data().warmupTasks()))
+            .getOrElse(Collections.emptyList()))
+          .setTopology(topology)
+      ).build(0)
+
+      streamsGroupHeartbeatResponse2 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2)
+      streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code()
+    }, "Second StreamsGroupHeartbeatRequest did not succeed within the timeout 
period.")
+
+      // Both members continue to send heartbeats with their assigned tasks
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest1 = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(streamsGroupHeartbeatResponse1.data.memberEpoch())
+            .setRebalanceTimeoutMs(1000)
+            
.setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.activeTasks()))
+            
.setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.standbyTasks()))
+            
.setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.warmupTasks()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse1 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1)
+        streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code()
+      }, "First member rebalance heartbeat did not succeed within the timeout 
period.")
+
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest2 = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setMemberEpoch(streamsGroupHeartbeatResponse2.data.memberEpoch())
+            .setRebalanceTimeoutMs(1000)
+            
.setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.activeTasks()))
+            
.setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.standbyTasks()))
+            
.setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.warmupTasks()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse2 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2)
+        streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code()
+      }, "Second member rebalance heartbeat did not succeed within the timeout 
period.")
+
+    // Verify initial state with no standby tasks
+    assertEquals(0, streamsGroupHeartbeatResponse1.data.standbyTasks().size(), 
"Member 1 should have no standby tasks initially")
+    assertEquals(0, streamsGroupHeartbeatResponse2.data.standbyTasks().size(), 
"Member 2 should have no standby tasks initially")
+
+      // Change streams.num.standby.replicas = 1
+      val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP, 
groupId)
+      val alterConfigOp = new AlterConfigOp(
+        new ConfigEntry("streams.num.standby.replicas", "1"),
+        AlterConfigOp.OpType.SET
+      )
+      val configChanges = Map(groupConfigResource -> 
List(alterConfigOp).asJavaCollection).asJava
+      val options = new org.apache.kafka.clients.admin.AlterConfigsOptions()
+      admin.incrementalAlterConfigs(configChanges, options).all().get()
+
+      // Send heartbeats to trigger rebalance after config change
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest1 = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(streamsGroupHeartbeatResponse1.data.memberEpoch())
+            .setRebalanceTimeoutMs(1000)
+            .setActiveTasks(Collections.emptyList())
+            .setStandbyTasks(Collections.emptyList())
+            .setWarmupTasks(Collections.emptyList())
+        ).build(0)
+
+        streamsGroupHeartbeatResponse1 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1)
+        streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code() &&
+          streamsGroupHeartbeatResponse1.data.standbyTasks() != null
+      }, "First member heartbeat after config change did not succeed within 
the timeout period.")
+
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest2 = new 
StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setMemberEpoch(streamsGroupHeartbeatResponse2.data.memberEpoch())
+            .setRebalanceTimeoutMs(1000)
+            .setActiveTasks(Collections.emptyList())
+            .setStandbyTasks(Collections.emptyList())
+            .setWarmupTasks(Collections.emptyList())
+        ).build(0)
+
+        streamsGroupHeartbeatResponse2 = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2)
+        streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code() &&
+          streamsGroupHeartbeatResponse2.data.standbyTasks() != null
+      }, "Second member heartbeat after config change did not succeed within 
the timeout period.")
+
+      // Verify that at least one member has active tasks
+      val member1HasActive = 
streamsGroupHeartbeatResponse1.data.activeTasks().size() > 0
+      val member2HasActive = 
streamsGroupHeartbeatResponse2.data.activeTasks().size() > 0
+      assertTrue(member1HasActive || member2HasActive, "At least one member 
should have active tasks after config change")
+
+      // Verify that at least one member has standby tasks
+      val member1HasStandby = 
streamsGroupHeartbeatResponse1.data.standbyTasks().size() > 0
+      val member2HasStandby = 
streamsGroupHeartbeatResponse2.data.standbyTasks().size() > 0
+      assertTrue(member1HasStandby || member2HasStandby, "At least one member 
should have standby tasks after config change")
+
+    } finally {
+      admin.close()
+    }
+  }
+
+  @ClusterTest(
+    types = Array(Type.KRAFT),
+    serverProperties = Array(
+      new ClusterConfigProperty(key = "group.streams.heartbeat.interval.ms", 
value = "500"),
+      new ClusterConfigProperty(key = 
"group.streams.min.heartbeat.interval.ms", value = "500"),
+      new ClusterConfigProperty(key = "group.streams.session.timeout.ms", 
value = "501"),
+      new ClusterConfigProperty(key = "group.streams.min.session.timeout.ms", 
value = "501")
+    )
+  )
+  def testMemberJoiningAndExpiring(): Unit = {
+    val admin = cluster.admin()
+    val memberId = "test-member-1"
+    val groupId = "test-group"
+    val topicName = "test-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create topic
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = topicName,
+        numPartitions = 2
+      )
+      TestUtils.waitUntilTrue(() => {
+        admin.listTopics().names().get().contains(topicName)
+      }, msg = s"Topic $topicName is not available to the group coordinator")
+
+      val topology = createMockTopology(topicName)
+
+      // First member joins the group
+      val streamsGroupHeartbeatRequest = new 
StreamsGroupHeartbeatRequest.Builder(
+        new StreamsGroupHeartbeatRequestData()
+          .setGroupId(groupId)
+          .setMemberId(memberId)
+          .setMemberEpoch(0)
+          .setRebalanceTimeoutMs(1000)
+          .setActiveTasks(java.util.Collections.emptyList())
+          .setStandbyTasks(java.util.Collections.emptyList())
+          .setWarmupTasks(java.util.Collections.emptyList())
+          .setTopology(topology)
+      ).build(0)
+
+      var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {
+        streamsGroupHeartbeatResponse = 
connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest)
+        streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code()
+      }, "StreamsGroupHeartbeatRequest did not succeed within the timeout 
period.")
+
+      val memberEpoch = streamsGroupHeartbeatResponse.data.memberEpoch()
+      assertEquals(1, memberEpoch)
+
+      // Blocking the thread for 1 sec so that the session times out and the 
member needs to rejoin
+      Thread.sleep(1000)
+
+      // Prepare the next heartbeat which should fail due to member expiration
+      val expiredHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder(
+        new StreamsGroupHeartbeatRequestData()
+          .setGroupId(groupId)
+          .setMemberId(memberId)
+          .setMemberEpoch(memberEpoch)
+          .setRebalanceTimeoutMs(1000)
+          
.setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.activeTasks()))
+          
.setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.standbyTasks()))
+          
.setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse.data.warmupTasks()))
+      ).build(0)
+
+      TestUtils.waitUntilTrue(() => {
+        val expiredResponse = 
connectAndReceive[StreamsGroupHeartbeatResponse](expiredHeartbeatRequest)
+        expiredResponse.data.errorCode == Errors.UNKNOWN_MEMBER_ID.code() &&
+          expiredResponse.data.memberEpoch() == 0

Review Comment:
   Looks like there is a `&&` missing here. Part of the boolean condition is just dropped.



##########
core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala:
##########
@@ -0,0 +1,594 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry}
+import org.apache.kafka.common.message.{StreamsGroupHeartbeatRequestData, 
StreamsGroupHeartbeatResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{StreamsGroupHeartbeatRequest, 
StreamsGroupHeartbeatResponse}
+import org.apache.kafka.common.test.ClusterInstance
+import org.apache.kafka.common.test.api.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+import org.apache.kafka.common.config.ConfigResource
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, 
assertNull, assertTrue}
+
+import java.util.Collections
+import scala.jdk.CollectionConverters._
+
+@ClusterTestDefaults(
+  types = Array(Type.KRAFT),
+  serverProperties = Array(
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic,consumer,streams")
+  )
+)
+class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+
+  @ClusterTest
+  def testInternalTopicsCreation(): Unit = {
+    val admin = cluster.admin()
+    val memberId = "test-member-1"
+    val groupId = "test-group"
+    val inputTopicName = "input-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create input topic
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = inputTopicName,
+        numPartitions = 2
+      )
+
+      // Wait for topics to be available
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(inputTopicName)
+      }, msg = s"Input topic $inputTopicName is not available")
+
+      // Create topology with internal topics (changelog and repartition topics)
+      val topology = createTopologyWithInternalTopics(inputTopicName, groupId)
+
+      // Send heartbeat with topology containing internal topics
+      var streamsGroupHeartbeatResponse: StreamsGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest = new StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(0)
+            .setRebalanceTimeoutMs(1000)
+            .setActiveTasks(Option(streamsGroupHeartbeatResponse)
+              .map(r => convertTaskIds(r.data().activeTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setStandbyTasks(Option(streamsGroupHeartbeatResponse)
+              .map(r => convertTaskIds(r.data().standbyTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setWarmupTasks(Option(streamsGroupHeartbeatResponse)
+              .map(r => convertTaskIds(r.data().warmupTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest)
+        streamsGroupHeartbeatResponse.data.errorCode == Errors.NONE.code()
+      }, "StreamsGroupHeartbeatRequest did not succeed within the timeout period.")
+
+      // Verify the heartbeat was successful
+      assert(streamsGroupHeartbeatResponse != null, "StreamsGroupHeartbeatResponse should not be null")
+      assertEquals(memberId, streamsGroupHeartbeatResponse.data.memberId())
+      assertEquals(1, streamsGroupHeartbeatResponse.data.memberEpoch())
+
+      // Wait for internal topics to be created
+      val expectedChangelogTopic = s"$groupId-subtopology-1-changelog"
+      val expectedRepartitionTopic = s"$groupId-subtopology-1-repartition"
+
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(expectedChangelogTopic) && topicNames.contains(expectedRepartitionTopic)
+      }, msg = s"Internal topics $expectedChangelogTopic and $expectedRepartitionTopic were not created")
+
+      // Verify the internal topics exist and have correct properties
+      val changelogTopicDescription = admin.describeTopics(java.util.Collections.singletonList(expectedChangelogTopic)).allTopicNames().get()
+      val repartitionTopicDescription = admin.describeTopics(java.util.Collections.singletonList(expectedRepartitionTopic)).allTopicNames().get()
+
+      assertTrue(changelogTopicDescription.containsKey(expectedChangelogTopic),
+        s"Changelog topic $expectedChangelogTopic should exist")
+      assertTrue(repartitionTopicDescription.containsKey(expectedRepartitionTopic),
+        s"Repartition topic $expectedRepartitionTopic should exist")
+
+      // Verify topic configurations
+      val changelogTopic = changelogTopicDescription.get(expectedChangelogTopic)
+      val repartitionTopic = repartitionTopicDescription.get(expectedRepartitionTopic)
+
+      // Both topics should have 2 partitions (matching the input topic)
+      assertEquals(2, changelogTopic.partitions().size(),
+        s"Changelog topic should have 2 partitions, but has 
${changelogTopic.partitions().size()}")
+      assertEquals(2, repartitionTopic.partitions().size(),
+        s"Repartition topic should have 2 partitions, but has 
${repartitionTopic.partitions().size()}")
+
+      // Verify replication factor
+      assertEquals(1, changelogTopic.partitions().get(0).replicas().size(),
+        s"Changelog topic should have replication factor 1")
+      assertEquals(1, repartitionTopic.partitions().get(0).replicas().size(),
+        s"Repartition topic should have replication factor 1")
+    } finally {
+      admin.close()
+    }
+  }
+
+  @ClusterTest
+  def testDynamicGroupConfig(): Unit = {
+    val admin = cluster.admin()
+    val memberId1 = "test-member-1"
+    val memberId2 = "test-member-2"
+    val groupId = "test-group"
+    val topicName = "test-topic"
+
+    try {
+      TestUtils.createOffsetsTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq
+      )
+
+      // Create topic
+      TestUtils.createTopicWithAdmin(
+        admin = admin,
+        brokers = cluster.brokers.values().asScala.toSeq,
+        controllers = cluster.controllers().values().asScala.toSeq,
+        topic = topicName,
+        numPartitions = 2
+      )
+      // Wait for topic to be available
+      TestUtils.waitUntilTrue(() => {
+        admin.listTopics().names().get().contains(topicName)
+      }, msg = s"Topic $topicName is not available to the group coordinator")
+
+      val topology = createTopologyWithInternalTopics(topicName, groupId)
+
+      // First member joins the group
+      var streamsGroupHeartbeatResponse1: StreamsGroupHeartbeatResponse = null
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest1 = new StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(0)
+            .setRebalanceTimeoutMs(1000)
+            .setProcessId("process-1")
+            .setActiveTasks(Option(streamsGroupHeartbeatResponse1)
+              .map(r => convertTaskIds(r.data().activeTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setStandbyTasks(Option(streamsGroupHeartbeatResponse1)
+              .map(r => convertTaskIds(r.data().standbyTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setWarmupTasks(Option(streamsGroupHeartbeatResponse1)
+              .map(r => convertTaskIds(r.data().warmupTasks()))
+              .getOrElse(Collections.emptyList()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse1 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1)
+        streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code()
+      }, "First StreamsGroupHeartbeatRequest did not succeed within the timeout period.")
+
+      val expectedChangelogTopic = s"$groupId-subtopology-1-changelog"
+      val expectedRepartitionTopic = s"$groupId-subtopology-1-repartition"
+      TestUtils.waitUntilTrue(() => {
+        val topicNames = admin.listTopics().names().get()
+        topicNames.contains(expectedChangelogTopic) && topicNames.contains(expectedRepartitionTopic)
+      }, msg = s"Internal topics $expectedChangelogTopic or $expectedRepartitionTopic were not created")
+
+    // Second member joins the group
+    var streamsGroupHeartbeatResponse2: StreamsGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      val streamsGroupHeartbeatRequest2 = new StreamsGroupHeartbeatRequest.Builder(
+        new StreamsGroupHeartbeatRequestData()
+          .setGroupId(groupId)
+          .setMemberId(memberId2)
+          .setMemberEpoch(0)
+          .setRebalanceTimeoutMs(1000)
+          .setProcessId("process-2")
+          .setActiveTasks(Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.data().activeTasks()))
+            .getOrElse(Collections.emptyList()))
+          .setStandbyTasks(Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.data().standbyTasks()))
+            .getOrElse(Collections.emptyList()))
+          .setWarmupTasks(Option(streamsGroupHeartbeatResponse2)
+            .map(r => convertTaskIds(r.data().warmupTasks()))
+            .getOrElse(Collections.emptyList()))
+          .setTopology(topology)
+      ).build(0)
+
+      streamsGroupHeartbeatResponse2 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2)
+      streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code()
+    }, "Second StreamsGroupHeartbeatRequest did not succeed within the timeout period.")
+
+      // Both members continue to send heartbeats with their assigned tasks
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest1 = new StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(streamsGroupHeartbeatResponse1.data.memberEpoch())
+            .setRebalanceTimeoutMs(1000)
+            .setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.activeTasks()))
+            .setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.standbyTasks()))
+            .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse1.data.warmupTasks()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse1 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1)
+        streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code()
+      }, "First member rebalance heartbeat did not succeed within the timeout period.")
+
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest2 = new StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setMemberEpoch(streamsGroupHeartbeatResponse2.data.memberEpoch())
+            .setRebalanceTimeoutMs(1000)
+            .setActiveTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.activeTasks()))
+            .setStandbyTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.standbyTasks()))
+            .setWarmupTasks(convertTaskIds(streamsGroupHeartbeatResponse2.data.warmupTasks()))
+            .setTopology(topology)
+        ).build(0)
+
+        streamsGroupHeartbeatResponse2 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2)
+        streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code()
+      }, "Second member rebalance heartbeat did not succeed within the timeout period.")
+
+    // Verify initial state with no standby tasks
+    assertEquals(0, streamsGroupHeartbeatResponse1.data.standbyTasks().size(), "Member 1 should have no standby tasks initially")
+    assertEquals(0, streamsGroupHeartbeatResponse2.data.standbyTasks().size(), "Member 2 should have no standby tasks initially")
+
+      // Change streams.num.standby.replicas = 1
+      val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP, groupId)
+      val alterConfigOp = new AlterConfigOp(
+        new ConfigEntry("streams.num.standby.replicas", "1"),
+        AlterConfigOp.OpType.SET
+      )
+      val configChanges = Map(groupConfigResource -> List(alterConfigOp).asJavaCollection).asJava
+      val options = new org.apache.kafka.clients.admin.AlterConfigsOptions()
+      admin.incrementalAlterConfigs(configChanges, options).all().get()
+
+      // Send heartbeats to trigger rebalance after config change
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest1 = new StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(streamsGroupHeartbeatResponse1.data.memberEpoch())
+            .setRebalanceTimeoutMs(1000)
+            .setActiveTasks(Collections.emptyList())
+            .setStandbyTasks(Collections.emptyList())
+            .setWarmupTasks(Collections.emptyList())
+        ).build(0)
+
+        streamsGroupHeartbeatResponse1 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest1)
+        streamsGroupHeartbeatResponse1.data.errorCode == Errors.NONE.code() &&
+          streamsGroupHeartbeatResponse1.data.standbyTasks() != null
+      }, "First member heartbeat after config change did not succeed within the timeout period.")
+
+      TestUtils.waitUntilTrue(() => {
+        val streamsGroupHeartbeatRequest2 = new StreamsGroupHeartbeatRequest.Builder(
+          new StreamsGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId2)
+            .setMemberEpoch(streamsGroupHeartbeatResponse2.data.memberEpoch())
+            .setRebalanceTimeoutMs(1000)
+            .setActiveTasks(Collections.emptyList())
+            .setStandbyTasks(Collections.emptyList())
+            .setWarmupTasks(Collections.emptyList())
+        ).build(0)
+
+        streamsGroupHeartbeatResponse2 = connectAndReceive[StreamsGroupHeartbeatResponse](streamsGroupHeartbeatRequest2)
+        streamsGroupHeartbeatResponse2.data.errorCode == Errors.NONE.code() &&
+          streamsGroupHeartbeatResponse2.data.standbyTasks() != null
+      }, "Second member heartbeat after config change did not succeed within the timeout period.")
+
+      // Verify that at least one member has active tasks
+      val member1HasActive = streamsGroupHeartbeatResponse1.data.activeTasks().size() > 0

Review Comment:
   nit: rename `member1HasActive` to `member1HasActiveTasks` so the name says what it checks.
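   
   For example (a hypothetical rename only; the right-hand side is unchanged from the line under review):
   
   ```scala
   val member1HasActiveTasks = streamsGroupHeartbeatResponse1.data.activeTasks().size() > 0
   ```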


