chia7712 commented on code in PR #22408: URL: https://github.com/apache/kafka/pull/22408#discussion_r3414959898
########## server/src/main/java/org/apache/kafka/server/ForwardingManager.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.network.Request; + +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public interface ForwardingManager { + + /** + * Close the forwarding manager + */ + void close(); Review Comment: Can we just let `ForwardingManager` extend `AutoCloseable`? ########## server/src/main/java/org/apache/kafka/server/ForwardingManager.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.network.Request; + +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public interface ForwardingManager { + + /** + * Close the forwarding manager + */ + void close(); + + /** + * Forward given request to the active controller. + * + * @param originalRequest The request to forward. + * @param responseCallback A callback which takes in an `Optional<AbstractResponse>`. + * We will call this function with Optional.of(x) after the controller responds with x. + * Or, if the controller doesn't support the request version, we will complete + * the callback with Optional.empty(). + */ + default void forwardRequest( + Request originalRequest, + Consumer<Optional<AbstractResponse>> responseCallback + ) { + ByteBuffer buffer = originalRequest.buffer().duplicate(); + buffer.flip(); + forwardRequest(originalRequest.context(), + buffer, + originalRequest.startTimeNanos(), + originalRequest.body(AbstractRequest.class), + originalRequest::toString, + responseCallback); + } + + /** + * Forward given request to the active controller. + * + * @param originalRequest The request to forward. + * @param newRequestBody The AbstractRequest we are sending. + * @param responseCallback A callback which takes in an `Optional<AbstractResponse>`. + * We will call this function with Optional.of(x) after the controller responds with x. + * Or, if the controller doesn't support the request version, we will complete + * the callback with Optional.empty(). + */ + default void forwardRequest( + Request originalRequest, + AbstractRequest newRequestBody, + Consumer<Optional<AbstractResponse>> responseCallback) { + ByteBuffer buffer = newRequestBody.serializeWithHeader(originalRequest.header()); + forwardRequest(originalRequest.context(), + buffer, + originalRequest.startTimeNanos(), + newRequestBody, + originalRequest::toString, + responseCallback); + } + + /** + * Forward given request to the active controller. + * + * @param requestContext The request context of the original envelope request. + * @param requestBufferCopy The request buffer we want to send. This should not be the original + * byte buffer from the envelope request, since we will be mutating + * the position and limit fields. It should be a copy. + * @param requestCreationNs The request creation timestamp in nanoseconds. + * @param requestBody The AbstractRequest we are sending. + * @param requestToString A callback which can be invoked to produce a human-readable description + * of the request. + * @param responseCallback A callback which takes in an `Optional<AbstractResponse>`. + * We will call this function with Optional.of(x) after the controller responds with x. + * Or, if the controller doesn't support the request version, we will complete + * the callback with Optional.empty(). + */ + void forwardRequest( + RequestContext requestContext, Review Comment: I guess we could simplify this signature ```java void forwardRequest( Request request, ByteBuffer requestBufferCopy, AbstractRequest requestBody, Consumer<Optional<AbstractResponse>> responseCallback); ``` ########## server/src/main/java/org/apache/kafka/server/ForwardingManager.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.network.Request; + +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public interface ForwardingManager { + + /** + * Close the forwarding manager + */ + void close(); + + /** + * Forward given request to the active controller. + * + * @param originalRequest The request to forward. + * @param responseCallback A callback which takes in an `Optional<AbstractResponse>`. + * We will call this function with Optional.of(x) after the controller responds with x. + * Or, if the controller doesn't support the request version, we will complete + * the callback with Optional.empty(). + */ + default void forwardRequest( + Request originalRequest, + Consumer<Optional<AbstractResponse>> responseCallback + ) { + ByteBuffer buffer = originalRequest.buffer().duplicate(); + buffer.flip(); + forwardRequest(originalRequest.context(), + buffer, + originalRequest.startTimeNanos(), + originalRequest.body(AbstractRequest.class), Review Comment: This type check is redundant since the body is definitely an `AbstractRequest`. Maybe we could have a specialized body() method that simply returns `AbstractRequest` ########## server/src/main/java/org/apache/kafka/server/ForwardingManagerImpl.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.EnvelopeRequest; +import org.apache.kafka.common.requests.EnvelopeResponse; +import org.apache.kafka.common.requests.RequestContext; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.server.common.ControllerRequestCompletionHandler; +import org.apache.kafka.server.common.NodeToControllerChannelManager; +import org.apache.kafka.server.metrics.ForwardingManagerMetrics; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public class ForwardingManagerImpl implements ForwardingManager, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(ForwardingManagerImpl.class); + + private final NodeToControllerChannelManager channelManager; + private final ForwardingManagerMetrics forwardingManagerMetrics; + + public ForwardingManagerImpl(NodeToControllerChannelManager channelManager, Metrics metrics) { + this.channelManager = channelManager; + this.forwardingManagerMetrics = new ForwardingManagerMetrics(metrics, channelManager.getTimeoutMs()); + } + + ForwardingManagerMetrics forwardingManagerMetrics() { + return forwardingManagerMetrics; + } + + @Override + public void forwardRequest( + RequestContext requestContext, + ByteBuffer requestBufferCopy, + long requestCreationNs, + AbstractRequest requestBody, + Supplier<String> requestToString, + Consumer<Optional<AbstractResponse>> responseCallback) { + EnvelopeRequest.Builder envelopeRequest = ForwardingManagerUtil.buildEnvelopeRequest(requestContext, requestBufferCopy); + long requestCreationTimeMs = TimeUnit.NANOSECONDS.toMillis(requestCreationNs); + + class ForwardingResponseHandler implements ControllerRequestCompletionHandler { + + @Override + public void onComplete(ClientResponse clientResponse) { + forwardingManagerMetrics.decrementQueueLength(); + forwardingManagerMetrics.remoteTimeMsHist().record(clientResponse.requestLatencyMs()); + forwardingManagerMetrics.queueTimeMsHist().record(clientResponse.receivedTimeMs() - clientResponse.requestLatencyMs() - requestCreationTimeMs); + + if (clientResponse.versionMismatch() != null) { + LOG.debug("Returning `UNKNOWN_SERVER_ERROR` in response to {} due to unexpected version error", + requestToString.get(), clientResponse.versionMismatch()); + responseCallback.accept(Optional.of(requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception()))); + } else if (clientResponse.authenticationException() != null) { + LOG.debug("Returning `UNKNOWN_SERVER_ERROR` in response to {} due to authentication error", + requestToString.get(), clientResponse.authenticationException()); + responseCallback.accept(Optional.of(requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception()))); + } else { + EnvelopeResponse envelopeResponse = (EnvelopeResponse) clientResponse.responseBody(); + Errors envelopeError = envelopeResponse.error(); + + // Unsupported version indicates an incompatibility between controller and client API versions. This + // could happen when the controller changed after the connection was established. The forwarding broker + // should close the connection with the client and let it reinitialize the connection and refresh + // the controller API versions. + if (envelopeError == Errors.UNSUPPORTED_VERSION) { + responseCallback.accept(Optional.empty()); + } else { + AbstractResponse response; + if (envelopeError != Errors.NONE) { + // A general envelope error indicates broker misconfiguration (e.g. the principal serde + // might not be defined on the receiving broker). In this case, we do not return + // the error directly to the client since it would not be expected. Instead, we + // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem + // on the broker. + LOG.debug("Forwarded request {} failed with an error in the envelope response {}", + requestToString.get(), envelopeError); + response = requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception()); + } else { + response = parseResponse(envelopeResponse.responseData(), requestBody, requestContext.header); + } + responseCallback.accept(Optional.of(response)); + } + } + } + + @Override + public void onTimeout() { + LOG.debug("Forwarding of the request {} failed due to timeout exception", requestToString.get()); + forwardingManagerMetrics.decrementQueueLength(); + forwardingManagerMetrics.queueTimeMsHist().record(channelManager.getTimeoutMs()); + AbstractResponse response = requestBody.getErrorResponse(new TimeoutException()); + responseCallback.accept(Optional.of(response)); + } + } + + forwardingManagerMetrics.incrementQueueLength(); + channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler()); + } + + @Override + public void close() { + forwardingManagerMetrics.close(); + } + + @Override + public Optional<NodeApiVersions> controllerApiVersions() { + return channelManager.controllerApiVersions(); + } + + private AbstractResponse parseResponse(ByteBuffer buffer, AbstractRequest request, RequestHeader header) { Review Comment: This could be a static method -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
