twobeeb commented on a change in pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#discussion_r565704218
##########
File path:
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##########
@@ -89,11 +89,25 @@ public MirrorMakerConfig(Map<?, ?> props) {
     public List<SourceAndTarget> clusterPairs() {
         List<SourceAndTarget> pairs = new ArrayList<>();
         Set<String> clusters = clusters();
+        Map<String, String> originalStrings = originalsStrings();
+        boolean globalHeartbeatsEnabled = MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+        if (originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+            globalHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+        }
+
         for (String source : clusters) {
             for (String target : clusters) {
-                SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target);
                 if (!source.equals(target)) {
-                    pairs.add(sourceAndTarget);
+                    String clusterPairConfigPrefix = source + "->" + target + ".";
+                    boolean clusterPairEnabled = Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + "enabled", "false"));
+                    boolean clusterPairHeartbeatsEnabled = globalHeartbeatsEnabled;
+                    if (originalStrings.containsKey(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+                        clusterPairHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+                    }
+
+                    if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {
Review comment:
@skaundinya15 Thanks for your review.
I'm having a hard time phrasing this properly, so suggestions are welcome.
Does this proposed comment match what you had in mind?
```suggestion
                    // By default, all source->target Herder combinations are created even if `x->y.enabled=false`,
                    // unless `emit.heartbeats.enabled=false` or `x->y.emit.heartbeats.enabled=false`.
                    // Reason for this behavior: for a given replication flow A->B with heartbeats, 2 herders are required:
                    // B->A for the MirrorHeartbeatConnector (emits heartbeats into A)
                    // A->B for the MirrorSourceConnector (actual replication flow)
                    if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {
```
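To make the rule easier to check, here is a minimal standalone sketch of the decision described in the comment above. It is not the PR code: `ClusterPairSketch` and `clusterPairs(Set, Map)` are hypothetical names, pairs are returned as plain `"source->target"` strings instead of `SourceAndTarget`, and the `emit.heartbeats.enabled` default of `true` is my assumption about `MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the logic in MirrorMakerConfig.clusterPairs().
class ClusterPairSketch {
    // Assumed to match MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED and its default.
    static final String EMIT_HEARTBEATS_ENABLED = "emit.heartbeats.enabled";
    static final boolean EMIT_HEARTBEATS_ENABLED_DEFAULT = true;

    // Returns "source->target" for every pair that would get a herder.
    static List<String> clusterPairs(Set<String> clusters, Map<String, String> props) {
        // The global heartbeat setting applies to every pair unless overridden.
        boolean globalHeartbeats = EMIT_HEARTBEATS_ENABLED_DEFAULT;
        if (props.containsKey(EMIT_HEARTBEATS_ENABLED)) {
            globalHeartbeats = Boolean.parseBoolean(props.get(EMIT_HEARTBEATS_ENABLED));
        }

        List<String> pairs = new ArrayList<>();
        for (String source : clusters) {
            for (String target : clusters) {
                if (source.equals(target)) {
                    continue;
                }
                String prefix = source + "->" + target + ".";
                boolean enabled =
                    Boolean.parseBoolean(props.getOrDefault(prefix + "enabled", "false"));
                // A per-pair heartbeat setting overrides the global one.
                boolean heartbeats = globalHeartbeats;
                if (props.containsKey(prefix + EMIT_HEARTBEATS_ENABLED)) {
                    heartbeats = Boolean.parseBoolean(props.get(prefix + EMIT_HEARTBEATS_ENABLED));
                }
                // A herder is needed if the pair replicates or emits heartbeats.
                if (enabled || heartbeats) {
                    pairs.add(source + "->" + target);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        Set<String> clusters = new LinkedHashSet<>(Arrays.asList("A", "B"));
        Map<String, String> props = new HashMap<>();
        props.put("A->B.enabled", "true");
        // B->A is kept because heartbeats default to enabled.
        System.out.println(clusterPairs(clusters, props)); // prints [A->B, B->A]
    }
}
```

With `emit.heartbeats.enabled=false` added to the same properties, only `A->B` would remain, which is the behavior the proposed comment is trying to describe.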