twobeeb commented on a change in pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#discussion_r565704218



##########
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -89,11 +89,25 @@ public MirrorMakerConfig(Map<?, ?> props) {
     public List<SourceAndTarget> clusterPairs() {
         List<SourceAndTarget> pairs = new ArrayList<>();
         Set<String> clusters = clusters();
+        Map<String, String> originalStrings = originalsStrings();
+        boolean globalHeartbeatsEnabled = 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+        if 
(originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+            globalHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+        }
+
         for (String source : clusters) {
             for (String target : clusters) {
-                SourceAndTarget sourceAndTarget = new SourceAndTarget(source, 
target);
                 if (!source.equals(target)) {
-                    pairs.add(sourceAndTarget);
+                    String clusterPairConfigPrefix = source + "->" + target + 
".";
+                    boolean clusterPairEnabled = 
Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + 
"enabled", "false"));
+                    boolean clusterPairHeartbeatsEnabled = 
globalHeartbeatsEnabled;
+                    if (originalStrings.containsKey(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+                        clusterPairHeartbeatsEnabled = 
Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + 
MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+                    }
+
+                    if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {

Review comment:
       Thanks for your review @skaundinya15.
   I'm having a hard time phrasing this properly, suggestions would be welcome.
   Is this comment proposition aligned with what you had in mind ?
   ```suggestion
                       // By default, all source->target Herder combinations 
are created even if `x->y.enabled=false`
                       // Unless `emit.heartbeats.enabled=false` or 
`x->y.emit.heartbeats.enabled=false`
                       // Reason for this behavior: for a given replication 
flow A->B with heartbeats, 2 herders are required :
                       // B->A for the MirrorHeartbeatConnector (emits 
heartbeats into A)
                       // A->B for the MirrorSourceConnector (actual 
replication flow)
                       if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to