[ https://issues.apache.org/jira/browse/KAFKA-9981 ]
YANGLiiN deleted comment on KAFKA-9981:
---------------------------------
was (Author: jitabc):
The root cause is that Connect does not handle the 'NOTUSED' URL that MM2
adds, when running on a non-leader node; see
[https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java#L122]
We should add the following "NOTUSED" check below the code at
[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1593-L1599]
{code:java}
if (leaderUrl.startsWith("NOTUSED")) {
    configBackingStore.putTaskConfigs(connName, rawTaskProps);
    cb.onCompletion(null, null);
    return;
}
{code}
PR: https://github.com/apache/kafka/pull/12016
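
The decision that the proposed check introduces can be sketched in isolation. This is only a minimal illustration, not the DistributedHerder code: the class TaskConfigRouting, the Route enum, and chooseRoute are hypothetical names, and the only detail taken from the snippet above is the startsWith("NOTUSED") test.

```java
/** Hypothetical model of how a worker could route a task config update. */
class TaskConfigRouting {
    enum Route { WRITE_DIRECTLY, FORWARD_TO_LEADER, FAIL_EMPTY_URL }

    static Route chooseRoute(boolean isLeader, String leaderUrl) {
        if (isLeader) {
            // The leader writes to the config backing store itself.
            return Route.WRITE_DIRECTLY;
        }
        if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
            // Mirrors the existing "URL of the leader's REST interface is empty" error path.
            return Route.FAIL_EMPTY_URL;
        }
        if (leaderUrl.startsWith("NOTUSED")) {
            // Proposed check: no REST endpoint to forward to, so write directly.
            return Route.WRITE_DIRECTLY;
        }
        return Route.FORWARD_TO_LEADER;
    }

    public static void main(String[] args) {
        // "NOTUSED/primary" and the http URL are made-up example values.
        System.out.println(chooseRoute(false, "NOTUSED/primary"));
        System.out.println(chooseRoute(false, "http://leader.example.com:8083"));
        System.out.println(chooseRoute(false, "  "));
    }
}
```

With this shape, a non-leader worker only attempts the HTTP forward when the leader URL is not the placeholder.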
> Running a dedicated mm2 cluster with more than one node, when the
> configuration is updated the task is not aware and will lose the update
> operation.
> ----------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-9981
> URL: https://issues.apache.org/jira/browse/KAFKA-9981
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.4.0, 2.5.0, 2.4.1
> Reporter: victor
> Priority: Major
>
> DistributedHerder.reconfigureConnector detects a config update as follows:
> {code:java}
> if (changed) {
>     List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
>     if (isLeader()) {
>         configBackingStore.putTaskConfigs(connName, rawTaskProps);
>         cb.onCompletion(null, null);
>     } else {
>         // We cannot forward the request on the same thread because this
>         // reconfiguration can happen as a result of connector addition or
>         // removal. If we blocked waiting for the response from leader, we
>         // may be kicked out of the worker group.
>         forwardRequestExecutor.submit(new Runnable() {
>             @Override
>             public void run() {
>                 try {
>                     String leaderUrl = leaderUrl();
>                     if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
>                         cb.onCompletion(new ConnectException("Request to leader to " +
>                                 "reconfigure connector tasks failed " +
>                                 "because the URL of the leader's REST interface is empty!"), null);
>                         return;
>                     }
>                     String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
>                     log.trace("Forwarding task configurations for connector {} to leader", connName);
>                     RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
>                     cb.onCompletion(null, null);
>                 } catch (ConnectException e) {
>                     log.error("Request to leader to reconfigure connector tasks failed", e);
>                     cb.onCompletion(e, null);
>                 }
>             }
>         });
>     }
> }
> {code}
> The KafkaConfigBackingStore task checks for configuration updates, such as
> a topic whitelist update. If the KafkaConfigBackingStore task is not running
> on the leader node, an HTTP request will be sent to notify the leader of the
> configuration update. However, a dedicated mm2 cluster does not have the
> HTTP server turned on, so the request fails to be sent and the update
> operation is lost.
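
To make the failure mode described above concrete, here is a small sketch of what the forwarding URL looks like when the leader advertises a placeholder instead of a real REST address. It rests on assumptions: the placeholder value "NOTUSED/primary", the connector name, and the helpers join and isHttpTarget are all hypothetical, and join is a simplified stand-in rather than the actual RestServer.urlJoin.

```java
import java.net.URI;

/** Hypothetical check of whether a joined leader URL is a usable HTTP target. */
class PlaceholderUrlCheck {
    // Simplified stand-in for a URL join: exactly one slash between base and path.
    static String join(String base, String path) {
        String b = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
        String p = path.startsWith("/") ? path : "/" + path;
        return b + p;
    }

    // A usable target needs an http(s) scheme and a host.
    static boolean isHttpTarget(String url) {
        URI uri = URI.create(url);
        String scheme = uri.getScheme();
        boolean http = "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme);
        return http && uri.getHost() != null;
    }

    public static void main(String[] args) {
        String tasksPath = "/connectors/MirrorSourceConnector/tasks";
        String placeholder = join("NOTUSED/primary", tasksPath);
        String real = join("http://leader.example.com:8083/", tasksPath);
        System.out.println(placeholder + " usable: " + isHttpTarget(placeholder));
        System.out.println(real + " usable: " + isHttpTarget(real));
    }
}
```

Under these assumptions the placeholder-based URL has neither a scheme nor a host, so there is nothing for an HTTP client to connect to, which is consistent with the lost update described in the issue.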
--
This message was sent by Atlassian Jira
(v8.20.7#820007)