[ https://issues.apache.org/jira/browse/KAFKA-9981 ]


    YANGLiiN deleted comment on KAFKA-9981:
    ---------------------------------

was (Author: jitabc):
The root cause is that Connect, on a non-leader node, does not handle the 
'NOTUSED' URL that MM2 adds.

See 
[https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java#L122]

 

We should add the "NOTUSED" handling below this code: 

[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1593-L1599]

 
{code:java}
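// The leader URL is the MM2 placeholder, so there is no REST endpoint to forward to.
// Write the task configs to the config backing store directly instead.
if (leaderUrl.startsWith("NOTUSED")) {
    configBackingStore.putTaskConfigs(connName, rawTaskProps);
    cb.onCompletion(null, null);
    return;
}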
{code}
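
The decision order this change would produce can be sketched in isolation. This is only an illustration under assumptions: the class, enum, and method names below are hypothetical and are not part of DistributedHerder, and the "NOTUSED" prefix is taken from the snippet above rather than verified against MirrorMaker.java.

```java
public class NotUsedReconfigSketch {
    // Assumption: placeholder prefix as used in the proposed snippet.
    static final String NOT_USED_PREFIX = "NOTUSED";

    // Hypothetical outcome labels, for illustration only.
    enum Route { WRITE_DIRECTLY, FORWARD_TO_LEADER, FAIL_EMPTY_URL }

    // Mirrors the order of checks: leader first, then the empty-URL check,
    // then the proposed placeholder check, and only then the HTTP forward.
    static Route chooseRoute(boolean isLeader, String leaderUrl) {
        if (isLeader) {
            return Route.WRITE_DIRECTLY;
        }
        if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
            return Route.FAIL_EMPTY_URL;
        }
        if (leaderUrl.startsWith(NOT_USED_PREFIX)) {
            // No REST server to forward to, so skip the HTTP request.
            return Route.WRITE_DIRECTLY;
        }
        return Route.FORWARD_TO_LEADER;
    }

    public static void main(String[] args) {
        System.out.println(chooseRoute(true, "http://leader.example.com:8083"));
        System.out.println(chooseRoute(false, "NOTUSED/primary"));
        System.out.println(chooseRoute(false, "http://leader.example.com:8083"));
        System.out.println(chooseRoute(false, " "));
    }
}
```

With this ordering, a follower only attempts the HTTP forward when the leader URL is neither empty nor the placeholder.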

PR: https://github.com/apache/kafka/pull/12016

> Running a dedicated mm2 cluster with more than one nodes,When the 
> configuration is updated the task is not aware and will lose the update 
> operation.
> ----------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9981
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9981
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.4.0, 2.5.0, 2.4.1
>            Reporter: victor
>            Priority: Major
>
> DistributedHerder.reconfigureConnector handles a config update as follows:
> {code:java}
> if (changed) {
>     List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
>     if (isLeader()) {
>         configBackingStore.putTaskConfigs(connName, rawTaskProps);
>         cb.onCompletion(null, null);
>     } else {
>         // We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
>         // addition or removal. If we blocked waiting for the response from leader, we may be kicked out of the worker group.
>         forwardRequestExecutor.submit(new Runnable() {
>             @Override
>             public void run() {
>                 try {
>                     String leaderUrl = leaderUrl();
>                     if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
>                         cb.onCompletion(new ConnectException("Request to leader to " +
>                                 "reconfigure connector tasks failed " +
>                                 "because the URL of the leader's REST interface is empty!"), null);
>                         return;
>                     }
>                     String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
>                     log.trace("Forwarding task configurations for connector {} to leader", connName);
>                     RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
>                     cb.onCompletion(null, null);
>                 } catch (ConnectException e) {
>                     log.error("Request to leader to reconfigure connector tasks failed", e);
>                     cb.onCompletion(e, null);
>                 }
>             }
>         });
>     }
> }
> {code}
> The KafkaConfigBackingStore task checks for configuration updates, such as a 
> topic whitelist update. If the KafkaConfigBackingStore task is not running on 
> the leader node, an HTTP request will be sent to notify the leader of the 
> configuration update. However, a dedicated MM2 cluster does not have the HTTP 
> server turned on, so the request fails to be sent, causing the update 
> operation to be lost.
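
As a rough illustration of why the forwarded request cannot succeed in that setup, the sketch below checks whether a string has the shape of an HTTP(S) URL. It assumes the leader URL is a placeholder beginning with "NOTUSED", as the comment above describes. The class and method names are hypothetical, the host name is made up, and plain string concatenation stands in for RestServer.urlJoin.

```java
import java.net.URI;

public class PlaceholderUrlSketch {
    // Returns true only if the string parses as an http/https URI with a host.
    static boolean looksLikeHttpUrl(String url) {
        try {
            URI uri = URI.create(url);
            String scheme = uri.getScheme();
            boolean httpScheme = "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme);
            return httpScheme && uri.getHost() != null;
        } catch (IllegalArgumentException e) {
            // Not parseable as a URI at all.
            return false;
        }
    }

    public static void main(String[] args) {
        String path = "/connectors/mm2/tasks"; // hypothetical connector name
        // Placeholder leader URL (assumption): no scheme, no host.
        System.out.println(looksLikeHttpUrl("NOTUSED" + path));
        // A regular Connect worker advertising a REST endpoint (made-up host).
        System.out.println(looksLikeHttpUrl("http://leader.example.com:8083" + path));
    }
}
```

A placeholder of this shape gives the REST client nothing to connect to, which is consistent with the lost update described in the issue.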



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
