Repository: incubator-eagle
Updated Branches:
  refs/heads/master 8fe968cb7 -> e24de5c7e


[MINOR] fix job & task attempt stream publisher bug in storm env

Author: wujinhu <wujinhu...@126.com>

Closes #747 from wujinhu/EAGLE-796.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/e24de5c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/e24de5c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/e24de5c7

Branch: refs/heads/master
Commit: e24de5c7e85f7dba7eb5dfc4aee56b421c0e8ec4
Parents: 8fe968c
Author: wujinhu <wujinhu...@126.com>
Authored: Fri Dec 16 11:23:40 2016 +0800
Committer: wujinhu <wujinhu...@126.com>
Committed: Fri Dec 16 11:23:40 2016 +0800

----------------------------------------------------------------------
 .../jpm/mr/history/MRHistoryJobApplication.java | 15 +++++-------
 .../crawler/DefaultJHFInputStreamCallback.java  |  6 ++---
 .../jpm/mr/history/parser/JHFParserFactory.java |  3 +--
 .../JobEntityCreationEagleServiceListener.java  |  8 +++----
 .../history/publisher/JobStreamPublisher.java   |  4 ++--
 .../mr/history/publisher/StreamPublisher.java   | 13 +++++++++--
 .../publisher/TaskAttemptStreamPublisher.java   |  4 ++--
 .../jpm/mr/history/storm/JobHistorySpout.java   | 24 ++++++++++++--------
 ....history.MRHistoryJobApplicationProvider.xml | 15 ++++++++----
 9 files changed, 53 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
index e2695a3..4d02e91 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java
@@ -24,6 +24,7 @@ import 
org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter;
 import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder;
 import 
org.apache.eagle.jpm.mr.history.crawler.JobHistorySpoutCollectorInterceptor;
 import org.apache.eagle.jpm.mr.history.publisher.JobStreamPublisher;
+import org.apache.eagle.jpm.mr.history.publisher.StreamPublisher;
 import org.apache.eagle.jpm.mr.history.publisher.StreamPublisherManager;
 import org.apache.eagle.jpm.mr.history.publisher.TaskAttemptStreamPublisher;
 import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout;
@@ -67,8 +68,7 @@ public class MRHistoryJobApplication extends StormApplication 
{
         TopologyBuilder topologyBuilder = new TopologyBuilder();
         String spoutName = "mrHistoryJobSpout";
         int tasks = jhfAppConf.getInt("stormConfig.mrHistoryJobSpoutTasks");
-        JobHistorySpoutCollectorInterceptor collectorInterceptor = new 
JobHistorySpoutCollectorInterceptor();
-        JobHistorySpout jobHistorySpout = new JobHistorySpout(filter, 
appConfig, collectorInterceptor);
+        JobHistorySpout jobHistorySpout = new JobHistorySpout(filter, 
appConfig);
         topologyBuilder.setSpout(
                 spoutName,
                 jobHistorySpout,
@@ -89,14 +89,11 @@ public class MRHistoryJobApplication extends 
StormApplication {
         String spoutToTaskAttemptSinkName = spoutName + "_to_" + 
taskAttemptSinkBoltName;
         taskAttemptKafkaBoltDeclarer.shuffleGrouping(spoutName, 
spoutToTaskAttemptSinkName);
 
-        List<String> streams = new ArrayList<>();
-        streams.add(spoutToJobSinkName);
-        streams.add(spoutToTaskAttemptSinkName);
-        jobHistorySpout.setStreams(streams);
+        List<StreamPublisher> streamPublishers = new ArrayList<>();
+        streamPublishers.add(new JobStreamPublisher(spoutToJobSinkName));
+        streamPublishers.add(new 
TaskAttemptStreamPublisher(spoutToTaskAttemptSinkName));
+        jobHistorySpout.setStreamPublishers(streamPublishers);
 
-        //4, add stream publisher
-        StreamPublisherManager.getInstance().addStreamPublisher(new 
JobStreamPublisher(spoutToJobSinkName, collectorInterceptor));
-        StreamPublisherManager.getInstance().addStreamPublisher(new 
TaskAttemptStreamPublisher(spoutToTaskAttemptSinkName, collectorInterceptor));
         return topologyBuilder.createTopology();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
index 2b5e15c..262054e 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java
@@ -34,12 +34,10 @@ public class DefaultJHFInputStreamCallback implements 
JHFInputStreamCallback {
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultJHFInputStreamCallback.class);
 
     private JobHistoryContentFilter filter;
-    private EagleOutputCollector collector;
     private MRHistoryJobConfig appConfig;
 
-    public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, 
EagleOutputCollector eagleCollector, MRHistoryJobConfig appConfig) {
+    public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, 
MRHistoryJobConfig appConfig) {
         this.filter = filter;
-        this.collector = eagleCollector;
         this.appConfig = appConfig;
     }
 
@@ -57,7 +55,7 @@ public class DefaultJHFInputStreamCallback implements 
JHFInputStreamCallback {
             jobFileInputStream.close();
         } else {
             //get parser and parse, do not need to emit data now
-            JHFParserBase parser = JHFParserFactory.getParser(baseTags, conf, 
filter, this.collector, appConfig);
+            JHFParserBase parser = JHFParserFactory.getParser(baseTags, conf, 
filter, appConfig);
             parser.parse(jobFileInputStream);
             jobFileInputStream.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
index 784d107..ca49d9c 100755
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java
@@ -34,12 +34,11 @@ public class JHFParserFactory {
     public static JHFParserBase getParser(Map<String, String> baseTags,
                                           Configuration configuration,
                                           JobHistoryContentFilter filter,
-                                          EagleOutputCollector outputCollector,
                                           MRHistoryJobConfig appConfig) {
         MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = 
appConfig.getEagleServiceConfig();
 
         JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, 
configuration, filter, appConfig);
-        reader2.addListener(new 
JobEntityCreationEagleServiceListener(outputCollector, appConfig));
+        reader2.addListener(new 
JobEntityCreationEagleServiceListener(appConfig));
         reader2.addListener(new TaskFailureListener(eagleServiceConfig));
         reader2.addListener(new 
TaskAttemptCounterListener(eagleServiceConfig));
         reader2.addListener(new 
JobConfigurationCreationServiceListener(eagleServiceConfig));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
index 0c7f8c8..80b4049 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java
@@ -52,19 +52,17 @@ public class JobEntityCreationEagleServiceListener 
implements HistoryJobEntityCr
     List<TaskAttemptErrorCategoryEntity> taskAttemptErrors = new ArrayList<>();
     private JobExecutionMetricsCreationListener 
jobExecutionMetricsCreationListener = new JobExecutionMetricsCreationListener();
     private TimeZone timeZone;
-    private EagleOutputCollector collector;
     private MRHistoryJobConfig appConfig;
 
-    public JobEntityCreationEagleServiceListener(EagleOutputCollector 
collector, MRHistoryJobConfig appConfig) {
-        this(BATCH_SIZE, collector, appConfig);
+    public JobEntityCreationEagleServiceListener(MRHistoryJobConfig appConfig) 
{
+        this(BATCH_SIZE, appConfig);
     }
 
-    public JobEntityCreationEagleServiceListener(int batchSize, 
EagleOutputCollector collector, MRHistoryJobConfig appConfig) {
+    public JobEntityCreationEagleServiceListener(int batchSize, 
MRHistoryJobConfig appConfig) {
         if (batchSize <= 0) {
             throw new IllegalArgumentException("batchSize must be greater than 
0 when it is provided");
         }
         this.batchSize = batchSize;
-        this.collector = collector;
         this.appConfig = appConfig;
         timeZone = 
TimeZone.getTimeZone(appConfig.getJobHistoryEndpointConfig().timeZone);
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java
index 06f1ff2..92c415c 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java
@@ -31,8 +31,8 @@ import java.util.Map;
 public class JobStreamPublisher extends StreamPublisher<JobExecutionAPIEntity> 
{
     private static final Logger LOG = 
LoggerFactory.getLogger(JobStreamPublisher.class);
 
-    public JobStreamPublisher(String stormStreamId, EagleOutputCollector 
collector) {
-        super(stormStreamId, collector);
+    public JobStreamPublisher(String stormStreamId) {
+        super(stormStreamId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisher.java
index 17eb922..00f9317 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisher.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisher.java
@@ -22,17 +22,26 @@ import 
org.apache.eagle.jpm.mr.history.crawler.EagleOutputCollector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class StreamPublisher<T> {
+import java.io.Serializable;
+
+public abstract class StreamPublisher<T> implements Serializable {
     private static final Logger LOG = 
LoggerFactory.getLogger(StreamPublisher.class);
 
     protected String stormStreamId;
     protected EagleOutputCollector collector;
 
-    public StreamPublisher(String stormStreamId, EagleOutputCollector 
collector) {
+    public StreamPublisher(String stormStreamId) {
         this.stormStreamId = stormStreamId;
+    }
+
+    public void setCollector(EagleOutputCollector collector) {
         this.collector = collector;
     }
 
+    public String stormStreamId() {
+        return stormStreamId;
+    }
+
     public abstract Class<?> type();
 
     public abstract void flush(T entity);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/TaskAttemptStreamPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/TaskAttemptStreamPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/TaskAttemptStreamPublisher.java
index e196604..758c53d 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/TaskAttemptStreamPublisher.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/TaskAttemptStreamPublisher.java
@@ -31,8 +31,8 @@ import java.util.Map;
 public class TaskAttemptStreamPublisher extends 
StreamPublisher<TaskAttemptExecutionAPIEntity> {
     private static final Logger LOG = 
LoggerFactory.getLogger(TaskAttemptStreamPublisher.class);
 
-    public TaskAttemptStreamPublisher(String stormStreamId, 
EagleOutputCollector collector) {
-        super(stormStreamId, collector);
+    public TaskAttemptStreamPublisher(String stormStreamId) {
+        super(stormStreamId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
index 2157da9..0cd30ae 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java
@@ -25,6 +25,8 @@ import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig;
 import org.apache.eagle.jpm.mr.history.crawler.*;
+import org.apache.eagle.jpm.mr.history.publisher.StreamPublisher;
+import org.apache.eagle.jpm.mr.history.publisher.StreamPublisherManager;
 import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager;
 import org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity;
 import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
@@ -97,17 +99,17 @@ public class JobHistorySpout extends BaseRichSpout {
     private static final int MAX_RETRY_TIMES = 3;
     private MRHistoryJobConfig appConfig;
     private JobHistoryEndpointConfig jobHistoryEndpointConfig;
-    private List<String> streams;
+    private List<StreamPublisher> streamPublishers;
 
     /**
      * mostly this constructor signature is for unit test purpose as you can 
put customized interceptor here.
      */
-    public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig 
appConfig, JobHistorySpoutCollectorInterceptor adaptor) {
+    public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig 
appConfig) {
         this.contentFilter = filter;
-        this.interceptor = adaptor;
+        this.interceptor = new JobHistorySpoutCollectorInterceptor();
         this.appConfig = appConfig;
         jobHistoryEndpointConfig = appConfig.getJobHistoryEndpointConfig();
-        callback = new DefaultJHFInputStreamCallback(contentFilter, 
interceptor,  appConfig);
+        callback = new DefaultJHFInputStreamCallback(contentFilter, appConfig);
     }
 
     private int calculatePartitionId(TopologyContext context) {
@@ -175,8 +177,8 @@ public class JobHistorySpout extends BaseRichSpout {
         }
     }
 
-    public void setStreams(List<String> streams) {
-        this.streams = streams;
+    public void setStreamPublishers(List<StreamPublisher> streamPublishers) {
+        this.streamPublishers = streamPublishers;
     }
 
     /**
@@ -184,10 +186,14 @@ public class JobHistorySpout extends BaseRichSpout {
      */
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        if (streams != null) {
-            for (String streamId : streams) {
-                declarer.declareStream(streamId, new Fields("f1", "message"));
+        if (streamPublishers != null) {
+            for (StreamPublisher streamPublisher : streamPublishers) {
+                declarer.declareStream(streamPublisher.stormStreamId(), new 
Fields("f1", "message"));
+                streamPublisher.setCollector(this.interceptor);
+                
StreamPublisherManager.getInstance().addStreamPublisher(streamPublisher);
             }
+        } else {
+            declarer.declare(new Fields("f1", "message"));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
index 1c3b5cb..ccf3c6b 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml
@@ -75,10 +75,17 @@
             <required>true</required>
         </property>
         <property>
-            <name>dataSinkConfig.topic</name>
-            <displayName>Destination(Kafka Topic) Of Stream Data</displayName>
-            <value>sandbox_map_reduce_failed_job</value>
-            <description>application emits stream data to this kafka 
topic</description>
+            <name>dataSinkConfig.MAP_REDUCE_JOB_STREAM.topic</name>
+            <displayName>Destination(Kafka Topic) Of Job Stream 
Data</displayName>
+            <value>sandbox-map_reduce_job</value>
+            <description>application emits job stream data to this kafka 
topic</description>
+            <required>true</required>
+        </property>
+        <property>
+            <name>dataSinkConfig.MAP_REDUCE_TASK_ATTEMPT_STREAM.topic</name>
+            <displayName>Destination(Kafka Topic) Of Task Attempt Stream 
Data</displayName>
+            <value>sandbox-map_reduce_task_attempt</value>
+            <description>application emits task attempt stream data to this 
kafka topic</description>
             <required>true</required>
         </property>
         <property>

Reply via email to