AndrewJSchofield commented on code in PR #22368:
URL: https://github.com/apache/kafka/pull/22368#discussion_r3303648734
##########
core/src/main/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImpl.java:
##########
@@ -93,8 +95,8 @@ public boolean isDlqEnabledOnTopic(String topic) {
return false;
}
Object isEnabled =
props.get(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG);
- if (isEnabled instanceof Boolean) {
- return (boolean) isEnabled;
+ if (isEnabled instanceof String) {
Review Comment:
I'm surprised that this is necessary. This config has a boolean type
already, so I wonder whether that's being lost somehow on its way to the
metadata cache. For other similar configs, such as
`TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG`, the code just does
`Config.getBoolean`.
##########
core/src/test/java/kafka/server/share/ShareCoordinatorMetadataCacheHelperImplTest.java:
##########
@@ -680,14 +682,14 @@ public void
testIsDlqEnabledOnTopicReturnsFalseWhenConfigValueNotBoolean() {
mock(GroupConfigManager.class)
);
- assertFalse(cache.isDlqEnabledOnTopic("test-topic"));
+ assertTrue(cache.isDlqEnabledOnTopic("test-topic"));
}
@Test
- public void testIsDlqEnabledOnTopicReturnsTrue() {
+ public void testIsDlqEnabledOnTopicReturnsFalseValue() {
Review Comment:
nit: why `FalseValue` instead of `False`?
##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -126,15 +149,38 @@ public CompletableFuture<Void>
dlq(ShareGroupDLQRecordParameter param) {
return future;
}
+ // Visibility for tests
+ CompletableFuture<Void> dlq(ShareGroupDLQRecordParameter param, long
requestBackoffMs, long requestBackoffMaxMs, int maxRequestAttempts) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ProduceRequestHandler requestHandler = new
ProduceRequestHandler(param, future, requestBackoffMs, requestBackoffMaxMs,
maxRequestAttempts);
+ enqueue(requestHandler);
+ return future;
+ }
+
private void enqueue(ProduceRequestHandler requestHandler) {
sender.enqueue(requestHandler);
}
+ private void addRequestToNodeMap(Node node, ProduceRequestHandler handler)
{
+ if (!handler.isBatchable()) {
Review Comment:
This could do with some explanation I feel.
##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -204,6 +261,66 @@ public AbstractRequest.Builder<CreateTopicsRequest>
createTopicBuilder() throws
.setTopics(topicCollection));
}
+ public AbstractRequest.Builder<? extends AbstractRequest>
requestBuilder() {
+ throw new RuntimeException("Produce requests are batchable, hence
individual requests not needed.");
+ }
+
+ public void populateDLQTopicData() throws ConfigException {
+ Optional<String> dlqTopic =
cacheHelper.shareGroupDlqTopic(param.groupId());
+ if (dlqTopic.isEmpty()) {
+ throw new ConfigException(String.format("DLQ topic is not
configured for share group %s.", param.groupId()));
+ }
+
+ ShareGroupDLQMetadataCacheHelper.TopicPartitionData tpData =
cacheHelper.topicPartitionData(dlqTopic.get());
+
+ if (tpData.topicId().isEmpty()) {
+ throw new ConfigException(String.format("DLQ topic id could
not be found for share group %s with DLQ topic %s.", param.groupId(),
dlqTopic.get()));
+ }
+
+ if (tpData.numPartitions().isEmpty()) {
+ throw new ConfigException(String.format("DLQ topic partition
count not be found for share group %s with DLQ topic %s.", param.groupId(),
dlqTopic.get()));
Review Comment:
nit: Missing "could".
##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -332,6 +508,93 @@ private void handleCreateTopicsResponse(ClientResponse
response) {
requestErrorResponse(clientResponseError.exception());
}
}
+
+ private void handleProduceResponse(ClientResponse response) {
+ LOG.debug("Received ProduceRequestResponse {}", response);
+ produceRequestBackoff.incrementAttempt();
+ Errors clientResponseError =
checkResponseError(response).orElse(Errors.NONE);
+ String clientResponseErrorMessage = clientResponseError.message();
+
+ switch (clientResponseError) {
+ case NONE:
+ // Produce response received
+ ProduceResponse produceResponse = ((ProduceResponse)
response.responseBody());
+ ProduceResponseData.TopicProduceResponseCollection
produceResponseCollection = produceResponse.data().responses();
+ if (produceResponseCollection.isEmpty()) {
+ LOG.error("Received empty produce response for {} to
dlq topic node {}.", this, dlqPartitionLeaderNode());
Review Comment:
nit: Let's be consistent with use of `.` in the log messages. I would tend
not to include in log messages, but I would definitely go for consistency in
this source file.
##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
Review Comment:
nit: This method could call the overload beneath will fully specified
arguments.
##########
server-common/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java:
##########
@@ -238,7 +355,51 @@ public Optional<Throwable> validateDlqTopic() {
public boolean dlqTopicExists() {
Optional<String> shareGroupDlqTopic =
cacheHelper.shareGroupDlqTopic(param.groupId());
- return
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
+ boolean isDlqTopicPresent =
shareGroupDlqTopic.filter(cacheHelper::containsTopic).isPresent();
+ if (isDlqTopicPresent) {
+ try {
+ populateDLQTopicData();
+ } catch (ConfigException e) {
+ return false;
+ }
+ addRequestToNodeMap(dlqPartitionLeaderNode, this);
+ }
+ return isDlqTopicPresent;
+ }
+
+ @Override
+ public String toString() {
+ return "ProduceRequestHandler(" +
+ "param: " + param + "\n" +
+ "dlqTopicData: " + dlqTopicPartitionData + "\n" +
+ ")";
+ }
+
+ private Header[] headers(long offset) {
+ List<Header> headers = new ArrayList<>();
+ headers.add(new RecordHeader("__dlq.errors.topic",
recordTopic().getBytes(StandardCharsets.UTF_8)));
Review Comment:
I would define some static strings for the header keys.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]