Repository: incubator-eagle
Updated Branches:
  refs/heads/master 158a1524b -> d100d03ed


[EAGLE-551] fix some bugs and improve the MR history feeder

https://issues.apache.org/jira/browse/EAGLE-551

1. bug fixes:
1.1 The MR running feeder does not set internalState to FINISHED when a job finishes.
1.2 JHFSparkEventReader should not reload the config; it should use the config 
initialized by the framework.
1.3 JHFSparkEventReader marks the application as succeeded even when flushing 
its entities fails.
2. improvement: move the HDFS HA configuration from core-site.xml/hdfs-site.xml 
into application.conf.

Author: wujinhu <wujinhu...@126.com>

Closes #444 from wujinhu/EAGLE-551.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/d100d03e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/d100d03e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/d100d03e

Branch: refs/heads/master
Commit: d100d03ed0cadcb6f9a024147868e601a8383b85
Parents: 158a152
Author: wujinhu <wujinhu...@126.com>
Authored: Tue Sep 20 18:29:03 2016 +0800
Committer: Qingwen Zhao <qingwen...@gmail.com>
Committed: Tue Sep 20 18:29:03 2016 +0800

----------------------------------------------------------------------
 .../running/entities/JPMEntityRepository.java   |  1 -
 .../running/entities/JobConfigSerDeser.java     | 49 ++++++++++++++++++++
 .../jpm/mr/history/MRHistoryJobConfig.java      | 44 +++++++++---------
 .../mr/history/crawler/JobHistoryDAOImpl.java   | 16 +++----
 .../src/main/resources/application.conf         | 28 +++++++----
 .../jpm/mr/running/parser/MRJobParser.java      |  1 +
 .../spark/history/SparkHistoryJobAppConfig.java | 38 ++++++++-------
 .../history/crawl/JHFSparkEventReader.java      | 18 +++----
 .../SparkFilesystemInputStreamReaderImpl.java   | 14 +++---
 .../history/storm/SparkHistoryJobParseBolt.java | 11 +++--
 .../src/main/resources/application.conf         | 15 ++++--
 .../spark/running/SparkRunningJobAppConfig.java | 28 +++++------
 .../running/parser/SparkApplicationParser.java  |  7 +--
 .../src/main/resources/application.conf         | 15 ++++--
 14 files changed, 177 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d100d03e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
index 81f266b..5494573 100644
--- 
a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
+++ 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JPMEntityRepository.java
@@ -18,7 +18,6 @@
 
 package org.apache.eagle.jpm.spark.running.entities;
 
-import org.apache.eagle.jpm.mr.runningentity.JobConfigSerDeser;
 import org.apache.eagle.log.entity.repo.EntityRepository;
 
 public class JPMEntityRepository extends EntityRepository {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d100d03e/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfigSerDeser.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfigSerDeser.java
 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfigSerDeser.java
new file mode 100644
index 0000000..64b48d5
--- /dev/null
+++ 
b/eagle-jpm/eagle-jpm-entity/src/main/java/org/apache/eagle/jpm/spark/running/entities/JobConfigSerDeser.java
@@ -0,0 +1,49 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.eagle.jpm.spark.running.entities;
+
+import org.apache.eagle.log.entity.meta.EntitySerDeser;
+import org.apache.eagle.log.entity.meta.MapSerDeser;
+
+import java.util.Map;
+
+/**
+ * refactor this class later.
+ */
+public class JobConfigSerDeser implements EntitySerDeser<JobConfig> {
+    private static final MapSerDeser INSTANCE = new MapSerDeser();
+
+    @Override
+    public JobConfig deserialize(byte[] bytes) {
+        Map map = INSTANCE.deserialize(bytes);
+        JobConfig config = new JobConfig();
+        config.putAll(map);
+        return config;
+    }
+
+    @Override
+    public byte[] serialize(JobConfig jobConfig) {
+        return INSTANCE.serialize(jobConfig);
+    }
+
+    @Override
+    public Class<JobConfig> type() {
+        return JobConfig.class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d100d03e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
index 4ac875b..3bb0fda 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobConfig.java
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.jpm.mr.history;
 
+import com.typesafe.config.ConfigValue;
 import org.apache.eagle.common.config.ConfigOptionParser;
 import org.apache.eagle.jpm.util.DefaultJobIdPartitioner;
 import org.apache.eagle.jpm.util.JobIdPartitioner;
@@ -27,6 +28,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
 public class MRHistoryJobConfig implements Serializable {
     private static final Logger LOG = 
LoggerFactory.getLogger(MRHistoryJobConfig.class);
@@ -85,13 +89,11 @@ public class MRHistoryJobConfig implements Serializable {
     }
 
     public static class JobHistoryEndpointConfig implements Serializable {
-        public String nnEndpoint;
         public String mrHistoryServerUrl;
         public String basePath;
         public boolean pathContainsJobTrackerName;
         public String jobTrackerName;
-        public String principal;
-        public String keyTab;
+        public Map<String, String> hdfs;
     }
 
     public static class ControlConfig implements Serializable {
@@ -124,6 +126,7 @@ public class MRHistoryJobConfig implements Serializable {
     private MRHistoryJobConfig() {
         this.zkStateConfig = new ZKStateConfig();
         this.jobHistoryEndpointConfig = new JobHistoryEndpointConfig();
+        this.jobHistoryEndpointConfig.hdfs = new HashMap<>();
         this.controlConfig = new ControlConfig();
         this.jobExtractorConfig = new JobExtractorConfig();
         this.eagleServiceConfig = new EagleServiceConfig();
@@ -153,26 +156,25 @@ public class MRHistoryJobConfig implements Serializable {
         this.jobExtractorConfig.mrVersion = 
config.getString("jobExtractorConfig.mrVersion");
         this.jobExtractorConfig.readTimeoutSeconds = 
config.getInt("jobExtractorConfig.readTimeOutSeconds");
         //parse eagle zk
-        this.zkStateConfig.zkQuorum = 
config.getString("dataSourceConfig.zkQuorum");
-        this.zkStateConfig.zkPort = 
config.getString("dataSourceConfig.zkPort");
-        this.zkStateConfig.zkSessionTimeoutMs = 
config.getInt("dataSourceConfig.zkSessionTimeoutMs");
-        this.zkStateConfig.zkRetryTimes = 
config.getInt("dataSourceConfig.zkRetryTimes");
-        this.zkStateConfig.zkRetryInterval = 
config.getInt("dataSourceConfig.zkRetryInterval");
-        this.zkStateConfig.zkRoot = 
config.getString("dataSourceConfig.zkRoot");
+        this.zkStateConfig.zkQuorum = 
config.getString("zkStateConfig.zkQuorum");
+        this.zkStateConfig.zkPort = config.getString("zkStateConfig.zkPort");
+        this.zkStateConfig.zkSessionTimeoutMs = 
config.getInt("zkStateConfig.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRetryTimes = 
config.getInt("zkStateConfig.zkRetryTimes");
+        this.zkStateConfig.zkRetryInterval = 
config.getInt("zkStateConfig.zkRetryInterval");
+        this.zkStateConfig.zkRoot = config.getString("zkStateConfig.zkRoot");
 
         //parse job history endpoint
-        this.jobHistoryEndpointConfig.basePath = 
config.getString("dataSourceConfig.basePath");
-        this.jobHistoryEndpointConfig.jobTrackerName = 
config.getString("dataSourceConfig.jobTrackerName");
-        this.jobHistoryEndpointConfig.nnEndpoint = 
config.getString("dataSourceConfig.nnEndpoint");
-        this.jobHistoryEndpointConfig.mrHistoryServerUrl = 
config.getString("dataSourceConfig.mrHistoryServerUrl");
-        this.jobHistoryEndpointConfig.pathContainsJobTrackerName = 
config.getBoolean("dataSourceConfig.pathContainsJobTrackerName");
-        this.jobHistoryEndpointConfig.principal = 
config.getString("dataSourceConfig.principal");
-        this.jobHistoryEndpointConfig.keyTab = 
config.getString("dataSourceConfig.keytab");
-
+        this.jobHistoryEndpointConfig.basePath = 
config.getString("endpointConfig.basePath");
+        this.jobHistoryEndpointConfig.jobTrackerName = 
config.getString("endpointConfig.jobTrackerName");
+        this.jobHistoryEndpointConfig.mrHistoryServerUrl = 
config.getString("endpointConfig.mrHistoryServerUrl");
+        this.jobHistoryEndpointConfig.pathContainsJobTrackerName = 
config.getBoolean("endpointConfig.pathContainsJobTrackerName");
+        for (Map.Entry<String, ConfigValue> entry : 
config.getConfig("endpointConfig.hdfs").entrySet()) {
+            this.jobHistoryEndpointConfig.hdfs.put(entry.getKey(), 
entry.getValue().unwrapped().toString());
+        }
         //parse control config
-        this.controlConfig.dryRun = 
config.getBoolean("dataSourceConfig.dryRun");
+        this.controlConfig.dryRun = config.getBoolean("controlConfig.dryRun");
         try {
-            this.controlConfig.partitionerCls = (Class<? extends 
JobIdPartitioner>) 
Class.forName(config.getString("dataSourceConfig.partitionerCls"));
+            this.controlConfig.partitionerCls = (Class<? extends 
JobIdPartitioner>) 
Class.forName(config.getString("controlConfig.partitionerCls"));
             assert this.controlConfig.partitionerCls != null;
         } catch (Exception e) {
             LOG.warn("can not initialize partitioner class, use 
org.apache.eagle.jpm.util.DefaultJobIdPartitioner", e);
@@ -180,8 +182,8 @@ public class MRHistoryJobConfig implements Serializable {
         } finally {
             LOG.info("Loaded partitioner class: {}", 
this.controlConfig.partitionerCls);
         }
-        this.controlConfig.zeroBasedMonth = 
config.getBoolean("dataSourceConfig.zeroBasedMonth");
-        this.controlConfig.timeZone = 
config.getString("dataSourceConfig.timeZone");
+        this.controlConfig.zeroBasedMonth = 
config.getBoolean("controlConfig.zeroBasedMonth");
+        this.controlConfig.timeZone = 
config.getString("controlConfig.timeZone");
 
         // parse eagle service endpoint
         this.eagleServiceConfig.eagleServiceHost = 
config.getString("eagleProps.eagleService.host");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d100d03e/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
index 0441f1f..a035357 100644
--- 
a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
+++ 
b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JobHistoryDAOImpl.java
@@ -30,10 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
+import java.util.*;
 
 public class JobHistoryDAOImpl extends AbstractJobHistoryDAO {
     private static final Logger LOG = 
LoggerFactory.getLogger(JobHistoryDAOImpl.class);
@@ -44,13 +41,12 @@ public class JobHistoryDAOImpl extends 
AbstractJobHistoryDAO {
 
     public JobHistoryDAOImpl(JobHistoryEndpointConfig endpointConfig) throws 
Exception {
         super(endpointConfig.basePath, 
endpointConfig.pathContainsJobTrackerName, endpointConfig.jobTrackerName);
-        this.conf.set("fs.defaultFS", endpointConfig.nnEndpoint);
-        this.conf.setBoolean("fs.hdfs.impl.disable.cache", true);
-        if (!endpointConfig.principal.equals("")) {
-            this.conf.set("hdfs.kerberos.principal", endpointConfig.principal);
-            this.conf.set("hdfs.keytab.file", endpointConfig.keyTab);
+        for (Map.Entry<String, String> entry : endpointConfig.hdfs.entrySet()) 
{
+            this.conf.set(entry.getKey(), entry.getValue());
+            LOG.info("conf key {}, conf value {}", entry.getKey(), 
entry.getValue());
         }
-        LOG.info("file system:" + endpointConfig.nnEndpoint);
+        this.conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+
         hdfs = HDFSUtil.getFileSystem(conf);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d100d03e/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf 
b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
index de874a6..1debd06 100644
--- a/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-mr-history/src/main/resources/application.conf
@@ -32,26 +32,36 @@
     "readTimeOutSeconds" : 10
   },
 
-  "dataSourceConfig" : {
+  "zkStateConfig" : {
     "zkQuorum" : "sandbox.hortonworks.com:2181",
     "zkPort" : "2181",
     "zkRoot" : "/test_mrjobhistory",
     "zkSessionTimeoutMs" : 15000,
     "zkRetryTimes" : 3,
-    "zkRetryInterval" : 20000,
-    "nnEndpoint" : "hdfs://sandbox.hortonworks.com:8020",
-    "mrHistoryServerUrl" : "http://sandbox.hortonworks.com:19888";,
-    "principal":"", #if not need, then empty
-    "keytab":"",
-    "basePath" : "/mr-history/done",
-    "pathContainsJobTrackerName" : false,
-    "jobTrackerName" : "",
+    "zkRetryInterval" : 20000
+  },
+
+  "controlConfig" : {
     "zeroBasedMonth" : false,
     "dryRun" : false,
     "partitionerCls" : "org.apache.eagle.jpm.util.DefaultJobIdPartitioner",
     "timeZone" : "UTC"
   },
 
+  "endpointConfig" : {
+    "mrHistoryServerUrl" : "http://sandbox.hortonworks.com:19888";,
+    "basePath" : "/mr-history/done",
+    "pathContainsJobTrackerName" : false,
+    "jobTrackerName" : "",
+    "hdfs" : {
+      fs.defaultFS : "hdfs://sandbox.hortonworks.com:8020",
+      #if not need, then do not set
+      # hdfs.kerberos.principal = ,
+      # hdfs.keytab.file =
+      # ....
+    }
+  },
+
   "eagleProps" : {
     "mailHost" : "abc.com",
     "mailDebug" : "true",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d100d03e/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
index 2e36bc4..e5e3444 100644
--- 
a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
+++ 
b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java
@@ -500,6 +500,7 @@ public class MRJobParser implements Runnable {
     private Function<String, Boolean> fetchJobConfig = jobId -> {
         if (mrJobConfigs.containsKey(jobId)) {
             mrJobEntityMap.get(jobId).setJobConfig(mrJobConfigs.get(jobId));
+            
mrJobEntityMap.get(jobId).getTags().put(MRJobTagName.JOB_TYPE.toString(), 
Utils.fetchJobType(mrJobConfigs.get(jobId)).toString());
             return true;
         }
         String confURL = app.getTrackingUrl() + Constants.MR_JOBS_URL + "/" + 
jobId + "/" + Constants.MR_CONF_URL + "?" + Constants.ANONYMOUS_PARAMETER;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d100d03e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
index 58571cb..7505cd1 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/SparkHistoryJobAppConfig.java
@@ -20,8 +20,11 @@
 package org.apache.eagle.jpm.spark.history;
 
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
 
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 
 public class SparkHistoryJobAppConfig implements Serializable {
     static final String SPARK_HISTORY_JOB_FETCH_SPOUT_NAME = 
"sparkHistoryJobFetchSpout";
@@ -29,7 +32,6 @@ public class SparkHistoryJobAppConfig implements Serializable 
{
 
     public ZKStateConfig zkStateConfig;
     public JobHistoryEndpointConfig jobHistoryConfig;
-    public HDFSConfig hdfsConfig;
     public BasicInfo info;
     public EagleInfo eagleInfo;
     public StormConfig stormConfig;
@@ -45,7 +47,7 @@ public class SparkHistoryJobAppConfig implements Serializable 
{
     public SparkHistoryJobAppConfig() {
         this.zkStateConfig = new ZKStateConfig();
         this.jobHistoryConfig = new JobHistoryEndpointConfig();
-        this.hdfsConfig = new HDFSConfig();
+        this.jobHistoryConfig.hdfs = new HashMap<>();
         this.info = new BasicInfo();
         this.eagleInfo = new EagleInfo();
         this.stormConfig = new StormConfig();
@@ -59,27 +61,29 @@ public class SparkHistoryJobAppConfig implements 
Serializable {
     private void init(Config config) {
         this.config = config;
 
-        this.zkStateConfig.zkQuorum = 
config.getString("dataSourceConfig.zkQuorum");
-        this.zkStateConfig.zkRetryInterval = 
config.getInt("dataSourceConfig.zkRetryInterval");
-        this.zkStateConfig.zkRetryTimes = 
config.getInt("dataSourceConfig.zkRetryTimes");
-        this.zkStateConfig.zkSessionTimeoutMs = 
config.getInt("dataSourceConfig.zkSessionTimeoutMs");
-        this.zkStateConfig.zkRoot = 
config.getString("dataSourceConfig.zkRoot");
+        this.zkStateConfig.zkQuorum = 
config.getString("zkStateConfig.zkQuorum");
+        this.zkStateConfig.zkRetryInterval = 
config.getInt("zkStateConfig.zkRetryInterval");
+        this.zkStateConfig.zkRetryTimes = 
config.getInt("zkStateConfig.zkRetryTimes");
+        this.zkStateConfig.zkSessionTimeoutMs = 
config.getInt("zkStateConfig.zkSessionTimeoutMs");
+        this.zkStateConfig.zkRoot = config.getString("zkStateConfig.zkRoot");
 
         jobHistoryConfig.historyServerUrl = 
config.getString("dataSourceConfig.spark.history.server.url");
         jobHistoryConfig.historyServerUserName = 
config.getString("dataSourceConfig.spark.history.server.username");
         jobHistoryConfig.historyServerUserPwd = 
config.getString("dataSourceConfig.spark.history.server.password");
         jobHistoryConfig.rms = 
config.getString("dataSourceConfig.rm.url").split(",\\s*");
-
-        this.hdfsConfig.baseDir = 
config.getString("dataSourceConfig.hdfs.eventLog");
-        this.hdfsConfig.endpoint = 
config.getString("dataSourceConfig.hdfs.endPoint");
-        this.hdfsConfig.principal = 
config.getString("dataSourceConfig.hdfs.principal");
-        this.hdfsConfig.keytab = 
config.getString("dataSourceConfig.hdfs.keytab");
+        jobHistoryConfig.baseDir = 
config.getString("dataSourceConfig.baseDir");
+        for (Map.Entry<String, ConfigValue> entry : 
config.getConfig("dataSourceConfig.hdfs").entrySet()) {
+            this.jobHistoryConfig.hdfs.put(entry.getKey(), 
entry.getValue().unwrapped().toString());
+        }
 
         info.site = config.getString("basic.cluster") + "-" + 
config.getString("basic.dataCenter");
         info.jobConf = 
config.getString("basic.jobConf.additional.info").split(",\\s*");
 
         this.eagleInfo.host = 
config.getString("eagleProps.eagle.service.host");
         this.eagleInfo.port = config.getInt("eagleProps.eagle.service.port");
+        this.eagleInfo.username = 
config.getString("eagleProps.eagle.service.username");
+        this.eagleInfo.password = 
config.getString("eagleProps.eagle.service.password");
+        this.eagleInfo.timeout = 
config.getInt("eagleProps.eagle.service.read.timeout");
 
         this.stormConfig.timeoutSec = config.getInt("storm.messageTimeoutSec");
         this.stormConfig.spoutPending = config.getInt("storm.pendingSpout");
@@ -99,13 +103,8 @@ public class SparkHistoryJobAppConfig implements 
Serializable {
         public String historyServerUrl;
         public String historyServerUserName;
         public String historyServerUserPwd;
-    }
-
-    public static class HDFSConfig implements Serializable {
-        public String endpoint;
         public String baseDir;
-        public String principal;
-        public String keytab;
+        public Map<String, String> hdfs;
     }
 
     public static class BasicInfo implements Serializable {
@@ -122,5 +121,8 @@ public class SparkHistoryJobAppConfig implements 
Serializable {
     public static class EagleInfo implements Serializable {
         public String host;
         public int port;
+        public String username;
+        public String password;
+        public int timeout;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d100d03e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
index 76f560e..b4a403e 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/JHFSparkEventReader.java
@@ -18,6 +18,7 @@
 package org.apache.eagle.jpm.spark.history.crawl;
 
 import org.apache.eagle.jpm.spark.entity.*;
+import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
 import org.apache.eagle.jpm.util.*;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.service.client.EagleServiceClientException;
@@ -52,9 +53,10 @@ public class JHFSparkEventReader {
 
     private List<TaggedLogAPIEntity> createEntities;
 
+    private SparkHistoryJobAppConfig config;
     private Config conf;
 
-    public JHFSparkEventReader(Map<String, String> baseTags, 
SparkApplicationInfo info) {
+    public JHFSparkEventReader(SparkHistoryJobAppConfig config, Map<String, 
String> baseTags, SparkApplicationInfo info) {
         app = new SparkApp();
         app.setTags(new HashMap<String, String>(baseTags));
         app.setYarnState(info.getState());
@@ -66,7 +68,8 @@ public class JHFSparkEventReader {
         tasks = new HashMap<Long, SparkTask>();
         executors = new HashMap<String, SparkExecutor>();
         stageTaskStatusMap = new HashMap<>();
-        conf = ConfigFactory.load();
+        conf = config.getConfig();
+        this.config = config;
         this.initiateClient();
     }
 
@@ -694,12 +697,11 @@ public class JHFSparkEventReader {
     }
 
     private EagleServiceBaseClient initiateClient() {
-        String host = conf.getString("eagleProps.eagle.service.host");
-        int port = conf.getInt("eagleProps.eagle.service.port");
-        String userName = conf.getString("eagleProps.eagle.service.username");
-        String pwd = conf.getString("eagleProps.eagle.service.password");
-        client = new EagleServiceClientImpl(host, port, userName, pwd);
-        int timeout = conf.getInt("eagleProps.eagle.service.read.timeout");
+        client = new EagleServiceClientImpl(config.eagleInfo.host,
+            config.eagleInfo.port,
+            config.eagleInfo.username,
+            config.eagleInfo.password);
+        int timeout = config.eagleInfo.timeout;
         client.getJerseyClient().setReadTimeout(timeout * 1000);
 
         return client;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d100d03e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
index 0144410..57ced63 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/crawl/SparkFilesystemInputStreamReaderImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.eagle.jpm.spark.history.crawl;
 
+import org.apache.eagle.jpm.spark.history.SparkHistoryJobAppConfig;
 import org.apache.eagle.jpm.util.SparkJobTagName;
 
 import java.io.File;
@@ -29,10 +30,11 @@ public class SparkFilesystemInputStreamReaderImpl 
implements JHFInputStreamReade
 
     private String site;
     private SparkApplicationInfo app;
+    private SparkHistoryJobAppConfig config;
 
-
-    public SparkFilesystemInputStreamReaderImpl(String site, 
SparkApplicationInfo app) {
-        this.site = site;
+    public SparkFilesystemInputStreamReaderImpl(SparkHistoryJobAppConfig 
config, SparkApplicationInfo app) {
+        this.config = config;
+        this.site = config.info.site;
         this.app = app;
     }
 
@@ -41,13 +43,13 @@ public class SparkFilesystemInputStreamReaderImpl 
implements JHFInputStreamReade
         Map<String, String> baseTags = new HashMap<>();
         baseTags.put(SparkJobTagName.SITE.toString(), site);
         baseTags.put(SparkJobTagName.SPARK_QUEUE.toString(), app.getQueue());
-        JHFParserBase parser = new JHFSparkParser(new 
JHFSparkEventReader(baseTags, this.app));
+        JHFParserBase parser = new JHFSparkParser(new 
JHFSparkEventReader(config, baseTags, this.app));
         parser.parse(is);
     }
 
     public static void main(String[] args) throws Exception {
-        SparkFilesystemInputStreamReaderImpl impl = new 
SparkFilesystemInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo());
-        impl.read(new FileInputStream(new 
File("E:\\eagle\\application_1459803563374_535667_1")));
+        //SparkFilesystemInputStreamReaderImpl impl = new 
SparkFilesystemInputStreamReaderImpl("apollo-phx", new SparkApplicationInfo());
+        //impl.read(new FileInputStream(new 
File("E:\\eagle\\application_1459803563374_535667_1")));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d100d03e/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
index 82f26c2..94f6bbf 100644
--- 
a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
+++ 
b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
@@ -62,10 +62,11 @@ public class SparkHistoryJobParseBolt extends BaseRichBolt {
     public void prepare(Map map, TopologyContext topologyContext, 
OutputCollector outputCollector) {
         this.collector = outputCollector;
         this.hdfsConf = new Configuration();
-        this.hdfsConf.set("fs.defaultFS", config.hdfsConfig.endpoint);
         this.hdfsConf.setBoolean("fs.hdfs.impl.disable.cache", true);
-        this.hdfsConf.set("hdfs.kerberos.principal", 
config.hdfsConfig.principal);
-        this.hdfsConf.set("hdfs.keytab.file", config.hdfsConfig.keytab);
+        for (Map.Entry<String, String> entry : 
config.jobHistoryConfig.hdfs.entrySet()) {
+            this.hdfsConf.set(entry.getKey(), entry.getValue());
+            LOG.info("conf key {}, conf value {}", entry.getKey(), 
entry.getValue());
+        }
         this.historyServerFetcher = new 
SparkHistoryServerResourceFetcher(config.jobHistoryConfig.historyServerUrl,
                 config.jobHistoryConfig.historyServerUserName, 
config.jobHistoryConfig.historyServerUserPwd);
         this.zkState = new JobHistoryZKStateManager(config);
@@ -99,7 +100,7 @@ public class SparkHistoryJobParseBolt extends BaseRichBolt {
                     LOG.info("Attempt log name: " + attemptLogName + 
extension);
 
                     Path attemptFile = getFilePath(attemptLogName, extension);
-                    JHFInputStreamReader reader = new 
SparkFilesystemInputStreamReaderImpl(config.info.site, info);
+                    JHFInputStreamReader reader = new 
SparkFilesystemInputStreamReaderImpl(config, info);
                     reader.read(hdfs.open(attemptFile));
                 }
             }
@@ -130,7 +131,7 @@ public class SparkHistoryJobParseBolt extends BaseRichBolt {
     }
 
     private Path getFilePath(String appAttemptLogName, String extension) {
-        String attemptLogDir = this.config.hdfsConfig.baseDir + "/" + 
appAttemptLogName + extension;
+        String attemptLogDir = this.config.jobHistoryConfig.baseDir + "/" + 
appAttemptLogName + extension;
         return new Path(attemptLogDir);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d100d03e/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf 
b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
index 4c22b15..baf559b 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/resources/application.conf
@@ -30,21 +30,26 @@
     eagle.service.password : "secret",
     eagle.service.read.timeout : 2
   },
-  "dataSourceConfig":{
+  "zkStateConfig" : {
     "zkQuorum" : "sandbox.hortonworks.com:2181",
     "zkRoot" : "/sparkJobHistory",
     "zkSessionTimeoutMs" : 15000,
     "zkRetryTimes" : 3,
     "zkRetryInterval" : 20000,
+  },
+
+  "dataSourceConfig":{
     spark.history.server.url : "http://sandbox.hortonworks.com:18080",
     spark.history.server.username : "",
     spark.history.server.password : "",
     rm.url: "http://sandbox.hortonworks.com:8088",
+    "baseDir" : "/spark-history",
     "hdfs": {
-      "eventLog": "/spark-history",
-      "endPoint": "hdfs://sandbox.hortonworks.com:8020",
-      "principal": "",
-      "keytab" : ""
+      fs.defaultFS : "hdfs://sandbox.hortonworks.com:8020",
+      #if not need, then do not set
+      # hdfs.kerberos.principal = ,
+      # hdfs.keytab.file =
+      # ....
     }
   },
   "storm":{

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d100d03e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
 
b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
index d9c66e3..6855b8e 100644
--- 
a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
+++ 
b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/SparkRunningJobAppConfig.java
@@ -18,24 +18,21 @@
 
 package org.apache.eagle.jpm.spark.running;
 
+import com.typesafe.config.ConfigValue;
 import org.apache.eagle.common.config.ConfigOptionParser;
 import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
 
 public class SparkRunningJobAppConfig implements Serializable {
     private static final Logger LOG = 
LoggerFactory.getLogger(SparkRunningJobAppConfig.class);
     static final String JOB_FETCH_SPOUT_NAME = "sparkRunningJobFetchSpout";
     static final String JOB_PARSE_BOLT_NAME = "sparkRunningJobParseBolt";
 
-    public String getEnv() {
-        return env;
-    }
-
-    private String env;
-
     ZKStateConfig getZkStateConfig() {
         return zkStateConfig;
     }
@@ -98,11 +95,9 @@ public class SparkRunningJobAppConfig implements 
Serializable {
     }
 
     public static class EndpointConfig implements Serializable {
-        public String nnEndpoint;
         public String eventLog;
         public String[] rmUrls;
-        public String principal;
-        public String keyTab;
+        public Map<String, String> hdfs;
     }
 
     public Config getConfig() {
@@ -117,6 +112,7 @@ public class SparkRunningJobAppConfig implements 
Serializable {
         this.eagleServiceConfig = new EagleServiceConfig();
         this.jobExtractorConfig = new JobExtractorConfig();
         this.endpointConfig = new EndpointConfig();
+        this.endpointConfig.hdfs = new HashMap<>();
         this.zkStateConfig = new ZKStateConfig();
         this.topologyConfig = new TopologyConfig();
     }
@@ -138,7 +134,6 @@ public class SparkRunningJobAppConfig implements 
Serializable {
 
     private void init(Config config) {
         this.config = config;
-        this.env = config.getString("envContextConfig.env");
         this.zkStateConfig.zkQuorum = 
config.getString("zookeeperConfig.zkQuorum");
         this.zkStateConfig.zkPort = config.getString("zookeeperConfig.zkPort");
         this.zkStateConfig.zkSessionTimeoutMs = 
config.getInt("zookeeperConfig.zkSessionTimeoutMs");
@@ -162,13 +157,13 @@ public class SparkRunningJobAppConfig implements 
Serializable {
         this.jobExtractorConfig.fetchRunningJobInterval = 
config.getInt("jobExtractorConfig.fetchRunningJobInterval");
         this.jobExtractorConfig.parseThreadPoolSize = 
config.getInt("jobExtractorConfig.parseThreadPoolSize");
 
-        //parse data source config
-        this.endpointConfig.eventLog = 
config.getString("dataSourceConfig.eventLog");
-        this.endpointConfig.nnEndpoint = 
config.getString("dataSourceConfig.nnEndpoint");
-        this.endpointConfig.keyTab = 
config.getString("dataSourceConfig.keytab");
-        this.endpointConfig.principal = 
config.getString("dataSourceConfig.principal");
+        //parse endpointConfig config
+        this.endpointConfig.eventLog = 
config.getString("endpointConfig.eventLog");
+        for (Map.Entry<String, ConfigValue> entry : 
config.getConfig("endpointConfig.hdfs").entrySet()) {
+            this.endpointConfig.hdfs.put(entry.getKey(), 
entry.getValue().unwrapped().toString());
+        }
 
-        this.endpointConfig.rmUrls = 
config.getString("dataSourceConfig.rmUrls").split(",");
+        this.endpointConfig.rmUrls = 
config.getString("endpointConfig.rmUrls").split(",");
 
         this.topologyConfig.jobFetchSpoutParallism = 
config.getInt("envContextConfig.parallelismConfig." + JOB_FETCH_SPOUT_NAME);
         this.topologyConfig.jobFetchSpoutTasksNum = 
config.getInt("envContextConfig.tasks." + JOB_FETCH_SPOUT_NAME);
@@ -176,7 +171,6 @@ public class SparkRunningJobAppConfig implements 
Serializable {
         this.topologyConfig.jobParseBoltTasksNum = 
config.getInt("envContextConfig.tasks." + JOB_PARSE_BOLT_NAME);
 
         LOG.info("Successfully initialized SparkRunningJobAppConfig");
-        LOG.info("env: " + this.env);
         LOG.info("site: " + this.jobExtractorConfig.site);
         LOG.info("eagle.service.host: " + 
this.eagleServiceConfig.eagleServiceHost);
         LOG.info("eagle.service.port: " + 
this.eagleServiceConfig.eagleServicePort);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d100d03e/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
 
b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
index 3719325..ba4e9fd 100644
--- 
a/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
+++ 
b/eagle-jpm/eagle-jpm-spark-running/src/main/java/org/apache/eagle/jpm/spark/running/parser/SparkApplicationParser.java
@@ -93,10 +93,11 @@ public class SparkApplicationParser implements Runnable {
         this.currentAttempt = 1;
         this.first = true;
         this.hdfsConf  = new Configuration();
-        this.hdfsConf.set("fs.defaultFS", endpointConfig.nnEndpoint);
+        for (Map.Entry<String, String> entry : endpointConfig.hdfs.entrySet()) 
{
+            this.hdfsConf.set(entry.getKey(), entry.getValue());
+            LOG.info("conf key {}, conf value {}", entry.getKey(), 
entry.getValue());
+        }
         this.hdfsConf.setBoolean("fs.hdfs.impl.disable.cache", true);
-        this.hdfsConf.set("hdfs.kerberos.principal", endpointConfig.principal);
-        this.hdfsConf.set("hdfs.keytab.file", endpointConfig.keyTab);
 
         this.commonTags.put(SparkJobTagName.SITE.toString(), 
jobExtractorConfig.site);
         this.commonTags.put(SparkJobTagName.SPARK_USER.toString(), 
app.getUser());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/d100d03e/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git 
a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf 
b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
index 4d07b38..f0f6d42 100644
--- a/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
+++ b/eagle-jpm/eagle-jpm-spark-running/src/main/resources/application.conf
@@ -33,12 +33,17 @@
     "fetchRunningJobInterval" : 15,
     "parseThreadPoolSize" : 5
   },
-  "dataSourceConfig" : {
+
+  "endpointConfig" : {
     "rmUrls": "http://sandbox.hortonworks.com:8088",
-    "nnEndpoint" : "hdfs://sandbox.hortonworks.com:8020",
-    "principal" : "", #if not need, then empty
-    "keytab" : "",
-    "eventLog" : "/spark-history"
+    "eventLog" : "/spark-history",
+    "hdfs" : {
+      fs.defaultFS : "hdfs://sandbox.hortonworks.com:8020",
+      #if not need, then do not set
+      # hdfs.kerberos.principal = ,
+      # hdfs.keytab.file =
+      # ....
+    }
   },
 
   "zookeeperConfig" : {


Reply via email to