chia7712 commented on code in PR #22408:
URL: https://github.com/apache/kafka/pull/22408#discussion_r3414959898


##########
server/src/main/java/org/apache/kafka/server/ForwardingManager.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.network.Request;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public interface ForwardingManager {
+
+    /**
+     * Close the forwarding manager
+     */
+    void close();

Review Comment:
   Can we just let `ForwardingManager` extend `AutoCloseable`?



##########
server/src/main/java/org/apache/kafka/server/ForwardingManager.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.network.Request;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public interface ForwardingManager {
+
+    /**
+     * Close the forwarding manager
+     */
+    void close();
+
+    /**
+     * Forward given request to the active controller.
+     *
+     * @param originalRequest     The request to forward.
+     * @param responseCallback    A callback which takes in an 
`Optional&lt;AbstractResponse&gt;`.
+     *                            We will call this function with 
Optional.of(x) after the controller responds with x.
+     *                            Or, if the controller doesn't support the 
request version, we will complete
+     *                            the callback with Optional.empty().
+     */
+    default void forwardRequest(
+            Request originalRequest,
+            Consumer<Optional<AbstractResponse>> responseCallback
+    ) {
+        ByteBuffer buffer = originalRequest.buffer().duplicate();
+        buffer.flip();
+        forwardRequest(originalRequest.context(),
+                buffer,
+                originalRequest.startTimeNanos(),
+                originalRequest.body(AbstractRequest.class),
+                originalRequest::toString,
+                responseCallback);
+    }
+
+    /**
+     * Forward given request to the active controller.
+     *
+     * @param originalRequest     The request to forward.
+     * @param newRequestBody      The AbstractRequest we are sending.
+     * @param responseCallback    A callback which takes in an 
`Optional&lt;AbstractResponse&gt;`.
+     *                            We will call this function with 
Optional.of(x) after the controller responds with x.
+     *                            Or, if the controller doesn't support the 
request version, we will complete
+     *                            the callback with Optional.empty().
+     */
+    default void forwardRequest(
+            Request originalRequest,
+            AbstractRequest newRequestBody,
+            Consumer<Optional<AbstractResponse>> responseCallback) {
+        ByteBuffer buffer = 
newRequestBody.serializeWithHeader(originalRequest.header());
+        forwardRequest(originalRequest.context(),
+                buffer,
+                originalRequest.startTimeNanos(),
+                newRequestBody,
+                originalRequest::toString,
+                responseCallback);
+    }
+
+    /**
+     * Forward given request to the active controller.
+     *
+     * @param requestContext      The request context of the original envelope 
request.
+     * @param requestBufferCopy   The request buffer we want to send. This 
should not be the original
+     *                            byte buffer from the envelope request, since 
we will be mutating
+     *                            the position and limit fields. It should be 
a copy.
+     * @param requestCreationNs   The request creation timestamp in 
nanoseconds.
+     * @param requestBody         The AbstractRequest we are sending.
+     * @param requestToString     A callback which can be invoked to produce a 
human-readable description
+     *                            of the request.
+     * @param responseCallback    A callback which takes in an 
`Optional&lt;AbstractResponse&gt;`.
+     *                            We will call this function with 
Optional.of(x) after the controller responds with x.
+     *                            Or, if the controller doesn't support the 
request version, we will complete
+     *                            the callback with Optional.empty().
+     */
+    void forwardRequest(
+            RequestContext requestContext,

Review Comment:
   I guess we could simplify this signature:
   ```java
       void forwardRequest(
               Request request,
               ByteBuffer requestBufferCopy,
               AbstractRequest requestBody,
               Consumer<Optional<AbstractResponse>> responseCallback);
   ```



##########
server/src/main/java/org/apache/kafka/server/ForwardingManager.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.network.Request;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public interface ForwardingManager {
+
+    /**
+     * Close the forwarding manager
+     */
+    void close();
+
+    /**
+     * Forward given request to the active controller.
+     *
+     * @param originalRequest     The request to forward.
+     * @param responseCallback    A callback which takes in an 
`Optional&lt;AbstractResponse&gt;`.
+     *                            We will call this function with 
Optional.of(x) after the controller responds with x.
+     *                            Or, if the controller doesn't support the 
request version, we will complete
+     *                            the callback with Optional.empty().
+     */
+    default void forwardRequest(
+            Request originalRequest,
+            Consumer<Optional<AbstractResponse>> responseCallback
+    ) {
+        ByteBuffer buffer = originalRequest.buffer().duplicate();
+        buffer.flip();
+        forwardRequest(originalRequest.context(),
+                buffer,
+                originalRequest.startTimeNanos(),
+                originalRequest.body(AbstractRequest.class),

Review Comment:
   This type check is redundant since the body is definitely an 
`AbstractRequest`. Maybe we could have a specialized `body()` method that simply 
returns `AbstractRequest`.



##########
server/src/main/java/org/apache/kafka/server/ForwardingManagerImpl.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.EnvelopeRequest;
+import org.apache.kafka.common.requests.EnvelopeResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.server.common.ControllerRequestCompletionHandler;
+import org.apache.kafka.server.common.NodeToControllerChannelManager;
+import org.apache.kafka.server.metrics.ForwardingManagerMetrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+public class ForwardingManagerImpl implements ForwardingManager, AutoCloseable 
{
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ForwardingManagerImpl.class);
+
+    private final NodeToControllerChannelManager channelManager;
+    private final ForwardingManagerMetrics forwardingManagerMetrics;
+
+    public ForwardingManagerImpl(NodeToControllerChannelManager 
channelManager, Metrics metrics) {
+        this.channelManager = channelManager;
+        this.forwardingManagerMetrics = new ForwardingManagerMetrics(metrics, 
channelManager.getTimeoutMs());
+    }
+
+    ForwardingManagerMetrics forwardingManagerMetrics() {
+        return forwardingManagerMetrics;
+    }
+
+    @Override
+    public void forwardRequest(
+            RequestContext requestContext,
+            ByteBuffer requestBufferCopy,
+            long requestCreationNs,
+            AbstractRequest requestBody,
+            Supplier<String> requestToString,
+            Consumer<Optional<AbstractResponse>> responseCallback) {
+        EnvelopeRequest.Builder envelopeRequest = 
ForwardingManagerUtil.buildEnvelopeRequest(requestContext, requestBufferCopy);
+        long requestCreationTimeMs = 
TimeUnit.NANOSECONDS.toMillis(requestCreationNs);
+
+        class ForwardingResponseHandler implements 
ControllerRequestCompletionHandler {
+
+            @Override
+            public void onComplete(ClientResponse clientResponse) {
+                forwardingManagerMetrics.decrementQueueLength();
+                
forwardingManagerMetrics.remoteTimeMsHist().record(clientResponse.requestLatencyMs());
+                
forwardingManagerMetrics.queueTimeMsHist().record(clientResponse.receivedTimeMs()
 - clientResponse.requestLatencyMs() - requestCreationTimeMs);
+
+                if (clientResponse.versionMismatch() != null) {
+                    LOG.debug("Returning `UNKNOWN_SERVER_ERROR` in response to 
{} due to unexpected version error",
+                            requestToString.get(), 
clientResponse.versionMismatch());
+                    
responseCallback.accept(Optional.of(requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception())));
+                } else if (clientResponse.authenticationException() != null) {
+                    LOG.debug("Returning `UNKNOWN_SERVER_ERROR` in response to 
{} due to authentication error",
+                            requestToString.get(), 
clientResponse.authenticationException());
+                    
responseCallback.accept(Optional.of(requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception())));
+                } else {
+                    EnvelopeResponse envelopeResponse = (EnvelopeResponse) 
clientResponse.responseBody();
+                    Errors envelopeError = envelopeResponse.error();
+
+                    // Unsupported version indicates an incompatibility 
between controller and client API versions. This
+                    // could happen when the controller changed after the 
connection was established. The forwarding broker
+                    // should close the connection with the client and let it 
reinitialize the connection and refresh
+                    // the controller API versions.
+                    if (envelopeError == Errors.UNSUPPORTED_VERSION) {
+                        responseCallback.accept(Optional.empty());
+                    } else {
+                        AbstractResponse response;
+                        if (envelopeError != Errors.NONE) {
+                            // A general envelope error indicates broker 
misconfiguration (e.g. the principal serde
+                            // might not be defined on the receiving broker). 
In this case, we do not return
+                            // the error directly to the client since it would 
not be expected. Instead, we
+                            // return `UNKNOWN_SERVER_ERROR` so that the user 
knows that there is a problem
+                            // on the broker.
+                            LOG.debug("Forwarded request {} failed with an 
error in the envelope response {}",
+                                    requestToString.get(), envelopeError);
+                            response = 
requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception());
+                        } else {
+                            response = 
parseResponse(envelopeResponse.responseData(), requestBody, 
requestContext.header);
+                        }
+                        responseCallback.accept(Optional.of(response));
+                    }
+                }
+            }
+
+            @Override
+            public void onTimeout() {
+                LOG.debug("Forwarding of the request {} failed due to timeout 
exception", requestToString.get());
+                forwardingManagerMetrics.decrementQueueLength();
+                
forwardingManagerMetrics.queueTimeMsHist().record(channelManager.getTimeoutMs());
+                AbstractResponse response = requestBody.getErrorResponse(new 
TimeoutException());
+                responseCallback.accept(Optional.of(response));
+            }
+        }
+
+        forwardingManagerMetrics.incrementQueueLength();
+        channelManager.sendRequest(envelopeRequest, new 
ForwardingResponseHandler());
+    }
+
+    @Override
+    public void close() {
+        forwardingManagerMetrics.close();
+    }
+
+    @Override
+    public Optional<NodeApiVersions> controllerApiVersions() {
+        return channelManager.controllerApiVersions();
+    }
+
+    private AbstractResponse parseResponse(ByteBuffer buffer, AbstractRequest 
request, RequestHeader header) {

Review Comment:
   This could be a static method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to