twobeeb commented on a change in pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#discussion_r565109219



##########
File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -89,11 +89,25 @@ public MirrorMakerConfig(Map<?, ?> props) {
     public List<SourceAndTarget> clusterPairs() {
         List<SourceAndTarget> pairs = new ArrayList<>();
         Set<String> clusters = clusters();
+        Map<String, String> originalStrings = originalsStrings();
+        boolean globalHeartbeatsEnabled = MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+        if (originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+            globalHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+        }
+
         for (String source : clusters) {
             for (String target : clusters) {
-                SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target);
                 if (!source.equals(target)) {
-                    pairs.add(sourceAndTarget);
+                    String clusterPairConfigPrefix = source + "->" + target + ".";
+                    boolean clusterPairEnabled = Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + "enabled", "false"));
+                    boolean clusterPairHeartbeatsEnabled = globalHeartbeatsEnabled;
+                    if (originalStrings.containsKey(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+                        clusterPairHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+                    }
+
+                    if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {

Review comment:
       That's actually the current behavior. All herders are created, and the 
heartbeats are emitted from the "opposite" herder:  
   
   If you have a replication flow from A to B **and you want heartbeats**, you 
need 2 herders:  
   - A->B for the MirrorSourceConnector 
   - B->A for the MirrorHeartbeatConnector 
   
   The MirrorHeartbeatConnector on B->A emits heartbeats into the heartbeats 
topic on cluster A.
   The MirrorSourceConnector on A->B then replicates whichever topics are 
configured, as well as heartbeats.
   
   All of this is fine with 2-3 clusters. It starts to make less sense with 
10+ clusters, where every ordered pair of clusters (90 pairs for 10 clusters) 
gets its own herder.
   
   Because this is the current implementation, we decided to keep it as-is to 
avoid changing behavior, and to add a new top-level property 
"emit.heartbeats.enabled", which defaults to "true".
   
   Existing users will not see any change, and if they depend on these 
"opposite" herders for their monitoring, it will still work.  
   New users with more complex use cases can change this property and 
fine-tune their heartbeat generation.
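
To make the effect of the property concrete, here is a minimal standalone sketch of the pair-selection rule from the hunk above. It is not the actual `MirrorMakerConfig` code: the class name `ClusterPairSketch`, the helper `clusterNames`, and the cluster names `c0` to `c9` are hypothetical, pairs are returned as plain strings instead of `SourceAndTarget`, and `Boolean.parseBoolean` stands in for `Boolean.valueOf`. The "true" default is the one described in this comment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ClusterPairSketch {
    // Property name from the comment above; assumed to default to "true" as described there.
    static final String EMIT_HEARTBEATS_ENABLED = "emit.heartbeats.enabled";

    /** Returns "source->target" for every ordered pair that would get a herder. */
    static List<String> clusterPairs(Set<String> clusters, Map<String, String> props) {
        String globalHeartbeats = props.getOrDefault(EMIT_HEARTBEATS_ENABLED, "true");
        List<String> pairs = new ArrayList<>();
        for (String source : clusters) {
            for (String target : clusters) {
                if (source.equals(target)) {
                    continue;
                }
                String prefix = source + "->" + target + ".";
                // A flow is replicated only when explicitly enabled.
                boolean enabled = Boolean.parseBoolean(
                        props.getOrDefault(prefix + "enabled", "false"));
                // A per-pair heartbeat setting overrides the top-level one.
                boolean heartbeats = Boolean.parseBoolean(
                        props.getOrDefault(prefix + EMIT_HEARTBEATS_ENABLED, globalHeartbeats));
                if (enabled || heartbeats) {
                    pairs.add(source + "->" + target);
                }
            }
        }
        return pairs;
    }

    /** Hypothetical helper: builds cluster names c0, c1, ... in insertion order. */
    static Set<String> clusterNames(int n) {
        Set<String> names = new LinkedHashSet<>();
        for (int i = 0; i < n; i++) {
            names.add("c" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        Set<String> clusters = clusterNames(10);
        Map<String, String> props = new HashMap<>();
        props.put("c0->c1.enabled", "true");

        // Default: heartbeats on everywhere, so every ordered pair gets a herder.
        System.out.println(clusterPairs(clusters, props).size()); // 90

        // Heartbeats off globally: only the explicitly enabled flow remains.
        props.put(EMIT_HEARTBEATS_ENABLED, "false");
        System.out.println(clusterPairs(clusters, props)); // [c0->c1]

        // Re-enable only the "opposite" herder that feeds heartbeats into c0.
        props.put("c1->c0." + EMIT_HEARTBEATS_ENABLED, "true");
        System.out.println(clusterPairs(clusters, props)); // [c0->c1, c1->c0]
    }
}
```

With the default, 10 clusters yield all 90 herders even though only one flow is enabled. Turning the top-level property off and re-enabling heartbeats on a single opposite pair keeps just the two herders that the A->B example above needs.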





