cadonna commented on code in PR #17695:
URL: https://github.com/apache/kafka/pull/17695#discussion_r1830947659
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -627,6 +626,99 @@ private List<TaskIds> convertTaskIdCollection(final
Set<StreamsAssignmentInterfa
.collect(Collectors.toList());
}
+ private List<StreamsGroupHeartbeatRequestData.Subtopology>
getTopologyFromStreams() {
+ final Map<String, StreamsAssignmentInterface.Subtopology>
subTopologyMap = streamsInterface.subtopologyMap();
+ final List<StreamsGroupHeartbeatRequestData.Subtopology>
subtopologies = new ArrayList<>(subTopologyMap.size());
+ for (final Map.Entry<String,
StreamsAssignmentInterface.Subtopology> subtopology :
subTopologyMap.entrySet()) {
+
subtopologies.add(getSubtopologyFromStreams(subtopology.getKey(),
subtopology.getValue()));
+ }
+ return subtopologies;
+ }
+
+ private static StreamsGroupHeartbeatRequestData.Subtopology
getSubtopologyFromStreams(final String subtopologyName,
+
final StreamsAssignmentInterface.Subtopology subtopology) {
+ final StreamsGroupHeartbeatRequestData.Subtopology subtopologyData
= new StreamsGroupHeartbeatRequestData.Subtopology();
+ subtopologyData.setSubtopologyId(subtopologyName);
+ ArrayList<String> sortedSourceTopics = new
ArrayList<>(subtopology.sourceTopics);
+ Collections.sort(sortedSourceTopics);
+ subtopologyData.setSourceTopics(sortedSourceTopics);
+ ArrayList<String> sortedSinkTopics = new
ArrayList<>(subtopology.repartitionSinkTopics);
+ Collections.sort(sortedSinkTopics);
+ subtopologyData.setRepartitionSinkTopics(sortedSinkTopics);
+
subtopologyData.setRepartitionSourceTopics(getRepartitionTopicsInfoFromStreams(subtopology));
+
subtopologyData.setStateChangelogTopics(getChangelogTopicsInfoFromStreams(subtopology));
+ subtopologyData.setCopartitionGroups(
+ getCopartitionGroupsFromStreams(subtopology.copartitionGroups,
subtopologyData));
+ return subtopologyData;
+ }
+
+ private static List<CopartitionGroup> getCopartitionGroupsFromStreams(
+ final Collection<Set<String>> copartitionGroups,
+ final StreamsGroupHeartbeatRequestData.Subtopology
subtopologyData) {
Review Comment:
nit:
```suggestion
private static List<CopartitionGroup>
getCopartitionGroupsFromStreams(final Collection<Set<String>> copartitionGroups,
final StreamsGroupHeartbeatRequestData.Subtopology subtopologyData) {
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2288,18 +2259,40 @@ private
CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord>
records
);
- if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
- // The subscription metadata is updated when the refresh deadline
has been reached.
- subscriptionMetadata = group.computeSubscriptionMetadata(
+ // 2. Initialize/Update the group topology.
Review Comment:
If those different steps are so important, why not extract them into methods
and give them meaningful names?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -627,6 +626,99 @@ private List<TaskIds> convertTaskIdCollection(final
Set<StreamsAssignmentInterfa
.collect(Collectors.toList());
}
+ private List<StreamsGroupHeartbeatRequestData.Subtopology>
getTopologyFromStreams() {
+ final Map<String, StreamsAssignmentInterface.Subtopology>
subTopologyMap = streamsInterface.subtopologyMap();
+ final List<StreamsGroupHeartbeatRequestData.Subtopology>
subtopologies = new ArrayList<>(subTopologyMap.size());
+ for (final Map.Entry<String,
StreamsAssignmentInterface.Subtopology> subtopology :
subTopologyMap.entrySet()) {
+
subtopologies.add(getSubtopologyFromStreams(subtopology.getKey(),
subtopology.getValue()));
+ }
+ return subtopologies;
+ }
+
+ private static StreamsGroupHeartbeatRequestData.Subtopology
getSubtopologyFromStreams(final String subtopologyName,
+
final StreamsAssignmentInterface.Subtopology subtopology) {
+ final StreamsGroupHeartbeatRequestData.Subtopology subtopologyData
= new StreamsGroupHeartbeatRequestData.Subtopology();
+ subtopologyData.setSubtopologyId(subtopologyName);
+ ArrayList<String> sortedSourceTopics = new
ArrayList<>(subtopology.sourceTopics);
+ Collections.sort(sortedSourceTopics);
+ subtopologyData.setSourceTopics(sortedSourceTopics);
+ ArrayList<String> sortedSinkTopics = new
ArrayList<>(subtopology.repartitionSinkTopics);
+ Collections.sort(sortedSinkTopics);
+ subtopologyData.setRepartitionSinkTopics(sortedSinkTopics);
+
subtopologyData.setRepartitionSourceTopics(getRepartitionTopicsInfoFromStreams(subtopology));
+
subtopologyData.setStateChangelogTopics(getChangelogTopicsInfoFromStreams(subtopology));
+ subtopologyData.setCopartitionGroups(
+ getCopartitionGroupsFromStreams(subtopology.copartitionGroups,
subtopologyData));
+ return subtopologyData;
+ }
+
+ private static List<CopartitionGroup> getCopartitionGroupsFromStreams(
+ final Collection<Set<String>> copartitionGroups,
+ final StreamsGroupHeartbeatRequestData.Subtopology
subtopologyData) {
+
+ final Map<String, Short> sourceTopicsMap =
+ IntStream.range(0, subtopologyData.sourceTopics().size())
+ .boxed()
+
.collect(Collectors.toMap(subtopologyData.sourceTopics()::get,
Integer::shortValue));
+
+ final Map<String, Short> repartitionSourceTopics =
+ IntStream.range(0,
subtopologyData.repartitionSourceTopics().size())
+ .boxed()
+ .collect(
+ Collectors.toMap(x ->
subtopologyData.repartitionSourceTopics().get(x).name(),
+ Integer::shortValue));
+
+ return copartitionGroups.stream()
+ .map(x -> getCopartitionGroupFromStreams(x, sourceTopicsMap,
repartitionSourceTopics))
+ .collect(Collectors.toList());
+ }
+
+ private static CopartitionGroup getCopartitionGroupFromStreams(
+ final Set<String> topicNames,
+ final Map<String, Short> sourceTopicsMap,
+ final Map<String, Short> repartitionSourceTopics) {
Review Comment:
Same nit as above
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -213,6 +242,39 @@ void testFullStaticInformationWhenJoining() {
assertEquals(1, request.data().clientTags().size());
assertEquals("clientTag1", request.data().clientTags().get(0).key());
assertEquals("value2", request.data().clientTags().get(0).value());
+ assertEquals(streamsAssignmentInterface.topologyId(),
request.data().topologyId());
+ assertNotNull(request.data().topology());
+ final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies
= request.data().topology();
+ assertEquals(1, subtopologies.size());
+ final StreamsGroupHeartbeatRequestData.Subtopology subtopology =
subtopologies.get(0);
+ assertEquals(subtopologyName1, subtopology.subtopologyId());
+ assertEquals(Arrays.asList("sourceTopic1", "sourceTopic2"),
subtopology.sourceTopics());
+ assertEquals(Arrays.asList("repartitionSinkTopic1",
"repartitionSinkTopic2", "repartitionSinkTopic3"),
subtopology.repartitionSinkTopics());
+ assertEquals(repartitionSourceTopics.size(),
subtopology.repartitionSourceTopics().size());
+ subtopology.repartitionSourceTopics().forEach(topicInfo -> {
+ final StreamsAssignmentInterface.TopicInfo repartitionTopic =
repartitionSourceTopics.get(topicInfo.name());
+ assertEquals(repartitionTopic.numPartitions.get(),
topicInfo.partitions());
+ assertEquals(repartitionTopic.replicationFactor.get(),
topicInfo.replicationFactor());
+ });
+ assertEquals(changelogTopics.size(),
subtopology.stateChangelogTopics().size());
+ subtopology.stateChangelogTopics().forEach(topicInfo -> {
+ assertTrue(changelogTopics.containsKey(topicInfo.name()));
+ assertEquals(0, topicInfo.partitions());
+ final StreamsAssignmentInterface.TopicInfo changelogTopic =
changelogTopics.get(topicInfo.name());
+ assertEquals(changelogTopic.replicationFactor.get(),
topicInfo.replicationFactor());
+ });
+
Review Comment:
nit:
```suggestion
```
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -627,6 +626,99 @@ private List<TaskIds> convertTaskIdCollection(final
Set<StreamsAssignmentInterfa
.collect(Collectors.toList());
}
+ private List<StreamsGroupHeartbeatRequestData.Subtopology>
getTopologyFromStreams() {
+ final Map<String, StreamsAssignmentInterface.Subtopology>
subTopologyMap = streamsInterface.subtopologyMap();
+ final List<StreamsGroupHeartbeatRequestData.Subtopology>
subtopologies = new ArrayList<>(subTopologyMap.size());
+ for (final Map.Entry<String,
StreamsAssignmentInterface.Subtopology> subtopology :
subTopologyMap.entrySet()) {
+
subtopologies.add(getSubtopologyFromStreams(subtopology.getKey(),
subtopology.getValue()));
Review Comment:
Why do you not also sort this list?
As far as I can see, the consumer coordinator iterates over this list and
writes the content into the coordinator record.
In general, why do we not sort all lists in the consumer coordinator? As far as
I remember, we do not specify in the KIP that the lists need to be sorted.
Thus, the sorting is rather an implementation detail.
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3879,103 +3878,83 @@ class KafkaApis(val requestChannel: RequestChannel,
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
}
- def handleStreamsGroupInitialize(request: RequestChannel.Request):
CompletableFuture[Unit] = {
- // TODO: The unit tests for this method are insufficient. Once we merge
initialize with group heartbeat, we have to extend the tests to cover ACLs and
internal topic creation
- val streamsGroupInitializeRequest =
request.body[StreamsGroupInitializeRequest]
+ def handleStreamsGroupHeartbeat(request: RequestChannel.Request):
CompletableFuture[Unit] = {
+ val streamsGroupHeartbeatRequest =
request.body[StreamsGroupHeartbeatRequest]
if (!isStreamsGroupProtocolEnabled()) {
// The API is not supported by the "old" group coordinator (the
default). If the
// new one is not enabled, we fail directly here.
- requestHelper.sendMaybeThrottle(request,
streamsGroupInitializeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+ requestHelper.sendMaybeThrottle(request,
streamsGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
- } else if (!authHelper.authorize(request.context, READ, GROUP,
streamsGroupInitializeRequest.data.groupId)) {
- requestHelper.sendMaybeThrottle(request,
streamsGroupInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+ } else if (!authHelper.authorize(request.context, READ, GROUP,
streamsGroupHeartbeatRequest.data.groupId)) {
+ requestHelper.sendMaybeThrottle(request,
streamsGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
val requestContext = request.context
- val internalTopics: Map[String,
StreamsGroupInitializeRequestData.TopicInfo] = {
-
streamsGroupInitializeRequest.data().topology().asScala.flatMap(subtopology =>
- subtopology.repartitionSourceTopics().iterator().asScala ++
subtopology.stateChangelogTopics().iterator().asScala
- ).map(x => x.name() -> x).toMap
- }
-
- val prohibitedInternalTopics =
internalTopics.keys.filter(Topic.isInternal)
- if (prohibitedInternalTopics.nonEmpty) {
- val errorResponse = new StreamsGroupInitializeResponseData()
- errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code)
- errorResponse.setErrorMessage(f"Use of Kafka internal topics
${prohibitedInternalTopics.mkString(",")} as Kafka Streams internal topics is
prohibited.")
- requestHelper.sendMaybeThrottle(request, new
StreamsGroupInitializeResponse(errorResponse))
- return CompletableFuture.completedFuture[Unit](())
- }
+ if (streamsGroupHeartbeatRequest.data().topology() != null) {
+ val requiredTopics: Seq[String] =
+
streamsGroupHeartbeatRequest.data().topology().iterator().asScala.flatMap(subtopology
=>
+ (subtopology.sourceTopics().iterator().asScala:Iterator[String])
+ ++
(subtopology.repartitionSinkTopics().iterator().asScala:Iterator[String])
+ ++
(subtopology.repartitionSourceTopics().iterator().asScala.map(_.name()):Iterator[String])
+ ++
(subtopology.stateChangelogTopics().iterator().asScala.map(_.name()):Iterator[String])
+ ).toSeq
+
+ // Checking early for valid topology names, since we don't want to
pass those to `authHelper`.
Review Comment:
I do not understand this comment. Should it be "valid topic names"? But
then, why "valid"? Here you check whether internal topics are used, while the
validity check happens on line 3914.
IMO, the comment is not needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]