[EAGLE-620]: AlertEngine: SpoutWrapper are sending duplicated message

Author: ralphsu

This closes #510


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/2e715e3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/2e715e3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/2e715e3e

Branch: refs/heads/master
Commit: 2e715e3e94a6b98e0f3073d5d384878a0a845b72
Parents: c6ac2eb
Author: Ralph, Su <suliang...@gmail.com>
Authored: Fri Oct 14 17:30:27 2016 +0800
Committer: Ralph, Su <suliang...@gmail.com>
Committed: Fri Oct 14 17:32:25 2016 +0800

----------------------------------------------------------------------
 .../alert/coordinator/impl/MonitorMetadataGenerator.java    | 2 +-
 .../alert/engine/spout/SpoutOutputCollectorWrapper.java     | 9 ++++++---
 2 files changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e715e3e/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
index c5c992b..3f64f86 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
@@ -265,7 +265,7 @@ public class MonitorMetadataGenerator {
             }
         }
         if (targetSm == null) {
-            targetSm = new StreamRepartitionMetadata(datasourceName, 
schema.getStreamId());
+            targetSm = new StreamRepartitionMetadata(topicName, 
schema.getStreamId());
             dsStreamMeta.add(targetSm);
         }
         if (!targetSm.groupingStrategies.contains(gs)) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/2e715e3e/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
index a8dcc0d..e205da4 100644
--- 
a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
+++ 
b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/SpoutOutputCollectorWrapper.java
@@ -18,6 +18,9 @@
  */
 package org.apache.eagle.alert.engine.spout;
 
+import backtype.storm.spout.ISpoutOutputCollector;
+import backtype.storm.spout.SpoutOutputCollector;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.eagle.alert.coordination.model.SpoutSpec;
 import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
 import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
@@ -29,9 +32,6 @@ import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.alert.engine.serialization.PartitionedEventSerializer;
 import 
org.apache.eagle.alert.engine.serialization.SerializationMetadataProvider;
 import org.apache.eagle.alert.utils.StreamIdConversion;
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,6 +121,9 @@ public class SpoutOutputCollectorWrapper extends 
SpoutOutputCollector implements
             phase 2: stream repartition
         */
         for (StreamRepartitionMetadata md : streamRepartitionMetadataList) {
+            if (!event.getStreamId().equals(md.getStreamId())) {
+                continue;
+            }
             // one stream may have multiple group-by strategies, each strategy 
is for a specific group-by
             for (StreamRepartitionStrategy groupingStrategy : 
md.groupingStrategies) {
                 int hash = 0;

Reply via email to