HIVE-13365. Change the MiniLlapCluster to work with a MiniZooKeeperCluster, and
potentially allow multiple instances of LLAP within the MiniLlapCluster.
(Siddharth Seth, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/91ab819a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/91ab819a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/91ab819a

Branch: refs/heads/llap
Commit: 91ab819a18d6271a6c8905d085ad90b1b184ecae
Parents: b446502
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Apr 4 15:23:37 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Apr 4 15:23:37 2016 -0700

----------------------------------------------------------------------
 itests/hive-unit/pom.xml                        |   2 -
 .../org/apache/hive/jdbc/miniHS2/MiniHS2.java   |   2 +-
 .../apache/hadoop/hive/llap/LlapItUtils.java    |  10 +-
 .../org/apache/hadoop/hive/ql/QTestUtil.java    |  10 +-
 .../hive/llap/daemon/MiniLlapCluster.java       | 145 ++++++++++++-------
 5 files changed, 109 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/91ab819a/itests/hive-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index 7219f1d..ae231de 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -210,14 +210,12 @@
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
       <version>${hbase.version}</version>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
       <version>${hbase.version}</version>
       <type>test-jar</type>
-      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>

http://git-wip-us.apache.org/repos/asf/hive/blob/91ab819a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
index 6141a1a..6b337d2 100644
--- a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
+++ b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java
@@ -202,7 +202,7 @@ public class MiniHS2 extends AbstractHiveService {
         if (usePortsFromConf) {
           hiveConf.setBoolean("minillap.usePortsFromConf", true);
         }
-        llapCluster = LlapItUtils.startAndGetMiniLlapCluster(hiveConf, null);
+        llapCluster = LlapItUtils.startAndGetMiniLlapCluster(hiveConf, null, 
null);
 
         mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, 
uriString);
         break;

http://git-wip-us.apache.org/repos/asf/hive/blob/91ab819a/itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java b/itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java
index cb4aba5..c1a32c9 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
 import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster;
@@ -36,7 +37,9 @@ public class LlapItUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapItUtils.class);
 
-  public static MiniLlapCluster startAndGetMiniLlapCluster(Configuration conf, 
String confDir) throws
+  public static MiniLlapCluster startAndGetMiniLlapCluster(Configuration conf,
+                                                           
MiniZooKeeperCluster miniZkCluster,
+                                                           String confDir) 
throws
       IOException {
     MiniLlapCluster llapCluster;
     LOG.info("Using conf dir: {}", confDir);
@@ -57,11 +60,14 @@ public class LlapItUtils {
     // enabling this will cause test failures in Mac OS X
     final boolean directMemoryEnabled = false;
     final int numLocalDirs = 1;
-    LOG.info("MiniLlap Configs - maxMemory: " + maxMemory + " memoryForCache: 
" + memoryForCache
+    LOG.info("MiniLlap Configs -  maxMemory: " + maxMemory +
+        " memoryForCache: " + memoryForCache
         + " totalExecutorMemory: " + totalExecutorMemory + " numExecutors: " + 
numExecutors
         + " asyncIOEnabled: " + asyncIOEnabled + " directMemoryEnabled: " + 
directMemoryEnabled
         + " numLocalDirs: " + numLocalDirs);
     llapCluster = MiniLlapCluster.create(clusterName,
+        miniZkCluster,
+        1,
         numExecutors,
         totalExecutorMemory,
         asyncIOEnabled,

http://git-wip-us.apache.org/repos/asf/hive/blob/91ab819a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
index 5ccbcba..8473436 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
@@ -429,6 +429,9 @@ public class QTestUtil {
         fs = dfs.getFileSystem();
       }
 
+      setup = new QTestSetup();
+      setup.preTest(conf);
+
       String uriString = 
WindowsPathUtil.getHdfsUriString(fs.getUri().toString());
       if (clusterType == MiniClusterType.tez) {
         if (confDir != null && !confDir.isEmpty()) {
@@ -437,13 +440,16 @@ public class QTestUtil {
         }
         mr = shims.getMiniTezCluster(conf, 4, uriString);
       } else if (clusterType == MiniClusterType.llap) {
-        llapCluster = LlapItUtils.startAndGetMiniLlapCluster(conf, confDir);
+        llapCluster = LlapItUtils.startAndGetMiniLlapCluster(conf, 
setup.zooKeeperCluster, confDir);
         mr = shims.getMiniTezCluster(conf, 2, uriString);
       } else if (clusterType == MiniClusterType.miniSparkOnYarn) {
         mr = shims.getMiniSparkCluster(conf, 4, uriString, 1);
       } else {
         mr = shims.getMiniMrCluster(conf, 4, uriString, 1);
       }
+    } else {
+      setup = new QTestSetup();
+      setup.preTest(conf);
     }
 
     initConf();
@@ -471,8 +477,6 @@ public class QTestUtil {
 
     overWrite = 
"true".equalsIgnoreCase(System.getProperty("test.output.overwrite"));
 
-    setup = new QTestSetup();
-    setup.preTest(conf);
     init();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/91ab819a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index a09c0b2..9871702 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -14,12 +14,11 @@
 
 package org.apache.hadoop.hive.llap.daemon;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.Map;
 
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,47 +40,57 @@ public class MiniLlapCluster extends AbstractService {
   private static final Logger LOG = 
LoggerFactory.getLogger(MiniLlapCluster.class);
 
   private final File testWorkDir;
+  private final String clusterNameTrimmed;
+  private final long numInstances;
   private final long execBytesPerService;
   private final boolean llapIoEnabled;
   private final boolean ioIsDirect;
   private final long ioBytesPerService;
   private final int numExecutorsPerService;
+  private final File zkWorkDir;
   private final String[] localDirs;
   private final Configuration clusterSpecificConfiguration = new 
Configuration(false);
 
-  private LlapDaemon llapDaemon;
+  private final LlapDaemon [] llapDaemons;
+  private MiniZooKeeperCluster miniZooKeeperCluster;
+  private final boolean ownZkCluster;
 
-  public static MiniLlapCluster create(String clusterName, int 
numExecutorsPerService,
-      long execBytePerService, boolean llapIoEnabled, boolean ioIsDirect, long 
ioBytesPerService,
-      int numLocalDirs) {
-    return new MiniLlapCluster(clusterName, numExecutorsPerService, 
execBytePerService,
+
+  public static MiniLlapCluster create(String clusterName,
+                                       @Nullable MiniZooKeeperCluster 
miniZkCluster,
+                                       int numInstances,
+                                       int numExecutorsPerService,
+                                       long execBytePerService, boolean 
llapIoEnabled,
+                                       boolean ioIsDirect, long 
ioBytesPerService,
+                                       int numLocalDirs) {
+    return new MiniLlapCluster(clusterName, miniZkCluster, numInstances, 
numExecutorsPerService,
+        execBytePerService,
         llapIoEnabled, ioIsDirect, ioBytesPerService, numLocalDirs);
   }
 
-  public static MiniLlapCluster createAndLaunch(Configuration conf, String 
clusterName,
-      int numExecutorsPerService, long execBytePerService, boolean 
llapIoEnabled,
-      boolean ioIsDirect, long ioBytesPerService, int numLocalDirs) {
-    MiniLlapCluster miniLlapCluster = create(clusterName, 
numExecutorsPerService,
-        execBytePerService, llapIoEnabled, ioIsDirect, ioBytesPerService, 
numLocalDirs);
-    miniLlapCluster.init(conf);
-    miniLlapCluster.start();
-    Configuration llapConf = miniLlapCluster.getClusterSpecificConfiguration();
-    Iterator<Map.Entry<String, String>> confIter = llapConf.iterator();
-    while (confIter.hasNext()) {
-      Map.Entry<String, String> entry = confIter.next();
-      conf.set(entry.getKey(), entry.getValue());
-    }
-    return miniLlapCluster;
+  public static MiniLlapCluster create(String clusterName,
+                                       @Nullable MiniZooKeeperCluster 
miniZkCluster,
+                                       int numExecutorsPerService,
+                                       long execBytePerService, boolean 
llapIoEnabled,
+                                       boolean ioIsDirect, long 
ioBytesPerService,
+                                       int numLocalDirs) {
+    return create(clusterName, miniZkCluster, 1, numExecutorsPerService, 
execBytePerService,
+        llapIoEnabled,
+        ioIsDirect, ioBytesPerService, numLocalDirs);
   }
 
-  // TODO Add support for multiple instances
-  private MiniLlapCluster(String clusterName, int numExecutorsPerService, long 
execMemoryPerService,
-                          boolean llapIoEnabled, boolean ioIsDirect, long 
ioBytesPerService, int numLocalDirs) {
+  private MiniLlapCluster(String clusterName, @Nullable MiniZooKeeperCluster 
miniZkCluster,
+                          int numInstances, int numExecutorsPerService, long 
execMemoryPerService,
+                          boolean llapIoEnabled, boolean ioIsDirect, long 
ioBytesPerService,
+                          int numLocalDirs) {
     super(clusterName + "_" + MiniLlapCluster.class.getSimpleName());
     Preconditions.checkArgument(numExecutorsPerService > 0);
     Preconditions.checkArgument(execMemoryPerService > 0);
     Preconditions.checkArgument(numLocalDirs > 0);
-    String clusterNameTrimmed = clusterName.replace("$", "") + "_" + 
MiniLlapCluster.class.getSimpleName();
+    this.numInstances = numInstances;
+
+    this.clusterNameTrimmed = clusterName.replace("$", "") + "_" + 
MiniLlapCluster.class.getSimpleName();
+    this.llapDaemons = new LlapDaemon[numInstances];
     File targetWorkDir = new File("target", clusterNameTrimmed);
     try {
       FileContext.getLocalFSFileContext().delete(
@@ -123,8 +132,18 @@ public class MiniLlapCluster extends AbstractService {
 
       this.testWorkDir = link;
     } else {
+      targetWorkDir.mkdir();
       this.testWorkDir = targetWorkDir;
     }
+    if (miniZkCluster == null) {
+      ownZkCluster = true;
+      this.zkWorkDir = new File(testWorkDir, "mini-zk-cluster");
+      zkWorkDir.mkdir();
+    } else {
+      miniZooKeeperCluster = miniZkCluster;
+      ownZkCluster = false;
+      this.zkWorkDir = null;
+    }
     this.numExecutorsPerService = numExecutorsPerService;
     this.execBytesPerService = execMemoryPerService;
     this.ioIsDirect = ioIsDirect;
@@ -142,12 +161,13 @@ public class MiniLlapCluster extends AbstractService {
   }
 
   @Override
-  public void serviceInit(Configuration conf) {
+  public void serviceInit(Configuration conf) throws IOException, 
InterruptedException {
     int rpcPort = 0;
     int mngPort = 0;
     int shufflePort = 0;
     int webPort = 0;
     boolean usePortsFromConf = conf.getBoolean("minillap.usePortsFromConf", 
false);
+    LOG.info("MiniLlap configured to use ports from conf: {}", 
usePortsFromConf);
     if (usePortsFromConf) {
       rpcPort = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT);
       mngPort = HiveConf.getIntVar(conf, 
HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT);
@@ -155,43 +175,61 @@ public class MiniLlapCluster extends AbstractService {
       webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
     }
 
-    llapDaemon = new LlapDaemon(conf, numExecutorsPerService, 
execBytesPerService, llapIoEnabled,
-        ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, 
shufflePort, webPort);
-    llapDaemon.init(conf);
+    if (ownZkCluster) {
+      miniZooKeeperCluster = new MiniZooKeeperCluster();
+      miniZooKeeperCluster.startup(zkWorkDir);
+    } else {
+      // Already setup in the create method
+    } 
+
+    conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + 
clusterNameTrimmed);
+    conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, "localhost");
+    conf.setInt(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, 
miniZooKeeperCluster.getClientPort());
+  
+    LOG.info("Initializing {} llap instances for MiniLlapCluster with 
name={}", numInstances, clusterNameTrimmed);
+    for (int i = 0 ;i < numInstances ; i++) {
+      llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, 
execBytesPerService, llapIoEnabled,
+          ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, 
shufflePort, webPort);
+      llapDaemons[i].init(new Configuration(conf));
+    }
+    LOG.info("Initialized {} llap instances for MiniLlapCluster with name={}", 
numInstances, clusterNameTrimmed);
   }
 
   @Override
   public void serviceStart() {
-    llapDaemon.start();
-
-    
clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname,
-        getServiceAddress().getHostName());
-    clusterSpecificConfiguration.setInt(ConfVars.LLAP_DAEMON_RPC_PORT.varname,
-        getServiceAddress().getPort());
-
-    clusterSpecificConfiguration.setInt(
-        ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
-        numExecutorsPerService);
-    clusterSpecificConfiguration.setLong(
-        ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, 
execBytesPerService);
+    LOG.info("Starting {} llap instances for MiniLlapCluster with name={}", 
numInstances, clusterNameTrimmed);
+    for (int i = 0 ;i < numInstances ; i++) {
+      llapDaemons[i].start();
+    }
+    LOG.info("Started {} llap instances for MiniLlapCluster with name={}", 
numInstances, clusterNameTrimmed);
+
     // Optimize local fetch does not work with LLAP due to different local 
directories
     // used by containers and LLAP
     clusterSpecificConfiguration
         .setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, 
false);
+    
clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, 
"@" + clusterNameTrimmed);
   }
 
   @Override
-  public void serviceStop() {
-    if (llapDaemon != null) {
-      llapDaemon.stop();
-      llapDaemon = null;
+  public void serviceStop() throws IOException {
+    for (int i = 0 ; i < numInstances ; i++) {
+      if (llapDaemons[i] != null) {
+        llapDaemons[i].stop();
+        llapDaemons[i] = null;
+      }
+    }
+    if (ownZkCluster) {
+      if (miniZooKeeperCluster != null) {
+        LOG.info("Stopping MiniZooKeeper cluster");
+        miniZooKeeperCluster.shutdown();
+        miniZooKeeperCluster = null;
+        LOG.info("Stopped MiniZooKeeper cluster");
+      }
+    } else {
+      LOG.info("Not stopping MiniZK cluster since it is now owned by us"); 
     }
   }
 
-  private InetSocketAddress getServiceAddress() {
-    Preconditions.checkState(getServiceState() == Service.STATE.STARTED);
-    return llapDaemon.getListenerAddress();
-  }
 
   public Configuration getClusterSpecificConfiguration() {
     Preconditions.checkState(getServiceState() == Service.STATE.STARTED);
@@ -200,7 +238,10 @@ public class MiniLlapCluster extends AbstractService {
 
   // Mainly for verification
   public long getNumSubmissions() {
-    return llapDaemon.getNumSubmissions();
+    int numSubmissions = 0;
+    for (int i = 0 ; i < numInstances ; i++) {
+      numSubmissions += llapDaemons[i].getNumSubmissions();
+    }
+    return numSubmissions;
   }
-
 }

Reply via email to