[EAGLE-620]: AlertEngine: SpoutWrapper is sending duplicated messages

Author: ralphsu
This closes #510

Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/2e715e3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/2e715e3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/2e715e3e

Branch: refs/heads/master
Commit: 2e715e3e94a6b98e0f3073d5d384878a0a845b72
Parents: c6ac2eb
Author: Ralph, Su <suliang...@gmail.com>
Authored: Fri Oct 14 17:30:27 2016 +0800
Committer: Ralph, Su <suliang...@gmail.com>
Committed: Fri Oct 14 17:32:25 2016 +0800

----------------------------------------------------------------------
 .../alert/coordinator/impl/MonitorMetadataGenerator.java | 2 +-
 .../alert/engine/spout/SpoutOutputCollectorWrapper.java | 9 ++++++---
 2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e715e3e/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
index c5c992b..3f64f86 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
@@ -265,7 +265,7 @@ public class MonitorMetadataGenerator {
                 }
             }
             if (targetSm == null) {
-                targetSm = new StreamRepartitionMetadata(datasourceName, schema.getStreamId());
+                targetSm = new StreamRepartitionMetadata(topicName, schema.getStreamId());
                 dsStreamMeta.add(targetSm);
             }
             if (!targetSm.groupingStrategies.contains(gs)) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e715e3e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
index a8dcc0d..e205da4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
@@ -18,6 +18,9 @@
  */
 package org.apache.eagle.alert.engine.spout;

+import backtype.storm.spout.ISpoutOutputCollector;
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.alert.coordination.model.SpoutSpec;
 import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
 import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
@@ -29,9 +32,6 @@
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
 import org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
 import org.apache.eagle.alert.utils.StreamIdConversion;
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -121,6 +121,9 @@ public class SpoutOutputCollectorWrapper extends SpoutOutputCollector implements
             phase 2: stream repartition
          */
         for (StreamRepartitionMetadata md : streamRepartitionMetadataList) {
+            if (!event.getStreamId().equals(md.getStreamId())) {
+                continue;
+            }
             // one stream may have multiple group-by strategies, each strategy is for a specific group-by
             for (StreamRepartitionStrategy groupingStrategy : md.groupingStrategies) {
                 int hash = 0;