lucasbru opened a new pull request, #17433: URL: https://github.com/apache/kafka/pull/17433
First implementation of automatic creation of internal topics for KIP-1071. It's only a sketch: some parts are missing, the unit tests are insufficient, etc. But the draft works end-to-end. It automatically creates all internal topics a Streams application requires upon initialization, and the smoke test integration test passes.

When initializing the topology, the client sends information about the internal topics the application requires. The key concept is that the topology description sent to the broker is parametric in the number of partitions. That is, the broker looks up the number of partitions of the input topics and derives the number of partitions for each internal topic from them. This logic is ported from the existing streams partition assignor.

The basic flow is as follows. Upon receiving the initialize RPC, the group coordinator checks the ACL permissions, validates the request, updates the topology record in the consumer offsets topic, determines the number of partitions for all internal topics, and finally triggers a corresponding CreateTopics request. The code for generating and validating the internal topic configuration lives inside the group coordinator, but the topics themselves are created from within KafkaApis.

Compared to the schema in KIP-1071, I had to add copartition groups to the initialization call. Copartition groups are not necessarily the same as subtopologies. For example, if we merge two topics, the two topics are in the same subtopology but in separate copartition groups. If we join them, they are in the same copartition group. So we need to model copartition groups separately in the RPC; otherwise the broker cannot derive the right topic configurations.

There is a dependency between updating the topology record in the consumer offsets topic and creating the internal topics: in case of failures, one could be executed without the other.
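To make the partition-parametric idea concrete, here is a minimal, simplified Java sketch of how internal topic partition counts could be derived from input topic partition counts and copartition groups. It is not the code in this PR or in the streams partition assignor; the `Subtopology` record, the `InternalTopicPartitions.derive` method and all topic names are hypothetical. It assumes that a repartition topic gets the largest task count among the subtopologies writing to it, that repartition topics in a copartition group are aligned to the external input topics of that group (or to the largest repartition count if there are none), and that a changelog topic gets one partition per task of its subtopology.

```java
import java.util.*;

// Hypothetical model of one subtopology in a partition-parametric topology:
// the topics it reads, the repartition topics it writes, and its changelog topics.
record Subtopology(Set<String> sourceTopics,
                   Set<String> repartitionSinkTopics,
                   Set<String> changelogTopics) {}

final class InternalTopicPartitions {

    // Derives partition counts for all internal topics from the partition
    // counts of the external input topics (simplified, hypothetical logic).
    static Map<String, Integer> derive(Map<String, Integer> inputTopicPartitions,
                                       List<Subtopology> subtopologies,
                                       List<Set<String>> copartitionGroups) {
        Map<String, Integer> counts = new HashMap<>(inputTopicPartitions);
        Set<String> repartitionTopics = new HashSet<>();
        for (Subtopology s : subtopologies) {
            repartitionTopics.addAll(s.repartitionSinkTopics());
        }

        // 1. Propagate downstream until nothing changes: a repartition topic gets
        //    the largest task count among the subtopologies writing to it.
        //    Counts only grow and are bounded by the input counts, so this terminates.
        boolean changed = true;
        while (changed) {
            changed = false;
            for (Subtopology s : subtopologies) {
                OptionalInt tasks = taskCount(s, counts);
                if (tasks.isEmpty()) {
                    continue; // an upstream count is not known yet
                }
                for (String sink : s.repartitionSinkTopics()) {
                    if (tasks.getAsInt() > counts.getOrDefault(sink, 0)) {
                        counts.put(sink, tasks.getAsInt());
                        changed = true;
                    }
                }
            }
        }

        // 2. Enforce copartitioning: external topics in a group must agree,
        //    and the repartition topics in the group are aligned to them.
        for (Set<String> group : copartitionGroups) {
            Set<Integer> external = new HashSet<>();
            int maxRepartition = 0;
            for (String topic : group) {
                Integer n = counts.get(topic);
                if (n == null) {
                    throw new IllegalStateException("Unknown partition count for " + topic);
                }
                if (repartitionTopics.contains(topic)) {
                    maxRepartition = Math.max(maxRepartition, n);
                } else {
                    external.add(n);
                }
            }
            if (external.size() > 1) {
                throw new IllegalStateException("Copartitioned topics differ in partition count: " + group);
            }
            int target = external.isEmpty() ? maxRepartition : external.iterator().next();
            for (String topic : group) {
                if (repartitionTopics.contains(topic)) {
                    counts.put(topic, target);
                }
            }
        }

        // 3. Collect results; each changelog topic gets one partition per task.
        Map<String, Integer> result = new TreeMap<>();
        for (String topic : repartitionTopics) {
            result.put(topic, counts.get(topic));
        }
        for (Subtopology s : subtopologies) {
            int tasks = taskCount(s, counts)
                    .orElseThrow(() -> new IllegalStateException("Cannot resolve " + s));
            for (String changelog : s.changelogTopics()) {
                result.put(changelog, tasks);
            }
        }
        return result;
    }

    // One task per partition of the widest source topic; empty if any source count is unknown.
    private static OptionalInt taskCount(Subtopology s, Map<String, Integer> counts) {
        int max = 0;
        for (String topic : s.sourceTopics()) {
            Integer n = counts.get(topic);
            if (n == null) {
                return OptionalInt.empty();
            }
            max = Math.max(max, n);
        }
        return OptionalInt.of(max);
    }
}
```

With `orders` at 8 partitions and `customers` at 4, declaring `{app-orders-repartition, customers}` as a copartition group (a join) yields 4 partitions for both internal topics, whereas with no copartition group (for example, a merge) both get 8. The subtopologies are identical in both cases, which illustrates why the copartition groups have to be sent in the RPC. For brevity, the sketch does not re-propagate counts downstream after copartitioning is enforced.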
In this implementation, we update the topology record first and then create the internal topics; the topology record is the single source of truth. If internal topic creation fails, the client needs to detect it. Right now, the heartbeat responds with an error if internal topics are missing. Once we merge the heartbeat and initialization RPCs, any attempt to join the group (with a topology description) will also be an attempt to create any missing internal topics.

Surprisingly, existing RPCs (TopicMetadata and FindCoordinator) auto-create topics asynchronously: a CreateTopics request is started, but its result is never checked. I followed the same route for this first version, but I intend to change this. Currently, you have to check the broker logs if internal topics cannot be created. We can do some validation up front, but the final confirmation that the internal topics were created comes from the response of the internal CreateTopics request. So I would like to respond to StreamsGroupInitialize only once I have received the result of the CreateTopics RPCs.

We need some extra logic for revalidating internal topics. With this change, the set of tasks is defined by the TopicsImage in the group coordinator and the partition-parametric topology description. The group epoch needs to be bumped when either of them changes, and we need to cache the result of validating and instantiating a topology against the current set of input topics, so that we don't have to redo it on every heartbeat. After a fail-over, we need to reconstruct this cache from the topology and subscription records in the offsets topic. This is not fully implemented yet.
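A minimal, in-memory sketch of the caching idea, assuming the configured topology can be keyed by a topology epoch plus the partition counts of the input topics it reads (a stand-in for the relevant part of the TopicsImage). `ConfiguredTopologyCache`, its `get` method and the `instantiate` callback are hypothetical names, not the group coordinator's API. The group epoch is bumped whenever the key changes; heartbeats with an unchanged key reuse the cached result.

```java
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical in-memory cache for the result of validating and instantiating
// a topology against the partition counts of its input topics.
final class ConfiguredTopologyCache<T> {

    // Cache key: topology epoch plus the relevant input topic partition counts.
    private record Key(int topologyEpoch, Map<String, Integer> inputPartitions) {}

    private final BiFunction<Integer, Map<String, Integer>, T> instantiate;
    private Key cachedKey; // null until the first computation
    private T cachedValue;
    private int groupEpoch;

    ConfiguredTopologyCache(BiFunction<Integer, Map<String, Integer>, T> instantiate) {
        this.instantiate = instantiate;
    }

    // Called on every heartbeat: recomputes the configured topology and bumps
    // the group epoch only if the topology epoch or a partition count changed.
    T get(int topologyEpoch, Map<String, Integer> inputPartitions) {
        Key key = new Key(topologyEpoch, Map.copyOf(inputPartitions));
        if (!key.equals(cachedKey)) {
            cachedValue = instantiate.apply(topologyEpoch, key.inputPartitions());
            cachedKey = key;
            groupEpoch++;
        }
        return cachedValue;
    }

    int groupEpoch() {
        return groupEpoch;
    }
}
```

Because this cache starts empty, the first heartbeat after a fail-over would look like a change and bump the group epoch even though nothing changed. That is why the cache would have to be seeded from the topology and subscription records, which this sketch does not cover.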
