Repository: incubator-eagle Updated Branches: refs/heads/master 8fe968cb7 -> e24de5c7e
[MINOR] fix job & task attempt stream publisher bug in storm env Author: wujinhu <wujinhu...@126.com> Closes #747 from wujinhu/EAGLE-796. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/e24de5c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/e24de5c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/e24de5c7 Branch: refs/heads/master Commit: e24de5c7e85f7dba7eb5dfc4aee56b421c0e8ec4 Parents: 8fe968c Author: wujinhu <wujinhu...@126.com> Authored: Fri Dec 16 11:23:40 2016 +0800 Committer: wujinhu <wujinhu...@126.com> Committed: Fri Dec 16 11:23:40 2016 +0800 ---------------------------------------------------------------------- .../jpm/mr/history/MRHistoryJobApplication.java | 15 +++++------- .../crawler/DefaultJHFInputStreamCallback.java | 6 ++--- .../jpm/mr/history/parser/JHFParserFactory.java | 3 +-- .../JobEntityCreationEagleServiceListener.java | 8 +++---- .../history/publisher/JobStreamPublisher.java | 4 ++-- .../mr/history/publisher/StreamPublisher.java | 13 +++++++++-- .../publisher/TaskAttemptStreamPublisher.java | 4 ++-- .../jpm/mr/history/storm/JobHistorySpout.java | 24 ++++++++++++-------- ....history.MRHistoryJobApplicationProvider.xml | 15 ++++++++---- 9 files changed, 53 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java index e2695a3..4d02e91 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplication.java @@ -24,6 +24,7 @@ import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilter; import org.apache.eagle.jpm.mr.history.crawler.JobHistoryContentFilterBuilder; import org.apache.eagle.jpm.mr.history.crawler.JobHistorySpoutCollectorInterceptor; import org.apache.eagle.jpm.mr.history.publisher.JobStreamPublisher; +import org.apache.eagle.jpm.mr.history.publisher.StreamPublisher; import org.apache.eagle.jpm.mr.history.publisher.StreamPublisherManager; import org.apache.eagle.jpm.mr.history.publisher.TaskAttemptStreamPublisher; import org.apache.eagle.jpm.mr.history.storm.JobHistorySpout; @@ -67,8 +68,7 @@ public class MRHistoryJobApplication extends StormApplication { TopologyBuilder topologyBuilder = new TopologyBuilder(); String spoutName = "mrHistoryJobSpout"; int tasks = jhfAppConf.getInt("stormConfig.mrHistoryJobSpoutTasks"); - JobHistorySpoutCollectorInterceptor collectorInterceptor = new JobHistorySpoutCollectorInterceptor(); - JobHistorySpout jobHistorySpout = new JobHistorySpout(filter, appConfig, collectorInterceptor); + JobHistorySpout jobHistorySpout = new JobHistorySpout(filter, appConfig); topologyBuilder.setSpout( spoutName, jobHistorySpout, @@ -89,14 +89,11 @@ public class MRHistoryJobApplication extends StormApplication { String spoutToTaskAttemptSinkName = spoutName + "_to_" + taskAttemptSinkBoltName; taskAttemptKafkaBoltDeclarer.shuffleGrouping(spoutName, spoutToTaskAttemptSinkName); - List<String> streams = new ArrayList<>(); - streams.add(spoutToJobSinkName); - streams.add(spoutToTaskAttemptSinkName); - jobHistorySpout.setStreams(streams); + List<StreamPublisher> streamPublishers = new ArrayList<>(); + streamPublishers.add(new JobStreamPublisher(spoutToJobSinkName)); + streamPublishers.add(new TaskAttemptStreamPublisher(spoutToTaskAttemptSinkName)); + jobHistorySpout.setStreamPublishers(streamPublishers); - //4, add stream publisher - StreamPublisherManager.getInstance().addStreamPublisher(new JobStreamPublisher(spoutToJobSinkName, collectorInterceptor)); - StreamPublisherManager.getInstance().addStreamPublisher(new TaskAttemptStreamPublisher(spoutToTaskAttemptSinkName, collectorInterceptor)); return topologyBuilder.createTopology(); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java index 2b5e15c..262054e 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/DefaultJHFInputStreamCallback.java @@ -34,12 +34,10 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback { private static final Logger LOG = LoggerFactory.getLogger(DefaultJHFInputStreamCallback.class); private JobHistoryContentFilter filter; - private EagleOutputCollector collector; private MRHistoryJobConfig appConfig; - public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, EagleOutputCollector eagleCollector, MRHistoryJobConfig appConfig) { + public DefaultJHFInputStreamCallback(JobHistoryContentFilter filter, MRHistoryJobConfig appConfig) { this.filter = filter; - this.collector = eagleCollector; this.appConfig = appConfig; } @@ -57,7 +55,7 @@ public class DefaultJHFInputStreamCallback implements JHFInputStreamCallback { jobFileInputStream.close(); } else { //get parser and parse, do not need to emit data now - JHFParserBase parser = JHFParserFactory.getParser(baseTags, conf, filter, this.collector, appConfig); + JHFParserBase parser = JHFParserFactory.getParser(baseTags, conf, filter, appConfig); parser.parse(jobFileInputStream); jobFileInputStream.close(); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java index 784d107..ca49d9c 100755 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JHFParserFactory.java @@ -34,12 +34,11 @@ public class JHFParserFactory { public static JHFParserBase getParser(Map<String, String> baseTags, Configuration configuration, JobHistoryContentFilter filter, - EagleOutputCollector outputCollector, MRHistoryJobConfig appConfig) { MRHistoryJobConfig.EagleServiceConfig eagleServiceConfig = appConfig.getEagleServiceConfig(); JHFMRVer2EventReader reader2 = new JHFMRVer2EventReader(baseTags, configuration, filter, appConfig); - reader2.addListener(new JobEntityCreationEagleServiceListener(outputCollector, appConfig)); + reader2.addListener(new JobEntityCreationEagleServiceListener(appConfig)); reader2.addListener(new TaskFailureListener(eagleServiceConfig)); reader2.addListener(new TaskAttemptCounterListener(eagleServiceConfig)); reader2.addListener(new JobConfigurationCreationServiceListener(eagleServiceConfig)); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java index 0c7f8c8..80b4049 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobEntityCreationEagleServiceListener.java @@ -52,19 +52,17 @@ public class JobEntityCreationEagleServiceListener implements HistoryJobEntityCr List<TaskAttemptErrorCategoryEntity> taskAttemptErrors = new ArrayList<>(); private JobExecutionMetricsCreationListener jobExecutionMetricsCreationListener = new JobExecutionMetricsCreationListener(); private TimeZone timeZone; - private EagleOutputCollector collector; private MRHistoryJobConfig appConfig; - public JobEntityCreationEagleServiceListener(EagleOutputCollector collector, MRHistoryJobConfig appConfig) { - this(BATCH_SIZE, collector, appConfig); + public JobEntityCreationEagleServiceListener(MRHistoryJobConfig appConfig) { + this(BATCH_SIZE, appConfig); } - public JobEntityCreationEagleServiceListener(int batchSize, EagleOutputCollector collector, MRHistoryJobConfig appConfig) { + public JobEntityCreationEagleServiceListener(int batchSize, MRHistoryJobConfig appConfig) { if (batchSize <= 0) { throw new IllegalArgumentException("batchSize must be greater than 0 when it is provided"); } this.batchSize = batchSize; - this.collector = collector; this.appConfig = appConfig; timeZone = TimeZone.getTimeZone(appConfig.getJobHistoryEndpointConfig().timeZone); } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java index 06f1ff2..92c415c 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/JobStreamPublisher.java @@ -31,8 +31,8 @@ import java.util.Map; public class JobStreamPublisher extends StreamPublisher<JobExecutionAPIEntity> { private static final Logger LOG = LoggerFactory.getLogger(JobStreamPublisher.class); - public JobStreamPublisher(String stormStreamId, EagleOutputCollector collector) { - super(stormStreamId, collector); + public JobStreamPublisher(String stormStreamId) { + super(stormStreamId); } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisher.java index 17eb922..00f9317 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisher.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/StreamPublisher.java @@ -22,17 +22,26 @@ import org.apache.eagle.jpm.mr.history.crawler.EagleOutputCollector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class StreamPublisher<T> { +import java.io.Serializable; + +public abstract class StreamPublisher<T> implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(StreamPublisher.class); protected String stormStreamId; protected EagleOutputCollector collector; - public StreamPublisher(String stormStreamId, EagleOutputCollector collector) { + public StreamPublisher(String stormStreamId) { this.stormStreamId = stormStreamId; + } + + public void setCollector(EagleOutputCollector collector) { this.collector = collector; } + public String stormStreamId() { + return stormStreamId; + } + public abstract Class<?> type(); public abstract void flush(T entity); http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/TaskAttemptStreamPublisher.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/TaskAttemptStreamPublisher.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/TaskAttemptStreamPublisher.java index e196604..758c53d 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/TaskAttemptStreamPublisher.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/publisher/TaskAttemptStreamPublisher.java @@ -31,8 +31,8 @@ import java.util.Map; public class TaskAttemptStreamPublisher extends StreamPublisher<TaskAttemptExecutionAPIEntity> { private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptStreamPublisher.class); - public TaskAttemptStreamPublisher(String stormStreamId, EagleOutputCollector collector) { - super(stormStreamId, collector); + public TaskAttemptStreamPublisher(String stormStreamId) { + super(stormStreamId); } @Override http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java index 2157da9..0cd30ae 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java @@ -25,6 +25,8 @@ import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import org.apache.eagle.jpm.mr.history.MRHistoryJobConfig; import org.apache.eagle.jpm.mr.history.crawler.*; +import org.apache.eagle.jpm.mr.history.publisher.StreamPublisher; +import org.apache.eagle.jpm.mr.history.publisher.StreamPublisherManager; import org.apache.eagle.jpm.mr.history.zkres.JobHistoryZKStateManager; import org.apache.eagle.jpm.mr.historyentity.JobProcessTimeStampEntity; import org.apache.eagle.jpm.util.DefaultJobIdPartitioner; @@ -97,17 +99,17 @@ public class JobHistorySpout extends BaseRichSpout { private static final int MAX_RETRY_TIMES = 3; private MRHistoryJobConfig appConfig; private JobHistoryEndpointConfig jobHistoryEndpointConfig; - private List<String> streams; + private List<StreamPublisher> streamPublishers; /** * mostly this constructor signature is for unit test purpose as you can put customized interceptor here. */ - public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig appConfig, JobHistorySpoutCollectorInterceptor adaptor) { + public JobHistorySpout(JobHistoryContentFilter filter, MRHistoryJobConfig appConfig) { this.contentFilter = filter; - this.interceptor = adaptor; + this.interceptor = new JobHistorySpoutCollectorInterceptor(); this.appConfig = appConfig; jobHistoryEndpointConfig = appConfig.getJobHistoryEndpointConfig(); - callback = new DefaultJHFInputStreamCallback(contentFilter, interceptor, appConfig); + callback = new DefaultJHFInputStreamCallback(contentFilter, appConfig); } private int calculatePartitionId(TopologyContext context) { @@ -175,8 +177,8 @@ public class JobHistorySpout extends BaseRichSpout { } } - public void setStreams(List<String> streams) { - this.streams = streams; + public void setStreamPublishers(List<StreamPublisher> streamPublishers) { + this.streamPublishers = streamPublishers; } /** @@ -184,10 +186,14 @@ public class JobHistorySpout extends BaseRichSpout { */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - if (streams != null) { - for (String streamId : streams) { - declarer.declareStream(streamId, new Fields("f1", "message")); + if (streamPublishers != null) { + for (StreamPublisher streamPublisher : streamPublishers) { + declarer.declareStream(streamPublisher.stormStreamId(), new Fields("f1", "message")); + streamPublisher.setCollector(this.interceptor); + StreamPublisherManager.getInstance().addStreamPublisher(streamPublisher); } + } else { + declarer.declare(new Fields("f1", "message")); } } http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e24de5c7/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml ---------------------------------------------------------------------- diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml index 1c3b5cb..ccf3c6b 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/META-INF/providers/org.apache.eagle.jpm.mr.history.MRHistoryJobApplicationProvider.xml @@ -75,10 +75,17 @@ <required>true</required> </property> <property> - <name>dataSinkConfig.topic</name> - <displayName>Destination(Kafka Topic) Of Stream Data</displayName> - <value>sandbox_map_reduce_failed_job</value> - <description>application emits stream data to this kafka topic</description> + <name>dataSinkConfig.MAP_REDUCE_JOB_STREAM.topic</name> + <displayName>Destination(Kafka Topic) Of Job Stream Data</displayName> + <value>sandbox-map_reduce_job</value> + <description>application emits job stream data to this kafka topic</description> + <required>true</required> + </property> + <property> + <name>dataSinkConfig.MAP_REDUCE_TASK_ATTEMPT_STREAM.topic</name> + <displayName>Destination(Kafka Topic) Of Task Attempt Stream Data</displayName> + <value>sandbox-map_reduce_task_attempt</value> + <description>application emits task attempt stream data to this kafka topic</description> <required>true</required> </property> <property>