Repository: lens
Updated Branches:
  refs/heads/master c4d45b189 -> 3e7a6f602


LENS-1148 : Throttling tests to regression


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/3e7a6f60
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/3e7a6f60
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/3e7a6f60

Branch: refs/heads/master
Commit: 3e7a6f602b5a9c23e5f9ad16dcc61872e105cfa7
Parents: c4d45b1
Author: Archana H <archana.h...@gmail.com>
Authored: Wed Jun 8 19:03:53 2016 +0530
Committer: Rajat Khandelwal <rajatgupt...@gmail.com>
Committed: Wed Jun 8 19:03:53 2016 +0530

----------------------------------------------------------------------
 lens-regression/pom.xml                         |  12 +
 .../org/apache/lens/regression/util/Util.java   |   2 +-
 .../lens/regression/client/KillQueryTests.java  |   6 +-
 .../lens/regression/throttling/ITCostTests.java |  10 +-
 .../lens/regression/throttling/Throttling.java  | 574 +++++++++++++++++++
 5 files changed, 594 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/3e7a6f60/lens-regression/pom.xml
----------------------------------------------------------------------
diff --git a/lens-regression/pom.xml b/lens-regression/pom.xml
index b9bee97..07959c8 100644
--- a/lens-regression/pom.xml
+++ b/lens-regression/pom.xml
@@ -136,6 +136,18 @@
                 </executions>
             </plugin>
 
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-surefire-plugin</artifactId>
+          <version>${surefire.plugin.version}</version>
+          <configuration>
+            <failIfNoTests>false</failIfNoTests>
+            <forkedProcessTimeoutInSeconds>7200</forkedProcessTimeoutInSeconds>
+            <redirectTestOutputToFile>true</redirectTestOutputToFile>
+          </configuration>
+        </plugin>
+
+
         </plugins>
   </build>
 

http://git-wip-us.apache.org/repos/asf/lens/blob/3e7a6f60/lens-regression/src/main/java/org/apache/lens/regression/util/Util.java
----------------------------------------------------------------------
diff --git 
a/lens-regression/src/main/java/org/apache/lens/regression/util/Util.java 
b/lens-regression/src/main/java/org/apache/lens/regression/util/Util.java
index 1a538da..dcf5e8b 100644
--- a/lens-regression/src/main/java/org/apache/lens/regression/util/Util.java
+++ b/lens-regression/src/main/java/org/apache/lens/regression/util/Util.java
@@ -62,7 +62,7 @@ import lombok.extern.slf4j.Slf4j;
 public class Util {
 
   private static final String PROPERTY_FILE = "lens.properties";
-  private static String localFilePath = "lens-regression/target/";
+  private static String localFilePath = "target/";
   private static String localFile;
   private static String backupFile;
 

http://git-wip-us.apache.org/repos/asf/lens/blob/3e7a6f60/lens-regression/src/test/java/org/apache/lens/regression/client/KillQueryTests.java
----------------------------------------------------------------------
diff --git 
a/lens-regression/src/test/java/org/apache/lens/regression/client/KillQueryTests.java
 
b/lens-regression/src/test/java/org/apache/lens/regression/client/KillQueryTests.java
index e588c1b..57351db 100644
--- 
a/lens-regression/src/test/java/org/apache/lens/regression/client/KillQueryTests.java
+++ 
b/lens-regression/src/test/java/org/apache/lens/regression/client/KillQueryTests.java
@@ -31,7 +31,6 @@ import org.apache.lens.api.query.*;
 import org.apache.lens.regression.core.constants.QueryInventory;
 import org.apache.lens.regression.core.helpers.*;
 import org.apache.lens.regression.core.testHelper.BaseTestClass;
-import org.apache.lens.regression.util.HadoopUtil;
 import org.apache.lens.server.api.error.LensException;
 
 import org.apache.log4j.Logger;
@@ -64,10 +63,11 @@ public class KillQueryTests extends BaseTestClass {
     logger.info("Creating a new Session");
     sessionHandleString = lens.openSession(lens.getCurrentDB());
 
-    HadoopUtil.uploadJars(localJarPath + "/" + hiveUdfJar, hdfsJarPath);
+    //TODO : Enable when udf registration per driver is fixed
+/*    HadoopUtil.uploadJars(localJarPath + "/" + hiveUdfJar, hdfsJarPath);
     logger.info("Adding jar for making query to run for longer period of 
time");
     sHelper.addResourcesJar(hdfsJarPath + "/" + hiveUdfJar);
-    QueryHandle queryHandle = (QueryHandle) 
qHelper.executeQuery(QueryInventory.SLEEP_FUNCTION).getData();
+    QueryHandle queryHandle = (QueryHandle) 
qHelper.executeQuery(QueryInventory.SLEEP_FUNCTION).getData();*/
   }
 
   @BeforeMethod(alwaysRun = true)

http://git-wip-us.apache.org/repos/asf/lens/blob/3e7a6f60/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITCostTests.java
----------------------------------------------------------------------
diff --git 
a/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITCostTests.java
 
b/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITCostTests.java
index 4da4c3a..b4846e3 100644
--- 
a/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITCostTests.java
+++ 
b/lens-regression/src/test/java/org/apache/lens/regression/throttling/ITCostTests.java
@@ -217,7 +217,7 @@ public class ITCostTests extends BaseTestClass {
   @Test(enabled = true, groups= "user-cost-ceiling")
   public void testCostCeilingWithProrityMaxConcurrent() throws Exception {
 
-    String query = 
String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_5"), "10");
+    String query = 
String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_5"), "5");
     HashMap<String, String> map = 
LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "5",
         DriverConfig.PRIORITY_MAX_CONCURRENT, "HIGH=3");
 
@@ -263,7 +263,7 @@ public class ITCostTests extends BaseTestClass {
   @Test(enabled = true, groups= "user-cost-ceiling")
   public void multipleUserConcurrentPriorityThrottling() throws Exception {
 
-    String query = 
String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_5"), "10");
+    String query = 
String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_5"), "5");
     long timeToWait= 7 * SECONDS_IN_A_MINUTE; //in seconds
     int sleepTime = 5; //in seconds
     HashMap<String, String> map = 
LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "5",
@@ -289,10 +289,8 @@ public class ITCostTests extends BaseTestClass {
       List<QueryHandle> running = null, queued = null;
       for (int t = 0; t < timeToWait; t = t + sleepTime) {
 
-        running = qHelper.getQueryHandleList(null, "RUNNING", "all", 
sessionHandleString, null, null,
-            hiveDriver);
-        queued = qHelper.getQueryHandleList(null, "QUEUED", "all", 
sessionHandleString, null, null,
-            hiveDriver);
+        running = qHelper.getQueryHandleList(null, "RUNNING", "all", 
sessionHandleString, null, null, hiveDriver);
+        queued = qHelper.getQueryHandleList(null, "QUEUED", "all", 
sessionHandleString, null, null, hiveDriver);
         logger.info("Running query count : " + running.size() + "\t Queued 
query count : " + queued.size());
 
         if (running.isEmpty() && queued.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/lens/blob/3e7a6f60/lens-regression/src/test/java/org/apache/lens/regression/throttling/Throttling.java
----------------------------------------------------------------------
diff --git 
a/lens-regression/src/test/java/org/apache/lens/regression/throttling/Throttling.java
 
b/lens-regression/src/test/java/org/apache/lens/regression/throttling/Throttling.java
new file mode 100644
index 0000000..2d852fb
--- /dev/null
+++ 
b/lens-regression/src/test/java/org/apache/lens/regression/throttling/Throttling.java
@@ -0,0 +1,574 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.lens.regression.throttling;
+
+import java.lang.reflect.Method;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.client.WebTarget;
+
+import org.apache.lens.api.Priority;
+import org.apache.lens.api.query.LensQuery;
+import org.apache.lens.api.query.QueryHandle;
+import org.apache.lens.api.query.QueryStatus;
+import org.apache.lens.cube.parse.CubeQueryConfUtil;
+import org.apache.lens.driver.hive.HiveDriver;
+import org.apache.lens.regression.core.constants.DriverConfig;
+import org.apache.lens.regression.core.constants.QueryInventory;
+import org.apache.lens.regression.core.helpers.LensServerHelper;
+import org.apache.lens.regression.core.helpers.MetastoreHelper;
+import org.apache.lens.regression.core.helpers.QueryHelper;
+import org.apache.lens.regression.core.helpers.ServiceManagerHelper;
+import org.apache.lens.regression.core.helpers.SessionHelper;
+import org.apache.lens.regression.core.testHelper.BaseTestClass;
+import org.apache.lens.regression.util.Util;
+import org.apache.lens.server.api.LensConfConstants;
+import org.apache.lens.server.api.util.LensUtil;
+
+import org.apache.log4j.Logger;
+
+import org.testng.Assert;
+import org.testng.annotations.*;
+
+public class Throttling extends BaseTestClass {
+
+  WebTarget servLens;
+  String sessionHandleString;
+
+  LensServerHelper lens = getLensServerHelper();
+  MetastoreHelper mHelper = getMetastoreHelper();
+  SessionHelper sHelper = getSessionHelper();
+  QueryHelper qHelper = getQueryHelper();
+
+  public static final String SLEEP_QUERY = QueryInventory.getSleepQuery("5");
+  public static final String COST_95 = 
String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_95"), "7");
+  public static final String COST_60 = 
String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_60"), "7");
+  public static final String COST_30 = 
String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_30"), "6");
+  public static final String COST_20 = 
String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_20"), "6");
+  public static final String COST_10 = 
String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_10"), "6");
+  public static final String COST_5 = 
String.format(QueryInventory.getQueryFromInventory("HIVE.SLEEP_COST_5"), "5");
+  public static final String JDBC_QUERY1 = 
QueryInventory.getQueryFromInventory("JDBC.QUERY1");
+
+  private static String hiveDriver = "hive/hive1";
+  String hiveDriverConf = lens.getServerDir() + 
"/conf/drivers/hive/hive1/hivedriver-site.xml";
+  private static final long SECONDS_IN_A_MINUTE = 60;
+  private String session1 = null, session2 = null;
+  //TODO : Read queue names from property file
+  private static String queue1 = "queue1", queue2 = "queue2";
+
+  private static Logger logger = Logger.getLogger(Throttling.class);
+
+  /**
+   * One-time class setup. Initialises the service manager, writes a baseline hive driver
+   * configuration (max concurrent queries = 10 plus the HS2 priority ranges) to
+   * {@code hiveDriverConf} and restarts the Lens server.
+   *
+   * @throws Exception if initialisation, the config change or the restart fails
+   */
+  @BeforeClass(alwaysRun = true)
+  public void initialize() throws Exception {
+    servLens = ServiceManagerHelper.init();
+
+    HashMap<String, String> baselineConf = LensUtil.getHashMap(
+        DriverConfig.MAX_CONCURRENT_QUERIES, "10",
+        HiveDriver.HS2_PRIORITY_RANGES, "HIGH,7,NORMAL,30,LOW,90,VERY_LOW");
+    Util.changeConfig(baselineConf, hiveDriverConf);
+
+    lens.restart();
+  }
+
+  /**
+   * Per-test setup. Logs the test name, opens the default session and two additional sessions
+   * (users "diff1" and "diff2") on the current database, then sets
+   * {@code CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA} to "false" for each of them.
+   *
+   * @param method the test method about to run (used only for logging)
+   * @throws Exception if a session cannot be opened or a parameter cannot be set
+   */
+  @BeforeMethod(alwaysRun = true)
+  public void setUp(Method method) throws Exception {
+    logger.info("Test Name: " + method.getName());
+
+    sessionHandleString = lens.openSession(lens.getCurrentDB());
+    session1 = sHelper.openNewSession("diff1", "diff1", lens.getCurrentDB());
+    session2 = sHelper.openNewSession("diff2", "diff2", lens.getCurrentDB());
+
+    // The overload without a session argument presumably targets the default session.
+    sHelper.setAndValidateParam(CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false");
+    for (String extraSession : new String[] {session1, session2}) {
+      sHelper.setAndValidateParam(extraSession, CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, "false");
+    }
+  }
+
+  /**
+   * Per-test teardown. Kills every QUEUED, RUNNING and EXECUTED query, closes the three sessions
+   * opened in {@code setUp}, then reverts the hive driver configuration and restarts the server.
+   *
+   * <p>The config revert and restart run in a {@code finally} block so that a failure while
+   * killing queries or closing sessions cannot leave a test-specific driver configuration in
+   * place for the tests that follow.
+   *
+   * @param method the test method that just ran (used only for logging)
+   * @throws Exception if cleanup, the config revert or the restart fails
+   */
+  @AfterMethod(alwaysRun = true)
+  public void afterMethod(Method method) throws Exception {
+    logger.info("Test Name: " + method.getName());
+    try {
+      qHelper.killQuery(null, "QUEUED", "all");
+      qHelper.killQuery(null, "RUNNING", "all");
+      qHelper.killQuery(null, "EXECUTED", "all");
+      sHelper.closeNewSession(session1);
+      sHelper.closeNewSession(session2);
+      sHelper.closeNewSession(sessionHandleString);
+    } finally {
+      // Single-argument changeConfig presumably restores the backed-up file - confirm in Util.
+      Util.changeConfig(hiveDriverConf);
+      lens.restart();
+    }
+  }
+
+  /**
+   * Class-level teardown hook. Currently a no-op; sessions are closed after every test in
+   * {@link #afterMethod(Method)}.
+   */
+  @AfterClass(alwaysRun = true)
+  public void closeSession() throws Exception {
+    // Nothing to release at class level.
+  }
+
+
+  @Test(enabled = true)
+  public void testHiveThrottling() throws Exception {
+
+    HashMap<String, String> map = 
LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "2");
+    Util.changeConfig(map, hiveDriverConf);
+    lens.restart();
+
+    List<QueryHandle> handleList = new ArrayList<>();
+    handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY).getData());
+    handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, null, 
session1).getData());
+    handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, null, 
session2).getData());
+    handleList.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY, 
null).getData());
+
+    Thread.sleep(1000);
+
+    List<QueryStatus> statusList = new ArrayList<>();
+    for(QueryHandle handle : handleList){
+      statusList.add(qHelper.getQueryStatus(handle));
+    }
+
+    Assert.assertEquals(statusList.get(0).getStatus(), 
QueryStatus.Status.RUNNING);
+    Assert.assertEquals(statusList.get(1).getStatus(), 
QueryStatus.Status.RUNNING);
+    Assert.assertEquals(statusList.get(2).getStatus(), 
QueryStatus.Status.QUEUED);
+    Assert.assertEquals(statusList.get(3).getStatus(), 
QueryStatus.Status.QUEUED);
+
+    qHelper.waitForCompletion(handleList.get(0));
+    Thread.sleep(100);
+    Assert.assertEquals(qHelper.getQueryStatus(handleList.get(2)).getStatus(), 
QueryStatus.Status.RUNNING);
+  }
+
+
+  /**
+   * Limits the hive driver to 4 concurrent queries, submits an initial mix of hive and JDBC
+   * queries and keeps injecting more while polling. On every poll the number of RUNNING queries
+   * on the hive driver must not exceed the limit. Polling stops when nothing is running or queued
+   * (or after five minutes); finally every submitted query must be SUCCESSFUL.
+   */
+  @Test(enabled = true)
+  public void testHiveMaxConcurrentRandomQueryIngestion() throws Exception {
+
+    long timeToWait = 5 * SECONDS_IN_A_MINUTE; // in seconds
+    int sleepTime = 3;
+    int maxConcurrent = 4;
+    List<QueryHandle> submitted = new ArrayList<QueryHandle>();
+
+    HashMap<String, String> driverConf = LensUtil.getHashMap(
+        DriverConfig.MAX_CONCURRENT_QUERIES, String.valueOf(maxConcurrent));
+    Util.changeConfig(driverConf, hiveDriverConf);
+    lens.restart();
+
+    for (int round = 0; round < 5; round++) {
+      submitted.add((QueryHandle) qHelper.executeQuery(SLEEP_QUERY).getData());
+      submitted.add((QueryHandle) qHelper.executeQuery(JDBC_QUERY1).getData());
+      submitted.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY, null, session1).getData());
+    }
+
+    Thread.sleep(50);
+
+    List<QueryHandle> running = null;
+    List<QueryHandle> queued = null;
+    int elapsed = 0; // seconds spent polling so far
+    while (elapsed < timeToWait) {
+
+      running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver);
+      queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver);
+      logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size());
+
+      if (running.isEmpty() && queued.isEmpty()) {
+        break;
+      }
+      Assert.assertTrue(running.size() <= maxConcurrent);
+
+      // Inject a fresh batch every 30 seconds during the first 200 seconds.
+      boolean injectMore = elapsed % 30 == 0 && elapsed < 200;
+      if (injectMore) {
+        submitted.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_DIM_QUERY).getData());
+        submitted.add((QueryHandle) qHelper.executeQuery(QueryInventory.JDBC_CUBE_QUERY).getData());
+        submitted.add((QueryHandle) qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY, null, session1).getData());
+      }
+
+      TimeUnit.SECONDS.sleep(sleepTime);
+      elapsed = elapsed + sleepTime;
+    }
+
+    Assert.assertTrue(running.isEmpty());
+    Assert.assertTrue(queued.isEmpty());
+
+    for (QueryHandle handle : submitted) {
+      LensQuery finished = qHelper.waitForCompletion(handle);
+      Assert.assertEquals(finished.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL);
+    }
+  }
+
+  /**
+   * Verifies per-priority concurrency limits (HIGH=2, VERY_LOW=1) together with a driver-wide
+   * limit of 5. Eight queries are submitted in a fixed order and their statuses are compared
+   * positionally against the expected RUNNING/QUEUED pattern. After the first query completes,
+   * the queued third query is expected to be RUNNING.
+   */
+  @Test(enabled = true)
+  public void testProrityMaxConcurrent() throws Exception {
+
+    HashMap<String, String> map = LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "5",
+        DriverConfig.PRIORITY_MAX_CONCURRENT, "HIGH=2,VERY_LOW=1",
+        HiveDriver.HS2_PRIORITY_RANGES, "HIGH,7,NORMAL,30,LOW,90,VERY_LOW");
+    Util.changeConfig(map, hiveDriverConf);
+    lens.restart();
+
+    /* Expected outcome, in submission order:
+       - The first three queries (COST_5) are HIGH priority. Two go to RUNNING and the third is
+         queued because HIGH is limited to 2.
+       - The COST_95 queries are VERY_LOW priority. The first goes to RUNNING and the second is
+         queued because VERY_LOW is limited to 1.
+       - COST_20 (NORMAL) and the first COST_60 (LOW) go to RUNNING because no limit is set for
+         those priorities.
+       - The last COST_60 query is queued because the driver has by then reached its overall
+         max-concurrent threshold of 5.
+    */
+
+    String[] queries = {COST_5, COST_5, COST_5, COST_95, COST_95, COST_20, COST_60, COST_60};
+    QueryStatus.Status[] expectedStatus = {QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING,
+      QueryStatus.Status.QUEUED, QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED,
+      QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED, };
+
+    List<QueryHandle> handleList = new ArrayList<>();
+    for (String query : queries) {
+      handleList.add((QueryHandle) qHelper.executeQuery(query).getData());
+    }
+
+    // Snapshot every status before asserting so all checks refer to the same moment.
+    List<QueryStatus.Status> statusList = new ArrayList<>();
+    for (QueryHandle handle : handleList) {
+      statusList.add(qHelper.getQueryStatus(handle).getStatus());
+    }
+
+    // Iterate over the expectation table and report the failing position, as the sibling
+    // queueAndPriorityMaxConcurrent test does.
+    for (int i = 0; i < expectedStatus.length; i++) {
+      Assert.assertEquals(statusList.get(i), expectedStatus[i], "failed : query-" + i);
+    }
+
+    qHelper.waitForCompletion(handleList.get(0));
+    Assert.assertEquals(qHelper.getQueryStatus(handleList.get(2)).getStatus(), QueryStatus.Status.RUNNING);
+  }
+
+  /**
+   * Configures per-priority limits whose sum (LOW=2, VERY_LOW=1, NORMAL=2, HIGH=4) exceeds the
+   * driver-wide limit of 5, submits 20 queries of mixed cost and polls until nothing is running
+   * or queued (or five minutes elapse). On every poll the number of RUNNING queries must respect
+   * both the overall limit and each per-priority limit. Finally every query must be SUCCESSFUL.
+   */
+  @Test(enabled = true)
+  public void prioritySumMoreThanMaxConcurrent() throws Exception {
+
+    long timeToWait = 5 * SECONDS_IN_A_MINUTE; // in seconds
+    int sleepTime = 5, maxConcurrent = 5, lowConCurrent = 2, veryLowConcurrent = 1, highConcurrent = 4,
+        normalConcurrent = 2;
+
+    // All four priority limits belong to ONE config value. Passing them as two separate
+    // arguments gives getHashMap an odd argument count and leaves NORMAL/HIGH unconfigured.
+    String priorityLimits = "LOW=" + lowConCurrent + ",VERY_LOW=" + veryLowConcurrent
+        + ",NORMAL=" + normalConcurrent + ",HIGH=" + highConcurrent;
+    HashMap<String, String> map = LensUtil.getHashMap(
+        DriverConfig.MAX_CONCURRENT_QUERIES, String.valueOf(maxConcurrent),
+        DriverConfig.PRIORITY_MAX_CONCURRENT, priorityLimits);
+
+    Util.changeConfig(map, hiveDriverConf);
+    lens.restart();
+
+    List<QueryHandle> handleList = new ArrayList<QueryHandle>();
+    for (int i = 1; i <= 5; i++) {
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_95).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_60, null, session1).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_20, null, session2).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_5).getData());
+    }
+
+    List<QueryHandle> running = null, queued = null;
+    for (int t = 0; t < timeToWait; t = t + sleepTime) {
+
+      running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver);
+      queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver);
+      logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size());
+
+      if (running.isEmpty() && queued.isEmpty()) {
+        break;
+      }
+
+      Assert.assertTrue(running.size() <= maxConcurrent, "running count : " + running.size());
+
+      // Tally the running queries by priority.
+      int low = 0, veryLow = 0, high = 0, normal = 0;
+      for (QueryHandle qh : running) {
+        Priority p = qHelper.getLensQuery(sessionHandleString, qh).getPriority();
+        Assert.assertNotNull(p);
+        switch (p) {
+        case HIGH:
+          high++;
+          break;
+        case NORMAL:
+          normal++;
+          break;
+        case LOW:
+          low++;
+          break;
+        case VERY_LOW:
+          veryLow++;
+          break;
+        default:
+          throw new Exception("Unexpected Priority : " + p);
+        }
+      }
+
+      Assert.assertTrue(low <= lowConCurrent, "low count : " + low);
+      Assert.assertTrue(veryLow <= veryLowConcurrent, "very-low count : " + veryLow);
+      Assert.assertTrue(high <= highConcurrent, "high count : " + high);
+      Assert.assertTrue(normal <= normalConcurrent, "normal count : " + normal);
+
+      TimeUnit.SECONDS.sleep(sleepTime);
+    }
+
+    Assert.assertTrue(queued.isEmpty());
+    Assert.assertTrue(running.isEmpty());
+
+    for (QueryHandle q : handleList) {
+      LensQuery lq = qHelper.waitForCompletion(q);
+      Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL);
+    }
+  }
+
+
+  /**
+   * Verifies per-queue concurrency limits (queue1=1, queue2=2) with a driver-wide limit of 3.
+   * Two queries are submitted on queue1 and three on queue2; the expected pattern is
+   * RUNNING, QUEUED, RUNNING, RUNNING, QUEUED. Once a running query of a queue completes, the
+   * queued query of that queue is expected to be RUNNING.
+   */
+  @Test(enabled = true)
+  public void queueMaxConcurrent() throws Exception {
+
+    int maxConcurrent = 3;
+    int queue1Limit = 1;
+    int queue2Limit = 2;
+    String queueLimits = queue1 + "=" + String.valueOf(queue1Limit) + "," + queue2 + "=" + String.valueOf(queue2Limit);
+    HashMap<String, String> driverConf = LensUtil.getHashMap(
+        DriverConfig.MAX_CONCURRENT_QUERIES, String.valueOf(maxConcurrent),
+        DriverConfig.QUEUE_MAX_CONCURRENT, queueLimits);
+    List<QueryHandle> submitted = new ArrayList<>();
+
+    Util.changeConfig(driverConf, hiveDriverConf);
+    lens.restart();
+
+    // Queries 0-1 go to queue1.
+    sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1);
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_60).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_95).getData());
+
+    // Queries 2-4 go to queue2.
+    sHelper.setAndValidateParam(LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_60).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_95).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_95).getData());
+
+    List<QueryStatus.Status> observed = new ArrayList<>();
+    for (QueryHandle handle : submitted) {
+      observed.add(qHelper.getQueryStatus(handle).getStatus());
+    }
+
+    QueryStatus.Status[] expected = {
+      QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED, QueryStatus.Status.RUNNING,
+      QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED,
+    };
+    for (int i = 0; i < expected.length; i++) {
+      Assert.assertEquals(observed.get(i), expected[i]);
+    }
+
+    // Freeing the queue1 slot should let the queued queue1 query start.
+    qHelper.waitForCompletion(submitted.get(0));
+    Assert.assertEquals(qHelper.getQueryStatus(submitted.get(1)).getStatus(), QueryStatus.Status.RUNNING);
+
+    // Freeing a queue2 slot should let the queued queue2 query start.
+    qHelper.waitForCompletion(submitted.get(2));
+    Assert.assertEquals(qHelper.getQueryStatus(submitted.get(4)).getStatus(), QueryStatus.Status.RUNNING);
+  }
+
+  /**
+   * Submits queries before queue throttling is configured, then enables per-queue limits
+   * (queue1=1, queue2=2, overall 4), restarts the server and submits more queries. While polling,
+   * the RUNNING queries must respect the overall and per-queue limits. Finally every query must
+   * be SUCCESSFUL.
+   */
+  @Test(enabled = true)
+  public void enableQueueThrottlingWithExistingQueuedQueries() throws Exception {
+
+    long timeToWait = 5 * SECONDS_IN_A_MINUTE; // in seconds
+    int sleepTime = 5;
+    int maxConcurrent = 4;
+    int queue1Concurrent = 1;
+    int queue2Concurrent = 2;
+    String queueLimits = queue1 + "=" + String.valueOf(queue1Concurrent) + ","
+        + queue2 + "=" + String.valueOf(queue2Concurrent);
+    HashMap<String, String> throttledConf = LensUtil.getHashMap(
+        DriverConfig.MAX_CONCURRENT_QUERIES, String.valueOf(maxConcurrent),
+        DriverConfig.QUEUE_MAX_CONCURRENT, queueLimits);
+
+    sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1);
+    sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
+
+    // First wave: submitted before the queue limits are in place.
+    List<QueryHandle> submitted = new ArrayList<QueryHandle>();
+    for (int round = 1; round <= 3; round++) {
+      submitted.add((QueryHandle)qHelper.executeQuery(COST_95).getData());
+      submitted.add((QueryHandle)qHelper.executeQuery(COST_20, "", session1).getData());
+      submitted.add((QueryHandle)qHelper.executeQuery(COST_95, "", session2).getData());
+    }
+
+    Util.changeConfig(throttledConf, hiveDriverConf);
+    lens.restart();
+
+    // Second wave: submitted after the restart with the queue limits enabled.
+    for (int round = 1; round <= 2; round++) {
+      submitted.add((QueryHandle)qHelper.executeQuery(COST_95).getData());
+      submitted.add((QueryHandle)qHelper.executeQuery(COST_20, "", session1).getData());
+      submitted.add((QueryHandle)qHelper.executeQuery(COST_95, "", session2).getData());
+    }
+
+    List<QueryHandle> running = null;
+    List<QueryHandle> queued = null;
+    int elapsed = 0; // seconds spent polling so far
+    while (elapsed < timeToWait) {
+
+      running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver);
+      queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString,  null, null, hiveDriver);
+      logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size());
+
+      if (running.isEmpty() && queued.isEmpty()) {
+        break;
+      }
+      Assert.assertTrue(running.size() <= maxConcurrent);
+
+      int queue1Count = 0;
+      int queue2Count = 0;
+      for (QueryHandle handle : running) {
+        LensQuery runningQuery = qHelper.getLensQuery(sessionHandleString, handle);
+        String queue = runningQuery.getQueryConf().getProperties().get("mapreduce.job.queuename");
+        Assert.assertNotNull(queue);
+
+        if (queue.equals(queue1)) {
+          queue1Count++;
+        } else if (queue.equals(queue2)) {
+          queue2Count++;
+        }
+      }
+
+      Assert.assertTrue(queue1Count <= queue1Concurrent, "queue1 count : " + queue1Count);
+      Assert.assertTrue(queue2Count <= queue2Concurrent, "queue2 count : " + queue2Count);
+
+      TimeUnit.SECONDS.sleep(sleepTime);
+      elapsed = elapsed + sleepTime;
+    }
+
+    Assert.assertTrue(running.isEmpty());
+    Assert.assertTrue(queued.isEmpty());
+
+    for (QueryHandle handle : submitted) {
+      LensQuery finished = qHelper.waitForCompletion(handle);
+      Assert.assertEquals(finished.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL);
+    }
+  }
+
+
+  /**
+   * Combines per-priority limits (LOW=2, VERY_LOW=1) with per-queue limits (queue1=1, queue2=2)
+   * under a driver-wide limit of 5. Eight queries are submitted in a fixed order across the
+   * sessions and their statuses are compared positionally against the expected pattern.
+   */
+  @Test(enabled = true)
+  public void queueAndPriorityMaxConcurrent() throws Exception {
+
+    HashMap<String, String> driverConf = LensUtil.getHashMap(
+      DriverConfig.MAX_CONCURRENT_QUERIES, "5",
+      DriverConfig.PRIORITY_MAX_CONCURRENT, "LOW=2,VERY_LOW=1",
+      DriverConfig.QUEUE_MAX_CONCURRENT, queue1 + "=1," + queue2 + "=2");
+
+    Util.changeConfig(driverConf, hiveDriverConf);
+    lens.restart();
+
+    sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1);
+    sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
+
+    // Positional expectations: entry i is the status expected for the i-th submitted query.
+    QueryStatus.Status[] expected = {
+      QueryStatus.Status.RUNNING, QueryStatus.Status.QUEUED, QueryStatus.Status.QUEUED,
+      QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING, QueryStatus.Status.RUNNING,
+      QueryStatus.Status.QUEUED, QueryStatus.Status.QUEUED,
+    };
+
+    List<QueryHandle> submitted = new ArrayList<>();
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_95, null, session1).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_20, null, session1).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_95, null, session2).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_60, null, session2).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_5, null, session2).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_60).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_20, null, session2).getData());
+    submitted.add((QueryHandle) qHelper.executeQuery(COST_5, null, session2).getData());
+
+    // Fetch every status before asserting so all checks refer to the same moment.
+    List<QueryStatus> observed = new ArrayList<>();
+    for (QueryHandle handle : submitted) {
+      observed.add(qHelper.getQueryStatus(handle));
+    }
+
+    int position = 0;
+    for (QueryStatus.Status wanted : expected) {
+      Assert.assertEquals(observed.get(position).getStatus(), wanted, "failed : query-" + position);
+      position++;
+    }
+  }
+
+
+  /**
+   * Combines per-priority limits (HIGH=2, NORMAL=1) with per-queue limits (queue1=1, queue2=2)
+   * under a driver-wide limit of 5, submits 18 queries of mixed cost across three sessions and
+   * polls until nothing is running or queued (or five minutes elapse). On every poll the RUNNING
+   * queries must respect the overall limit, the configured priority limits and the queue limits.
+   * Finally every query must be SUCCESSFUL.
+   */
+  @Test(enabled = true)
+  public void queueAndPriorityMaxConcurrentMany() throws Exception {
+
+    long timeToWait = 5 * SECONDS_IN_A_MINUTE; // in seconds
+    int sleepTime = 5, maxConcurrent = 5, queue1Concurrent = 1, queue2Concurrent = 2;
+    int highConcurrent = 2, normalConcurrent = 1;
+
+    HashMap<String, String> map = LensUtil.getHashMap(
+      DriverConfig.MAX_CONCURRENT_QUERIES, String.valueOf(maxConcurrent),
+      DriverConfig.PRIORITY_MAX_CONCURRENT, "HIGH=" + highConcurrent + ",NORMAL=" + normalConcurrent,
+      DriverConfig.QUEUE_MAX_CONCURRENT,
+      queue1 + "=" + queue1Concurrent + "," + queue2 + "=" + queue2Concurrent);
+
+    Util.changeConfig(map, hiveDriverConf);
+    lens.restart();
+
+    sHelper.setAndValidateParam(session1, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue1);
+    sHelper.setAndValidateParam(session2, LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
+
+    List<QueryHandle> handleList = new ArrayList<QueryHandle>();
+    for (int i = 1; i <= 3; i++) {
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_5).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_20, "", session1).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_60, "", session2).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_20).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_95, "", session1).getData());
+      handleList.add((QueryHandle) qHelper.executeQuery(COST_5, "", session2).getData());
+    }
+
+    List<QueryHandle> running = null, queued = null;
+    for (int t = 0; t < timeToWait; t = t + sleepTime) {
+
+      running = qHelper.getQueryHandleList(null, "RUNNING", "all", sessionHandleString, null, null, hiveDriver);
+      queued = qHelper.getQueryHandleList(null, "QUEUED", "all", sessionHandleString, null, null, hiveDriver);
+      logger.info("Running query count : " + running.size() + "\t Queued query count : " + queued.size());
+
+      if (running.isEmpty() && queued.isEmpty()) {
+        break;
+      }
+
+      Assert.assertTrue(running.size() <= maxConcurrent, "running count : " + running.size());
+
+      int highCount = 0, normalCount = 0, queue1Count = 0, queue2Count = 0;
+      for (QueryHandle qh : running) {
+        // Fetch the query once so priority and queue come from the same snapshot.
+        LensQuery lensQuery = qHelper.getLensQuery(sessionHandleString, qh);
+        Priority priority = lensQuery.getPriority();
+        String queue = lensQuery.getQueryConf().getProperties().get("mapreduce.job.queuename");
+
+        Assert.assertNotNull(priority);
+        Assert.assertNotNull(queue);
+
+        // Count the priorities that are actually limited in the driver config above
+        // (HIGH and NORMAL); counting other priorities would leave those limits unverified.
+        if (priority.equals(Priority.HIGH)) {
+          highCount++;
+        } else if (priority.equals(Priority.NORMAL)) {
+          normalCount++;
+        }
+
+        if (queue.equals(queue1)) {
+          queue1Count++;
+        } else if (queue.equals(queue2)) {
+          queue2Count++;
+        }
+      }
+
+      Assert.assertTrue(highCount <= highConcurrent, "high priority count : " + highCount);
+      Assert.assertTrue(normalCount <= normalConcurrent, "normal priority count : " + normalCount);
+      Assert.assertTrue(queue1Count <= queue1Concurrent, "queue-1 count : " + queue1Count);
+      Assert.assertTrue(queue2Count <= queue2Concurrent, "queue-2 count : " + queue2Count);
+
+      TimeUnit.SECONDS.sleep(sleepTime);
+    }
+
+    Assert.assertTrue(queued.isEmpty());
+    Assert.assertTrue(running.isEmpty());
+
+    for (QueryHandle q : handleList) {
+      LensQuery lq = qHelper.waitForCompletion(q);
+      Assert.assertEquals(lq.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL);
+    }
+  }
+
+  /*
+    LENS-973. Scenario is mentioned in jira
+  */
+
+  @Test(enabled = true)
+  public void queueConstraintFailureOnRestart() throws Exception {
+
+    List<QueryHandle> handleList = new ArrayList<QueryHandle>();
+
+    HashMap<String, String> map = 
LensUtil.getHashMap(DriverConfig.MAX_CONCURRENT_QUERIES, "3",
+        DriverConfig.QUEUE_MAX_CONCURRENT, queue1 + "=1," + queue2 + "=3");
+    Util.changeConfig(map, hiveDriverConf);
+    lens.restart();
+
+    String newSession = sHelper.openNewSession("user", "pwd", 
lens.getCurrentDB());
+    sHelper.setAndValidateParam(newSession, 
LensConfConstants.MAPRED_JOB_QUEUE_NAME, queue2);
+    handleList.add((QueryHandle) 
qHelper.executeQuery(QueryInventory.SLEEP_QUERY, null, newSession).getData());
+
+    for(int i=0; i<2; i++){
+      handleList.add((QueryHandle) 
qHelper.executeQuery(QueryInventory.HIVE_CUBE_QUERY).getData());
+    }
+
+    sHelper.closeNewSession(newSession);
+    lens.restart();
+    Assert.assertFalse(qHelper.getQueryStatus(handleList.get(0)).finished());
+
+    for(int i=0; i<6; i++){
+      handleList.add((QueryHandle) 
qHelper.executeQuery(QueryInventory.HIVE_DIM_QUERY).getData());
+    }
+
+    for(QueryHandle handle: handleList){
+      LensQuery lq = qHelper.waitForCompletion(handle);
+      Assert.assertEquals(lq.getStatus().getStatus(), 
QueryStatus.Status.SUCCESSFUL);
+    }
+  }
+}
+

Reply via email to