Repository: lens Updated Branches: refs/heads/master 73f92430c -> 4d3d2f82f
LENS-890 : Adds per-queue and per-priority driver max launched queries constraints Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/4d3d2f82 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/4d3d2f82 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/4d3d2f82 Branch: refs/heads/master Commit: 4d3d2f82fb93ee4d5c52dc3b4910573953094c0a Parents: 73f9243 Author: Rajat Khandelwal <pro...@apache.org> Authored: Tue Dec 15 18:45:08 2015 +0530 Committer: Amareshwari Sriramadasu <amareshw...@apache.org> Committed: Tue Dec 15 18:45:08 2015 +0530 ---------------------------------------------------------------------- .../org/apache/lens/api/util/CommonUtils.java | 27 ++- .../FactPartitionBasedQueryCostCalculator.java | 8 +- .../org/apache/lens/driver/hive/HiveDriver.java | 68 +++--- .../apache/lens/driver/hive/TestHiveDriver.java | 233 ++++++++++++------- .../src/test/resources/priority_tests.data | 1 + .../server/api/driver/AbstractLensDriver.java | 13 +- .../lens/server/api/driver/LensDriver.java | 13 +- .../server/api/query/AbstractQueryContext.java | 9 +- .../lens/server/api/query/QueryContext.java | 12 +- .../MaxConcurrentDriverQueriesConstraint.java | 54 ++++- ...oncurrentDriverQueriesConstraintFactory.java | 49 +++- .../api/query/TestAbstractQueryContext.java | 4 +- ...axConcurrentDriverQueriesConstraintTest.java | 181 +++++++++++++- .../server/query/QueryExecutionServiceImpl.java | 1 + .../ThreadSafeEstimatedQueryCollectionTest.java | 3 +- 15 files changed, 527 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-api/src/main/java/org/apache/lens/api/util/CommonUtils.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/util/CommonUtils.java b/lens-api/src/main/java/org/apache/lens/api/util/CommonUtils.java index 38d58c7..119c924 100644 --- a/lens-api/src/main/java/org/apache/lens/api/util/CommonUtils.java +++ b/lens-api/src/main/java/org/apache/lens/api/util/CommonUtils.java @@ -27,6 +27,25 @@ public class CommonUtils { } + public interface EntryParser<K, V> { + K parseKey(String str); + + V parseValue(String str); + } + + private static EntryParser<String, String> defaultEntryParser = new EntryParser<String, String>() { + @Override + public String parseKey(String str) { + return str; + } + + @Override + public String parseValue(String str) { + return str; + } + }; + + /** * Splits given String str around non-escaped commas. Then parses each of the split element * as map entries in the format `key=value`. Constructs a map of such entries. @@ -36,7 +55,11 @@ public class CommonUtils { * @return parsed map */ public static Map<String, String> parseMapFromString(String str) { - Map<String, String> map = new HashMap<>(); + return parseMapFromString(str, defaultEntryParser); + } + + public static <K, V> Map<K, V> parseMapFromString(String str, EntryParser<K, V> parser) { + Map<K, V> map = new HashMap<>(); if (str != null) { for (String kv : str.split("(?<!\\\\),")) { if (!kv.isEmpty()) { @@ -49,7 +72,7 @@ public class CommonUtils { if (kvArray.length > 1) { value = kvArray[1].replaceAll("\\\\,", ",").trim(); } - map.put(key, value); + map.put(parser.parseKey(key), parser.parseValue(value)); } } } http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-cube/src/main/java/org/apache/lens/cube/query/cost/FactPartitionBasedQueryCostCalculator.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/query/cost/FactPartitionBasedQueryCostCalculator.java b/lens-cube/src/main/java/org/apache/lens/cube/query/cost/FactPartitionBasedQueryCostCalculator.java index d56e1c7..9fecdbc 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/query/cost/FactPartitionBasedQueryCostCalculator.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/query/cost/FactPartitionBasedQueryCostCalculator.java @@ -47,8 +47,11 @@ public class FactPartitionBasedQueryCostCalculator implements QueryCostCalculato */ @SuppressWarnings("unchecked") // required for (Set<FactPartition>) casting - private double getTotalPartitionCost(final AbstractQueryContext queryContext, LensDriver driver) + private Double getTotalPartitionCost(final AbstractQueryContext queryContext, LensDriver driver) throws LensException { + if (queryContext.getDriverRewriterPlan(driver) == null) { + return null; + } double cost = 0; for (Map.Entry<String, Set<?>> entry : getAllPartitions(queryContext, driver).entrySet()) { // Have to do instanceof check, since it can't be handled by polymorphism. @@ -86,7 +89,8 @@ public class FactPartitionBasedQueryCostCalculator implements QueryCostCalculato @Override public QueryCost calculateCost(final AbstractQueryContext queryContext, LensDriver driver) throws LensException { - return new FactPartitionBasedQueryCost(getTotalPartitionCost(queryContext, driver)); + Double cost = getTotalPartitionCost(queryContext, driver); + return cost == null ? null : new FactPartitionBasedQueryCost(cost); } public Map<String, Set<?>> getAllPartitions(AbstractQueryContext queryContext, LensDriver driver) { http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java index 253cfc4..7391f47 100644 --- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java +++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java @@ -20,8 +20,14 @@ package org.apache.lens.driver.hive; import static org.apache.lens.server.api.util.LensUtil.getImplementations; -import java.io.*; -import java.util.*; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; @@ -29,6 +35,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.lens.api.LensConf; import org.apache.lens.api.LensSessionHandle; +import org.apache.lens.api.Priority; import org.apache.lens.api.query.QueryHandle; import org.apache.lens.api.query.QueryPrepareHandle; import org.apache.lens.cube.query.cost.FactPartitionBasedQueryCostCalculator; @@ -50,7 +57,6 @@ import org.apache.lens.server.api.query.priority.CostToPriorityRangeConf; import org.apache.lens.server.api.query.priority.QueryPriorityDecider; import org.apache.commons.lang.StringUtils; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -66,7 +72,6 @@ import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; import com.google.common.collect.ImmutableSet; - import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -110,6 +115,7 @@ public class HiveDriver extends AbstractLensDriver { private HiveConf hiveConf; /** The hive handles. */ + @Getter private Map<QueryHandle, OperationHandle> hiveHandles = new ConcurrentHashMap<QueryHandle, OperationHandle>(); /** The orphaned hive sessions. */ @@ -383,10 +389,12 @@ public class HiveDriver extends AbstractLensDriver { private QueryCost calculateQueryCost(AbstractQueryContext qctx) throws LensException { if (qctx.isOlapQuery()) { - return queryCostCalculator.calculateCost(qctx, this); - } else { - return new FactPartitionBasedQueryCost(Double.MAX_VALUE); + QueryCost cost = queryCostCalculator.calculateCost(qctx, this); + if (cost != null) { + return cost; + } } + return new FactPartitionBasedQueryCost(Double.MAX_VALUE); } @Override @@ -548,22 +556,7 @@ public class HiveDriver extends AbstractLensDriver { addPersistentPath(ctx); Configuration qdconf = ctx.getDriverConf(this); qdconf.set("mapred.job.name", ctx.getQueryHandle().toString()); - //Query is already explained. - log.info("whetherCalculatePriority: {}", whetherCalculatePriority); - if (whetherCalculatePriority) { - try { - // Inside try since non-data fetching queries can also be executed by async method. - String priority = ctx.calculateCostAndDecidePriority(this, queryCostCalculator, queryPriorityDecider) - .toString(); - qdconf.set("mapred.job.priority", priority); - log.info("set priority to {}", priority); - } catch (Exception e) { - // not failing query launch when setting priority fails - // priority will be set to usually NORMAL - the default in underlying system. - log.error("could not set priority for lens session id:{} User query: {}", ctx.getLensSessionIdentifier(), - ctx.getUserQuery(), e); - } - } + decidePriority(ctx); queryHook.preLaunch(ctx); SessionHandle sessionHandle = getSession(ctx); OperationHandle op = getClient().executeStatementAsync(sessionHandle, ctx.getSelectedDriverQuery(), @@ -809,6 +802,27 @@ public class HiveDriver extends AbstractLensDriver { return selectionPolicies; } + @Override + public Priority decidePriority(QueryContext ctx) { + if (whetherCalculatePriority && ctx.getDriverConf(this).get("mapred.job.priority") == null) { + try { + // Inside try since non-data fetching queries can also be executed by async method. + Priority priority = ctx.decidePriority(this, queryPriorityDecider); + String priorityStr = priority.toString(); + ctx.getDriverConf(this).set("mapred.job.priority", priorityStr); + log.info("set priority to {}", priority); + return priority; + } catch (Exception e) { + // not failing query launch when setting priority fails + // priority will be set to usually NORMAL - the default in underlying system. + log.error("could not set priority for lens session id:{} User query: {}", ctx.getLensSessionIdentifier(), + ctx.getUserQuery(), e); + return null; + } + } + return null; + } + protected CLIServiceClient getClient() throws LensException { if (isEmbedded) { if (embeddedConnection == null) { @@ -837,7 +851,7 @@ public class HiveDriver extends AbstractLensDriver { thriftConnExpiryQueue.offer(connection); threadConnections.put(connectionKey, connection); log.info("New thrift connection {} for thread: {} for user: {} connection ID={} on driver:{}", - connectionClass, Thread.currentThread().getId(), user, connection.getConnId(), getFullyQualifiedName()); + connectionClass, Thread.currentThread().getId(), user, connection.getConnId(), getFullyQualifiedName()); } catch (Exception e) { throw new LensException(e); } @@ -939,14 +953,14 @@ public class HiveDriver extends AbstractLensDriver { hiveSession = getClient().openSession(ctx.getClusterUser(), ""); lensToHiveSession.put(sessionDbKey, hiveSession); log.info("New hive session for user: {} , lens session: {} , hive session handle: {} , driver : {}", - ctx.getClusterUser(), sessionDbKey, hiveSession.getHandleIdentifier(), getFullyQualifiedName()); + ctx.getClusterUser(), sessionDbKey, hiveSession.getHandleIdentifier(), getFullyQualifiedName()); for (LensEventListener<DriverEvent> eventListener : driverListeners) { try { eventListener.onEvent(new DriverSessionStarted(System.currentTimeMillis(), this, lensSession, hiveSession .getSessionId().toString())); } catch (Exception exc) { log.error("Error sending driver {} start event to listener {}", getFullyQualifiedName(), eventListener, - exc); + exc); } } } catch (Exception e) { @@ -1218,7 +1232,7 @@ public class HiveDriver extends AbstractLensDriver { if (isSessionInvalid(exc, session)) { // We have to expire previous session log.info("{} Hive server session {} for lens session {} has become invalid", getFullyQualifiedName(), session, - lensSession); + lensSession); sessionLock.lock(); try { // We should close all connections and clear the session map since http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java index 11efd3c..06552ea 100644 --- a/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java +++ b/lens-driver-hive/src/test/java/org/apache/lens/driver/hive/TestHiveDriver.java @@ -33,7 +33,9 @@ import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.driver.*; import org.apache.lens.server.api.driver.DriverQueryStatus.DriverQueryState; import org.apache.lens.server.api.error.LensException; -import org.apache.lens.server.api.query.*; +import org.apache.lens.server.api.query.ExplainQueryContext; +import org.apache.lens.server.api.query.PreparedQueryContext; +import org.apache.lens.server.api.query.QueryContext; import org.apache.lens.server.api.query.cost.QueryCost; import org.apache.lens.server.api.query.priority.CostRangePriorityDecider; import org.apache.lens.server.api.query.priority.CostToPriorityRangeConf; @@ -41,8 +43,10 @@ import org.apache.lens.server.api.user.MockDriverQueryHook; import org.apache.lens.server.api.util.LensUtil; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.HiveDriverRunHook; @@ -53,6 +57,7 @@ import org.apache.hive.service.cli.ColumnDescriptor; import org.testng.annotations.*; +import com.beust.jcommander.internal.Maps; import com.google.common.collect.Lists; @@ -81,6 +86,8 @@ public class TestHiveDriver { protected String sessionid; protected SessionState ss; + private CostRangePriorityDecider alwaysNormalPriorityDecider + = new CostRangePriorityDecider(new CostToPriorityRangeConf("")); /** * Before test. @@ -173,6 +180,7 @@ public class TestHiveDriver { * @throws Exception the exception */ protected void createTestTable(String tableName) throws Exception { + int handleSize = getHandleSize(); System.out.println("Hadoop Location: " + System.getProperty("hadoop.bin.path")); String createTable = "CREATE TABLE IF NOT EXISTS " + tableName + "(ID STRING)" + " TBLPROPERTIES ('" + LensConfConstants.STORAGE_COST + "'='500')"; @@ -186,7 +194,7 @@ public class TestHiveDriver { context = createContext(dataLoad, conf); resultSet = driver.execute(context); assertNull(resultSet); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); } /** @@ -196,6 +204,7 @@ public class TestHiveDriver { * @throws Exception the exception */ protected void createPartitionedTable(String tableName) throws Exception { + int handleSize = getHandleSize(); System.out.println("Hadoop Location: " + System.getProperty("hadoop.bin.path")); String createTable = "CREATE TABLE IF NOT EXISTS " + tableName + "(ID STRING)" + " PARTITIONED BY (dt string) TBLPROPERTIES ('" @@ -212,7 +221,7 @@ public class TestHiveDriver { context = createContext(dataLoad, conf); resultSet = driver.execute(context); assertNull(resultSet); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); } // Tests @@ -241,6 +250,7 @@ public class TestHiveDriver { */ @Test public void testTemptable() throws Exception { + int handleSize = getHandleSize(); createTestTable("test_temp"); conf.setBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, false); Hive.get(conf).dropTable("test_temp_output"); @@ -248,15 +258,15 @@ public class TestHiveDriver { QueryContext context = createContext(query, conf); LensResultSet resultSet = driver.execute(context); assertNull(resultSet); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); // fetch results from temp table String select = "SELECT * FROM test_temp_output"; context = createContext(select, conf); resultSet = driver.execute(context); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); validateInMemoryResult(resultSet, "test_temp_output"); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); } /** @@ -266,6 +276,7 @@ public class TestHiveDriver { */ @Test public void testExecuteQuery() throws Exception { + int handleSize = getHandleSize(); createTestTable("test_execute"); LensResultSet resultSet = null; // Execute a select query @@ -287,7 +298,7 @@ public class TestHiveDriver { context = createContext(select, conf); resultSet = driver.execute(context); validatePersistentResult(resultSet, TEST_DATA_FILE, context.getHDFSResultDir(), true); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); } /** @@ -383,6 +394,7 @@ public class TestHiveDriver { */ @Test public void testExecuteQueryAsync() throws Exception { + int handleSize = getHandleSize(); createTestTable("test_execute_sync"); // Now run a command that would fail @@ -392,11 +404,11 @@ public class TestHiveDriver { failConf.set("hive.exec.driver.run.hooks", FailHook.class.getCanonicalName()); QueryContext context = createContext(expectFail, failConf); driver.executeAsync(context); - assertEquals(1, driver.getHiveHandleSize()); + assertHandleSize(handleSize + 1); validateExecuteAsync(context, DriverQueryState.FAILED, true, false); - assertEquals(1, driver.getHiveHandleSize()); + assertHandleSize(handleSize + 1); driver.closeQuery(context.getQueryHandle()); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); // Async select query String select = "SELECT ID FROM test_execute_sync"; conf.setBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, false); @@ -404,18 +416,18 @@ public class TestHiveDriver { driver.executeAsync(context); assertNotNull(context.getDriverConf(driver).get("mapred.job.name")); assertNotNull(context.getDriverConf(driver).get("mapred.job.priority")); - assertEquals(1, driver.getHiveHandleSize()); + assertHandleSize(handleSize + 1); validateExecuteAsync(context, DriverQueryState.SUCCESSFUL, false, false); driver.closeQuery(context.getQueryHandle()); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); conf.setBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, true); context = createContext(select, conf); driver.executeAsync(context); - assertEquals(1, driver.getHiveHandleSize()); + assertHandleSize(handleSize + 1); validateExecuteAsync(context, DriverQueryState.SUCCESSFUL, true, false); driver.closeQuery(context.getQueryHandle()); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); conf.set(LensConfConstants.QUERY_OUTPUT_DIRECTORY_FORMAT, "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'" @@ -424,10 +436,10 @@ public class TestHiveDriver { select = "SELECT ID, null, ID FROM test_execute_sync"; context = createContext(select, conf); driver.executeAsync(context); - assertEquals(1, driver.getHiveHandleSize()); + assertHandleSize(handleSize + 1); validateExecuteAsync(context, DriverQueryState.SUCCESSFUL, true, true); driver.closeQuery(context.getQueryHandle()); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); } /** @@ -483,6 +495,7 @@ public class TestHiveDriver { */ @Test public void testCancelAsyncQuery() throws Exception { + int handleSize = getHandleSize(); createTestTable("test_cancel_async"); conf.setBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, false); QueryContext context = createContext("SELECT ID FROM test_cancel_async", conf); @@ -491,7 +504,7 @@ public class TestHiveDriver { driver.updateStatus(context); assertEquals(context.getDriverStatus().getState(), DriverQueryState.CANCELED, "Expecting query to be cancelled"); driver.closeQuery(context.getQueryHandle()); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); try { driver.cancelQuery(context.getQueryHandle()); @@ -512,7 +525,7 @@ public class TestHiveDriver { */ private void validatePersistentResult(LensResultSet resultSet, String dataFile, Path outptuDir, boolean formatNulls) throws Exception { - assertTrue(resultSet instanceof HivePersistentResultSet); + assertTrue(resultSet instanceof HivePersistentResultSet, "resultset class: " + resultSet.getClass().getName()); HivePersistentResultSet persistentResultSet = (HivePersistentResultSet) resultSet; String path = persistentResultSet.getOutputPath(); @@ -567,6 +580,7 @@ public class TestHiveDriver { */ @Test public void testPersistentResultSet() throws Exception { + int handleSize = getHandleSize(); createTestTable("test_persistent_result_set"); conf.setBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, true); conf.setBoolean(LensConfConstants.QUERY_ADD_INSERT_OVEWRITE, true); @@ -574,14 +588,14 @@ public class TestHiveDriver { QueryContext ctx = createContext("SELECT ID FROM test_persistent_result_set", conf); LensResultSet resultSet = driver.execute(ctx); validatePersistentResult(resultSet, TEST_DATA_FILE, ctx.getHDFSResultDir(), false); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); ctx = createContext("SELECT ID FROM test_persistent_result_set", conf); driver.executeAsync(ctx); - assertEquals(1, driver.getHiveHandleSize()); + assertHandleSize(handleSize + 1); validateExecuteAsync(ctx, DriverQueryState.SUCCESSFUL, true, false); driver.closeQuery(ctx.getQueryHandle()); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); conf.set(LensConfConstants.QUERY_OUTPUT_DIRECTORY_FORMAT, "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'" @@ -589,17 +603,17 @@ public class TestHiveDriver { + " 'field.delim'=',' ) STORED AS TEXTFILE "); ctx = createContext("SELECT ID, null, ID FROM test_persistent_result_set", conf); resultSet = driver.execute(ctx); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); validatePersistentResult(resultSet, TEST_DATA_FILE, ctx.getHDFSResultDir(), true); driver.closeQuery(ctx.getQueryHandle()); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); ctx = createContext("SELECT ID, null, ID FROM test_persistent_result_set", conf); driver.executeAsync(ctx); - assertEquals(1, driver.getHiveHandleSize()); + assertHandleSize(handleSize + 1); validateExecuteAsync(ctx, DriverQueryState.SUCCESSFUL, true, true); driver.closeQuery(ctx.getQueryHandle()); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); } /** @@ -640,6 +654,22 @@ public class TestHiveDriver { SessionState.setCurrentSessionState(ss); ExplainQueryContext ctx = createExplainContext("cube SELECT ID FROM test_cube", conf); ctx.setOlapQuery(true); + ctx.getDriverContext().setDriverRewriterPlan(driver, new DriverQueryPlan() { + @Override + public String getPlan() { + return null; + } + + @Override + public QueryCost getCost() { + return null; + } + + @Override + public Map<String, Set<?>> getPartitions() { + return Maps.newHashMap(); + } + }); QueryCost cost = driver.estimate(ctx); assertEquals(cost.getEstimatedResourceUsage(), 0.0); cost.getEstimatedExecTimeMillis(); @@ -666,14 +696,14 @@ public class TestHiveDriver { */ @Test public void testExplain() throws Exception { + int handleSize = getHandleSize(); SessionState.setCurrentSessionState(ss); SessionState.get().setCurrentDatabase(dataBase); createTestTable("test_explain"); - DriverQueryPlan plan = driver.explain(createExplainContext("SELECT ID FROM test_explain", conf)); assertTrue(plan instanceof HiveQueryPlan); assertEquals(plan.getTableWeight(dataBase + ".test_explain"), 500.0); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); // test execute prepare PreparedQueryContext pctx = new PreparedQueryContext("SELECT ID FROM test_explain", null, conf, drivers); @@ -686,36 +716,37 @@ public class TestHiveDriver { plan = driver.explainAndPrepare(pctx); QueryContext qctx = createContext(pctx, inConf); LensResultSet result = driver.execute(qctx); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); validateInMemoryResult(result); // test execute prepare async + conf.setBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, true); qctx = createContext(pctx, conf); driver.executeAsync(qctx); assertNotNull(qctx.getDriverOpHandle()); validateExecuteAsync(qctx, DriverQueryState.SUCCESSFUL, true, false); - assertEquals(1, driver.getHiveHandleSize()); + assertHandleSize(handleSize + 1); driver.closeQuery(qctx.getQueryHandle()); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); // for backward compatibility qctx = createContext(pctx, inConf); qctx.setQueryHandle(new QueryHandle(pctx.getPrepareHandle().getPrepareHandleId())); result = driver.execute(qctx); assertNotNull(qctx.getDriverOpHandle()); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); validateInMemoryResult(result); // test execute prepare async qctx = createContext(pctx, conf); qctx.setQueryHandle(new QueryHandle(pctx.getPrepareHandle().getPrepareHandleId())); driver.executeAsync(qctx); - assertEquals(1, driver.getHiveHandleSize()); + assertHandleSize(handleSize + 1); validateExecuteAsync(qctx, DriverQueryState.SUCCESSFUL, true, false); driver.closeQuery(qctx.getQueryHandle()); driver.closePreparedQuery(pctx.getPrepareHandle()); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); } /** @@ -725,11 +756,12 @@ public class TestHiveDriver { */ @Test public void testExplainPartitionedTable() throws Exception { + int handleSize = getHandleSize(); createPartitionedTable("test_part_table"); // acquire SessionState.setCurrentSessionState(ss); DriverQueryPlan plan = driver.explain(createExplainContext("SELECT ID FROM test_part_table", conf)); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); assertTrue(plan instanceof HiveQueryPlan); assertNotNull(plan.getTablesQueried()); assertEquals(plan.getTablesQueried().size(), 1); @@ -749,15 +781,15 @@ public class TestHiveDriver { */ @Test public void testExplainOutput() throws Exception { + int handleSize = getHandleSize(); createTestTable("explain_test_1"); createTestTable("explain_test_2"); - SessionState.setCurrentSessionState(ss); DriverQueryPlan plan = driver.explain(createExplainContext("SELECT explain_test_1.ID, count(1) FROM " + " explain_test_1 join explain_test_2 on explain_test_1.ID = explain_test_2.ID" + " WHERE explain_test_1.ID = 'foo' or explain_test_2.ID = 'bar'" + " GROUP BY explain_test_1.ID", conf)); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); assertTrue(plan instanceof HiveQueryPlan); assertNotNull(plan.getTablesQueried()); assertEquals(plan.getTablesQueried().size(), 2); @@ -775,6 +807,7 @@ public class TestHiveDriver { */ @Test public void testExplainOutputPersistent() throws Exception { + int handleSize = getHandleSize(); createTestTable("explain_test_1"); conf.setBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, true); SessionState.setCurrentSessionState(ss); @@ -784,19 +817,36 @@ public class TestHiveDriver { pctx.setLensSessionIdentifier(sessionid); DriverQueryPlan plan2 = driver.explainAndPrepare(pctx); // assertNotNull(plan2.getResultDestination()); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); assertNotNull(plan2.getTablesQueried()); assertEquals(plan2.getTablesQueried().size(), 1); assertTrue(plan2.getTableWeights().containsKey(dataBase + ".explain_test_1")); QueryContext ctx = createContext(pctx, conf); LensResultSet resultSet = driver.execute(ctx); - assertEquals(0, driver.getHiveHandleSize()); + assertHandleSize(handleSize); HivePersistentResultSet persistentResultSet = (HivePersistentResultSet) resultSet; String path = persistentResultSet.getOutputPath(); assertEquals(ctx.getDriverResultPath(), path); driver.closeQuery(plan2.getHandle()); } + @DataProvider + public Object[][] priorityDataProvider() throws IOException, ParseException { + BufferedReader br = new BufferedReader(new InputStreamReader( + TestHiveDriver.class.getResourceAsStream("/priority_tests.data"))); + String line; + int numTests = Integer.parseInt(br.readLine()); + Object[][] data = new Object[numTests][2]; + for (int i = 0; i < numTests; i++) { + String[] kv = br.readLine().split("\\s*:\\s*"); + final Set<FactPartition> partitions = getFactParts(Arrays.asList(kv[0].trim().split("\\s*,\\s*"))); + final Priority expected = Priority.valueOf(kv[1]); + data[i] = new Object[]{partitions, expected}; + } + return data; + } + + /** * Testing Duration Based Priority Logic by mocking everything except partitions. * @@ -804,57 +854,47 @@ public class TestHiveDriver { * @throws LensException * @throws ParseException */ - @Test - public void testPriority() throws IOException, LensException, ParseException { + @Test(dataProvider = "priorityDataProvider") + public void testPriority(final Set<FactPartition> partitions, Priority expected) throws Exception { Configuration conf = new Configuration(); - CostRangePriorityDecider alwaysNormalPriorityDecider = - new CostRangePriorityDecider(new CostToPriorityRangeConf("")); - BufferedReader br = new BufferedReader(new InputStreamReader( - TestHiveDriver.class.getResourceAsStream("/priority_tests.data"))); - String line; - int i = 0; - while ((line = br.readLine()) != null) { - String[] kv = line.split("\\s*:\\s*"); + QueryContext ctx = createContext("test priority query", conf); + ctx.getDriverContext().setDriverRewriterPlan(driver, new DriverQueryPlan() { - final Set<FactPartition> partitions = getFactParts(Arrays.asList(kv[0].trim().split("\\s*,\\s*"))); - final Priority expected = Priority.valueOf(kv[1]); - QueryContext ctx = createContext("test priority query", conf); - ctx.getDriverContext().setDriverRewriterPlan(driver, new DriverQueryPlan() { + @Override + public String getPlan() { + return null; + } - @Override - public String getPlan() { - return null; - } + @Override + public QueryCost getCost() { + return null; + } + }); - @Override - public QueryCost getCost() { - return null; + ctx.getDriverContext().getDriverRewriterPlan(driver).getPartitions().putAll( + new HashMap<String, Set<FactPartition>>() { + { + put("table1", partitions); } }); + // table weights only for first calculation + ctx.getDriverContext().getDriverRewriterPlan(driver).getTableWeights().putAll( + new HashMap<String, Double>() { + { + put("table1", 1.0); + } + }); + ctx.setOlapQuery(true); + Priority priority = driver.decidePriority(ctx); + assertEquals(priority, expected, "cost: " + ctx.getDriverQueryCost(driver) + "priority: " + priority); + assertEquals(ctx.decidePriority(driver, + alwaysNormalPriorityDecider), Priority.NORMAL); + } - ctx.getDriverContext().getDriverRewriterPlan(driver).getPartitions().putAll( - new HashMap<String, Set<FactPartition>>() { - { - put("table1", partitions); - } - }); - if (i < 1) { - // table weights only for first calculation - ctx.getDriverContext().getDriverRewriterPlan(driver).getTableWeights().putAll( - new HashMap<String, Double>() { - { - put("table1", 1.0); - } - }); - } - assertEquals(ctx.calculateCostAndDecidePriority(driver, driver.queryCostCalculator, - driver.queryPriorityDecider), expected); - assertEquals(ctx.calculateCostAndDecidePriority(driver, driver.queryCostCalculator, - alwaysNormalPriorityDecider), Priority.NORMAL); - i++; - } + @Test + public void testPriorityWithoutFactPartitions() throws LensException { // test priority without fact partitions - AbstractQueryContext ctx = createContext("test priority query", conf); + QueryContext ctx = createContext("test priority query", conf); ctx.getDriverContext().setDriverRewriterPlan(driver, new DriverQueryPlan() { @Override @@ -881,15 +921,25 @@ public class TestHiveDriver { } }); ctx.setDriverCost(driver, driver.queryCostCalculator.calculateCost(ctx, driver)); - assertEquals(Priority.VERY_HIGH, driver.queryPriorityDecider.decidePriority(ctx.getDriverQueryCost(driver))); - assertEquals(Priority.NORMAL, alwaysNormalPriorityDecider.decidePriority(ctx.getDriverQueryCost(driver))); + assertEquals(driver.decidePriority(ctx), Priority.VERY_HIGH); + assertEquals(alwaysNormalPriorityDecider.decidePriority(ctx.getDriverQueryCost(driver)), Priority.NORMAL); // test priority without rewriter plan ctx = createContext("test priority query", conf); - ctx.setDriverCost(driver, driver.queryCostCalculator.calculateCost(ctx, driver)); - assertEquals(Priority.VERY_HIGH, driver.queryPriorityDecider.decidePriority(ctx.getDriverQueryCost(driver))); - assertEquals(Priority.NORMAL, alwaysNormalPriorityDecider.decidePriority(ctx.getDriverQueryCost(driver))); + ctx.getDriverContext().setDriverRewriterPlan(driver, new DriverQueryPlan() { + @Override + public String getPlan() { + return null; + } + @Override + public QueryCost getCost() { + return null; + } + }); + ctx.setDriverCost(driver, driver.queryCostCalculator.calculateCost(ctx, driver)); + assertEquals(driver.decidePriority(ctx), Priority.VERY_HIGH); + assertEquals(alwaysNormalPriorityDecider.decidePriority(ctx.getDriverQueryCost(driver)), Priority.NORMAL); } private Set<FactPartition> getFactParts(List<String> partStrings) throws ParseException { @@ -915,4 +965,13 @@ public class TestHiveDriver { } return factParts; } + + private int getHandleSize() { + return driver.getHiveHandleSize(); + } + + private void assertHandleSize(int handleSize) { + assertEquals(getHandleSize(), handleSize, "Unexpected handle size, all handles: " + + driver.getHiveHandles()); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-driver-hive/src/test/resources/priority_tests.data ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/test/resources/priority_tests.data b/lens-driver-hive/src/test/resources/priority_tests.data index 98b82ef..177743e 100644 --- a/lens-driver-hive/src/test/resources/priority_tests.data +++ b/lens-driver-hive/src/test/resources/priority_tests.data @@ -1,3 +1,4 @@ +4 dt 2014-01-02-01: VERY_HIGH dt 2013-12,dt 2014-01-01, dt 2014-01-02-00, dt 2014-01-02-01: HIGH dt 2013-12,dt 2014-01, dt 2014-02, dt 2014-02-01-00: NORMAL http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java index 55f1535..ed1fc43 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java @@ -19,11 +19,12 @@ package org.apache.lens.server.api.driver; +import org.apache.lens.api.Priority; import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.error.LensException; +import org.apache.lens.server.api.query.QueryContext; import org.apache.commons.lang.StringUtils; - import org.apache.hadoop.conf.Configuration; import lombok.Getter; @@ -49,7 +50,7 @@ public abstract class AbstractLensDriver implements LensDriver { if (StringUtils.isBlank(driverType) || StringUtils.isBlank(driverName)) { throw new LensException("Driver Type and Name can not be null or empty"); } - fullyQualifiedName = new StringBuilder(driverType).append(SEPARATOR).append(driverName).toString(); + fullyQualifiedName = new StringBuilder(driverType).append(SEPARATOR).append(driverName).toString(); } /** @@ -61,7 +62,13 @@ public abstract class AbstractLensDriver implements LensDriver { */ protected String getDriverResourcePath(String resourceName) { return new StringBuilder(LensConfConstants.DRIVERS_BASE_DIR).append(SEPARATOR).append(getFullyQualifiedName()) - .append(SEPARATOR).append(resourceName).toString(); + .append(SEPARATOR).append(resourceName).toString(); + } + + @Override + public Priority decidePriority(QueryContext queryContext) { + // no-op by default + return null; } @Override http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java index c220884..3d38ddd 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/LensDriver.java @@ -20,11 +20,14 @@ package org.apache.lens.server.api.driver; import java.io.Externalizable; +import org.apache.lens.api.Priority; import org.apache.lens.api.query.QueryHandle; import org.apache.lens.api.query.QueryPrepareHandle; import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.events.LensEventListener; -import org.apache.lens.server.api.query.*; +import org.apache.lens.server.api.query.AbstractQueryContext; +import org.apache.lens.server.api.query.PreparedQueryContext; +import org.apache.lens.server.api.query.QueryContext; import org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy; import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint; import org.apache.lens.server.api.query.cost.QueryCost; @@ -208,4 +211,12 @@ public interface LensDriver extends Externalizable { * (Examples: hive/hive1, jdbc/mysql1 ) */ String getFullyQualifiedName(); + + /** + * decide priority based on query's cost. The cost should be already computed by estimate call, but it's + * not guaranteed to be pre-computed. It's up to the driver to do an on-demand computation of cost. + * @see QueryContext#decidePriority(LensDriver, QueryPriorityDecider) that handles this on-demand computation. + * @param queryContext + */ + Priority decidePriority(QueryContext queryContext); } http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java index 2f20113..62ed293 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/AbstractQueryContext.java @@ -19,7 +19,10 @@ package org.apache.lens.server.api.query; import java.io.Serializable; -import java.util.*; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -206,6 +209,10 @@ public abstract class AbstractQueryContext implements Serializable { return getDriverContext().getDriverRewriterPlan(driver); } + public String getQueue() { + return getConf().get(LensConfConstants.MAPRED_JOB_QUEUE_NAME); + } + /** * Runnable to wrap estimate computation for a driver. Failure cause and success status * are stored as field members http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java index b637665..1269e45 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java @@ -34,7 +34,6 @@ import org.apache.lens.server.api.driver.LensDriver; import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy; import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint; -import org.apache.lens.server.api.query.cost.QueryCostCalculator; import org.apache.lens.server.api.query.priority.QueryPriorityDecider; import org.apache.hadoop.conf.Configuration; @@ -226,7 +225,6 @@ public class QueryContext extends AbstractQueryContext { this.submissionTime = submissionTime; this.queryHandle = new QueryHandle(UUID.randomUUID()); this.status = new QueryStatus(0.0f, null, Status.NEW, "Query just got created", false, null, null, null); - this.priority = Priority.NORMAL; this.lensConf = qconf; this.conf = conf; this.isPersistent = conf.getBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, @@ -429,15 +427,17 @@ public class QueryContext extends AbstractQueryContext { } public Priority decidePriority(LensDriver driver, QueryPriorityDecider queryPriorityDecider) throws LensException { + // On-demand re-computation of cost, in case it's not alredy set by a previous estimate call. + // In driver test cases, estimate doesn't happen. Hence this code path ensures cost is computed and + // priority is set based on correct cost. + calculateCost(driver); priority = queryPriorityDecider.decidePriority(getDriverQueryCost(driver)); return priority; } - public Priority calculateCostAndDecidePriority(LensDriver driver, QueryCostCalculator queryCostCalculator, - QueryPriorityDecider queryPriorityDecider) throws LensException { + private void calculateCost(LensDriver driver) throws LensException { if (getDriverQueryCost(driver) == null) { - setDriverCost(driver, queryCostCalculator.calculateCost(this, driver)); + setDriverCost(driver, driver.estimate(this)); } - return decidePriority(driver, queryPriorityDecider); } } http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-server-api/src/main/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraint.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraint.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraint.java index bae2e64..58ebd9a 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraint.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraint.java @@ -19,29 +19,71 @@ package org.apache.lens.server.api.query.constraint; +import java.util.Map; + +import org.apache.lens.api.Priority; import org.apache.lens.server.api.driver.LensDriver; import org.apache.lens.server.api.query.QueryContext; import org.apache.lens.server.api.query.collect.EstimatedImmutableQueryCollection; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @Slf4j +@RequiredArgsConstructor public class MaxConcurrentDriverQueriesConstraint implements QueryLaunchingConstraint { private final int maxConcurrentQueries; - - public MaxConcurrentDriverQueriesConstraint(final int maxConcurrentQueries) { - this.maxConcurrentQueries = maxConcurrentQueries; - } + private final Map<String, Integer> maxConcurrentQueriesPerQueue; + private final Map<Priority, Integer> maxConcurrentQueriesPerPriority; @Override public boolean allowsLaunchOf( final QueryContext candidateQuery, final EstimatedImmutableQueryCollection launchedQueries) { final LensDriver selectedDriver = candidateQuery.getSelectedDriver(); - final boolean canLaunch = (launchedQueries.getQueriesCount(selectedDriver) < maxConcurrentQueries); - + final boolean canLaunch = (launchedQueries.getQueriesCount(selectedDriver) < maxConcurrentQueries) + && canLaunchWithQueueConstraint(candidateQuery, launchedQueries) + && canLaunchWithPriorityConstraint(candidateQuery, launchedQueries); log.debug("canLaunch:{}", canLaunch); return canLaunch; } + + private boolean canLaunchWithQueueConstraint(QueryContext candidateQuery, EstimatedImmutableQueryCollection + launchedQueries) { + if (maxConcurrentQueriesPerQueue == null) { + return true; + } + String queue = candidateQuery.getQueue(); + Integer limit = maxConcurrentQueriesPerQueue.get(queue); + if (limit == null) { + return true; + } + int launchedOnQueue = 0; + for (QueryContext context : launchedQueries.getQueries(candidateQuery.getSelectedDriver())) { + if (context.getQueue().equals(queue)) { + launchedOnQueue++; + } + } + return launchedOnQueue < limit; + } + + private boolean canLaunchWithPriorityConstraint(QueryContext candidateQuery, EstimatedImmutableQueryCollection + launchedQueries) { + if (maxConcurrentQueriesPerPriority == null) { + return true; + } + Priority priority = candidateQuery.getPriority(); + Integer limit = maxConcurrentQueriesPerPriority.get(priority); + if (limit == null) { + return true; + } + int launchedOnPriority = 0; + for (QueryContext context : launchedQueries.getQueries(candidateQuery.getSelectedDriver())) { + if (context.getPriority().equals(priority)) { + launchedOnPriority++; + } + } + return launchedOnPriority < limit; + } } http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-server-api/src/main/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintFactory.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintFactory.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintFactory.java index b6e6c2f..6db7da7 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintFactory.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintFactory.java @@ -19,21 +19,60 @@ package org.apache.lens.server.api.query.constraint; -import static java.lang.Integer.parseInt; +import static org.apache.lens.api.util.CommonUtils.parseMapFromString; +import java.util.Map; + +import org.apache.lens.api.Priority; +import org.apache.lens.api.util.CommonUtils.EntryParser; import org.apache.lens.server.api.common.ConfigBasedObjectCreationFactory; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; public class MaxConcurrentDriverQueriesConstraintFactory - implements ConfigBasedObjectCreationFactory<QueryLaunchingConstraint> { + implements ConfigBasedObjectCreationFactory<MaxConcurrentDriverQueriesConstraint> { public static final String MAX_CONCURRENT_QUERIES_KEY = "driver.max.concurrent.launched.queries"; + private static final String PREFIX = MAX_CONCURRENT_QUERIES_KEY + ".per."; + public static final String MAX_CONCURRENT_QUERIES_PER_QUEUE_KEY = PREFIX + "queue"; + public static final String MAX_CONCURRENT_QUERIES_PER_PRIORITY_KEY = PREFIX + "priority"; + private static final EntryParser<String, Integer> STRING_INT_PARSER = new EntryParser<String, Integer>() { + @Override + public String parseKey(String str) { + return str; + } + + @Override + public Integer parseValue(String str) { + return Integer.valueOf(str); + } + }; + private static final EntryParser<Priority, Integer> PRIORITY_INT_PARSER = new EntryParser<Priority, Integer>() { + @Override + public Priority parseKey(String str) { + return Priority.valueOf(str.toUpperCase()); + } + + @Override + public Integer parseValue(String str) { + return Integer.valueOf(str); + } + }; @Override - public QueryLaunchingConstraint create(final Configuration conf) { + public MaxConcurrentDriverQueriesConstraint create(final Configuration conf) { + String maxConcurrentQueriesValue = conf.get(MAX_CONCURRENT_QUERIES_KEY); + Map<String, Integer> maxConcurrentQueriesPerQueue = parseMapFromString( + conf.get(MAX_CONCURRENT_QUERIES_PER_QUEUE_KEY), STRING_INT_PARSER); + Map<Priority, Integer> maxConcurrentQueriesPerPriority = parseMapFromString( + conf.get(MAX_CONCURRENT_QUERIES_PER_PRIORITY_KEY), PRIORITY_INT_PARSER); + int maxConcurrentQueries = Integer.MAX_VALUE; + if (!StringUtils.isBlank(maxConcurrentQueriesValue)) { + maxConcurrentQueries = Integer.parseInt(maxConcurrentQueriesValue); + } + return new MaxConcurrentDriverQueriesConstraint(maxConcurrentQueries, maxConcurrentQueriesPerQueue, + maxConcurrentQueriesPerPriority); - int maxConcurrentQueries = parseInt(conf.get(MAX_CONCURRENT_QUERIES_KEY)); - return new MaxConcurrentDriverQueriesConstraint(maxConcurrentQueries); } } http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestAbstractQueryContext.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestAbstractQueryContext.java b/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestAbstractQueryContext.java index a37a4c8..5af45ed 100644 --- a/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestAbstractQueryContext.java +++ b/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestAbstractQueryContext.java @@ -33,7 +33,6 @@ import org.apache.lens.server.api.driver.LensDriver; import org.apache.lens.server.api.driver.MockDriver; import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.metrics.LensMetricsRegistry; -import org.apache.lens.server.api.query.cost.MockQueryCostCalculator; import org.apache.lens.server.api.query.priority.MockQueryPriorityDecider; import org.apache.hadoop.conf.Configuration; @@ -101,8 +100,7 @@ public class TestAbstractQueryContext { @Test public void testPrioritySetting() throws LensException { MockQueryContext ctx = new MockQueryContext(); - Priority p = ctx.calculateCostAndDecidePriority(ctx.getSelectedDriver(), new - MockQueryCostCalculator(), new MockQueryPriorityDecider()); + Priority p = ctx.decidePriority(ctx.getSelectedDriver(), new MockQueryPriorityDecider()); assertEquals(p, HIGH); assertEquals(ctx.getPriority(), HIGH); } http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java b/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java index 55a2eea..4031122 100644 --- a/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java +++ b/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java @@ -19,10 +19,17 @@ package org.apache.lens.server.api.query.constraint; +import static org.apache.lens.api.Priority.*; +import static org.apache.lens.server.api.LensServerAPITestUtil.getConfiguration; + import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import java.util.HashSet; +import java.util.Set; + +import org.apache.lens.api.Priority; import org.apache.lens.server.api.driver.LensDriver; import org.apache.lens.server.api.query.QueryContext; import org.apache.lens.server.api.query.collect.EstimatedImmutableQueryCollection; @@ -30,18 +37,117 @@ import org.apache.lens.server.api.query.collect.EstimatedImmutableQueryCollectio import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import junit.framework.Assert; +import lombok.Data; + public class MaxConcurrentDriverQueriesConstraintTest { + MaxConcurrentDriverQueriesConstraintFactory factory = new MaxConcurrentDriverQueriesConstraintFactory(); + QueryLaunchingConstraint constraint = factory.create(getConfiguration( + "driver.max.concurrent.launched.queries", 10 + )); + QueryLaunchingConstraint perQueueConstraint = factory.create(getConfiguration( + "driver.max.concurrent.launched.queries", 4, + "driver.max.concurrent.launched.queries.per.queue", "q1=2,q2=3" + )); + + QueryLaunchingConstraint perPriorityConstraint = factory.create(getConfiguration( + "driver.max.concurrent.launched.queries", 4, + "driver.max.concurrent.launched.queries.per.priority", "NORMAL=2,HIGH=3" + )); + + QueryLaunchingConstraint perQueueAndPerPriorityConstraint = factory.create(getConfiguration( + "driver.max.concurrent.launched.queries.per.queue", "q1=2,q2=3", + "driver.max.concurrent.launched.queries.per.priority", "NORMAL=2,HIGH=3" + )); + @DataProvider public Object[][] dpTestAllowsLaunchOfQuery() { - return new Object[][] { {2, true} , {10, false}, {11, false}}; + return new Object[][]{{2, true}, {10, false}, {11, false}}; + } + + @DataProvider + public Object[][] dpTestPerQueueConstraints() { + return new Object[][]{ + {queues("q1", "q2"), "q1", true}, + {queues("q1", "q1"), "q2", true}, + {queues("q1", "q1"), "q3", true}, + {queues("q1", "q1", "q1"), "q2", true}, // hypothetical + {queues("q1", "q1", "q2"), "q1", false}, + {queues("q1", "q2", "q2"), "q1", true}, + {queues("q1", "q2", "q2"), "q2", true}, + {queues("q1", "q2", "q1", "q2"), "q2", false}, + {queues("q1", "q2", "q1", "q2"), "q1", false}, + {queues("q1", "q2", "q1", "q2"), "q3", false}, + }; + } + + @DataProvider + public Object[][] dpTestPerPriorityConstraints() { + return new Object[][]{ + {priorities(NORMAL, HIGH), NORMAL, true}, + {priorities(NORMAL, NORMAL), HIGH, true}, + {priorities(NORMAL, NORMAL), LOW, true}, + {priorities(NORMAL, NORMAL, NORMAL), HIGH, true}, // hypothetical + {priorities(NORMAL, NORMAL, HIGH), NORMAL, false}, + {priorities(NORMAL, HIGH, HIGH), NORMAL, true}, + {priorities(NORMAL, HIGH, HIGH), HIGH, true}, + {priorities(NORMAL, HIGH, NORMAL, HIGH), HIGH, false}, + {priorities(NORMAL, HIGH, NORMAL, HIGH), NORMAL, false}, + {priorities(NORMAL, HIGH, NORMAL, HIGH), LOW, false}, + }; + } + + @DataProvider + public Object[][] dpTestPerQueuePerPriorityConstraints() { + return new Object[][]{ + {queuePriorities("q1", NORMAL, "q2", NORMAL), "q2", NORMAL, false}, // can't launch NORMAL + {queuePriorities("q1", NORMAL, "q1", HIGH), "q1", NORMAL, false}, // can't launch on q1 + {queuePriorities("q1", NORMAL, "q1", HIGH, "q2", HIGH), "q2", NORMAL, true}, // can launch NORMAL on q2 + {queuePriorities("q1", NORMAL, "q1", HIGH, "q2", HIGH, "q2", HIGH), "q2", NORMAL, true}, + {queuePriorities("q1", NORMAL, "q1", HIGH, "q2", HIGH, "q2", NORMAL), "q2", NORMAL, false}, // hypothetical + {queuePriorities("q1", NORMAL, "q1", HIGH, "q2", HIGH, "q2", HIGH, "q2", NORMAL), "q3", NORMAL, false}, + {queuePriorities("q1", NORMAL, "q1", HIGH, "q2", HIGH, "q2", HIGH, "q2", NORMAL), "q3", HIGH, false}, + {queuePriorities("q1", NORMAL, "q1", HIGH, "q2", HIGH, "q2", HIGH, "q2", NORMAL), "q1", LOW, false}, + {queuePriorities("q1", NORMAL, "q1", HIGH, "q2", HIGH, "q2", HIGH, "q2", NORMAL), "q2", LOW, false}, + {queuePriorities("q1", NORMAL, "q1", HIGH, "q2", HIGH, "q2", HIGH, "q2", NORMAL), "q3", LOW, true}, + }; + } + + @Data + public static class QueuePriority { + private final String queue; + private final Priority priority; + } + + private static QueuePriority[] queuePriorities(Object... args) { + Assert.assertEquals(args.length % 2, 0); + QueuePriority[] queuePriorities = new QueuePriority[args.length / 2]; + for (int i = 0; i < args.length; i += 2) { + queuePriorities[i / 2] = new QueuePriority((String) args[i], (Priority) args[i + 1]); + } + return queuePriorities; + } + + private static String[] queues(Object... args) { + String[] queues = new String[args.length]; + for (int i = 0; i < args.length; i++) { + queues[i] = (String) args[i]; + } + return queues; + } + + private static Priority[] priorities(Object... args) { + Priority[] priorities = new Priority[args.length]; + for (int i = 0; i < args.length; i++) { + priorities[i] = (Priority) args[i]; + } + return priorities; } @Test(dataProvider = "dpTestAllowsLaunchOfQuery") public void testAllowsLaunchOfQuery(final int currentDriverLaunchedQueries, final boolean expectedCanLaunch) { - int maxConcurrentQueries = 10; - QueryContext mockCandidateQuery = mock(QueryContext.class); EstimatedImmutableQueryCollection mockLaunchedQueries = mock(EstimatedImmutableQueryCollection.class); LensDriver mockDriver = mock(LensDriver.class); @@ -49,9 +155,76 @@ public class MaxConcurrentDriverQueriesConstraintTest { when(mockCandidateQuery.getSelectedDriver()).thenReturn(mockDriver); when(mockLaunchedQueries.getQueriesCount(mockDriver)).thenReturn(currentDriverLaunchedQueries); - QueryLaunchingConstraint constraint = new MaxConcurrentDriverQueriesConstraint(maxConcurrentQueries); boolean actualCanLaunch = constraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries); assertEquals(actualCanLaunch, expectedCanLaunch); } + + @Test(dataProvider = "dpTestPerQueueConstraints") + public void testPerQueueConstraints(final String[] launchedQueues, final String candidateQueue, + final boolean expectedCanLaunch) { + EstimatedImmutableQueryCollection mockLaunchedQueries = mock(EstimatedImmutableQueryCollection.class); + LensDriver mockDriver = mock(LensDriver.class); + Set<QueryContext> launchedQueries = new HashSet<>(); + for (String queue : launchedQueues) { + QueryContext context = mock(QueryContext.class); + when(context.getQueue()).thenReturn(queue); + launchedQueries.add(context); + } + when(mockLaunchedQueries.getQueries(mockDriver)).thenReturn(launchedQueries); + when(mockLaunchedQueries.getQueriesCount(mockDriver)).thenReturn(launchedQueries.size()); + + QueryContext mockCandidateQuery = mock(QueryContext.class); + when(mockCandidateQuery.getQueue()).thenReturn(candidateQueue); + when(mockCandidateQuery.getSelectedDriver()).thenReturn(mockDriver); + boolean actualCanLaunch = perQueueConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries); + + assertEquals(actualCanLaunch, expectedCanLaunch); + } + + @Test(dataProvider = "dpTestPerPriorityConstraints") + public void testPerPriorityConstraints(final Priority[] launchedPriorities, final Priority candidatePriority, + final boolean expectedCanLaunch) { + EstimatedImmutableQueryCollection mockLaunchedQueries = mock(EstimatedImmutableQueryCollection.class); + LensDriver mockDriver = mock(LensDriver.class); + Set<QueryContext> launchedQueries = new HashSet<>(); + for (Priority priority : launchedPriorities) { + QueryContext context = mock(QueryContext.class); + when(context.getPriority()).thenReturn(priority); + launchedQueries.add(context); + } + when(mockLaunchedQueries.getQueries(mockDriver)).thenReturn(launchedQueries); + when(mockLaunchedQueries.getQueriesCount(mockDriver)).thenReturn(launchedQueries.size()); + + QueryContext mockCandidateQuery = mock(QueryContext.class); + when(mockCandidateQuery.getPriority()).thenReturn(candidatePriority); + when(mockCandidateQuery.getSelectedDriver()).thenReturn(mockDriver); + boolean actualCanLaunch = perPriorityConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries); + + assertEquals(actualCanLaunch, expectedCanLaunch); + } + + @Test(dataProvider = "dpTestPerQueuePerPriorityConstraints") + public void testPerQueuePerPriorityConstraints(final QueuePriority[] launchedQueuePriorities, + final String candidateQueue, final Priority candidatePriority, final boolean expectedCanLaunch) { + EstimatedImmutableQueryCollection mockLaunchedQueries = mock(EstimatedImmutableQueryCollection.class); + LensDriver mockDriver = mock(LensDriver.class); + Set<QueryContext> launchedQueries = new HashSet<>(); + for (QueuePriority queuePriority : launchedQueuePriorities) { + QueryContext context = mock(QueryContext.class); + when(context.getQueue()).thenReturn(queuePriority.getQueue()); + when(context.getPriority()).thenReturn(queuePriority.getPriority()); + launchedQueries.add(context); + } + when(mockLaunchedQueries.getQueries(mockDriver)).thenReturn(launchedQueries); + when(mockLaunchedQueries.getQueriesCount(mockDriver)).thenReturn(launchedQueries.size()); + + QueryContext mockCandidateQuery = mock(QueryContext.class); + when(mockCandidateQuery.getQueue()).thenReturn(candidateQueue); + when(mockCandidateQuery.getPriority()).thenReturn(candidatePriority); + when(mockCandidateQuery.getSelectedDriver()).thenReturn(mockDriver); + boolean actualCanLaunch = perQueueAndPerPriorityConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries); + + assertEquals(actualCanLaunch, expectedCanLaunch); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java index ffd2d42..2dff9af 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java @@ -1734,6 +1734,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE ctx.setLensSessionIdentifier(sessionHandle.getPublicId().toString()); rewriteAndSelect(ctx); + ctx.getSelectedDriver().decidePriority(ctx); return submitQuery(ctx); } http://git-wip-us.apache.org/repos/asf/lens/blob/4d3d2f82/lens-server/src/test/java/org/apache/lens/server/query/constraint/ThreadSafeEstimatedQueryCollectionTest.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/constraint/ThreadSafeEstimatedQueryCollectionTest.java b/lens-server/src/test/java/org/apache/lens/server/query/constraint/ThreadSafeEstimatedQueryCollectionTest.java index 9138f8e..e1bf350 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/constraint/ThreadSafeEstimatedQueryCollectionTest.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/constraint/ThreadSafeEstimatedQueryCollectionTest.java @@ -21,7 +21,6 @@ package org.apache.lens.server.query.constraint; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; - import static org.testng.Assert.assertEquals; import org.apache.lens.server.api.driver.LensDriver; @@ -53,7 +52,7 @@ public class ThreadSafeEstimatedQueryCollectionTest { LensDriver mockDriver = mock(LensDriver.class); LensDriver mockDriver2 = mock(LensDriver.class); - QueryLaunchingConstraint constraint = new MaxConcurrentDriverQueriesConstraint(maxConcurrentQueries); + QueryLaunchingConstraint constraint = new MaxConcurrentDriverQueriesConstraint(maxConcurrentQueries, null, null); ThreadSafeEstimatedQueryCollection col = new ThreadSafeEstimatedQueryCollection(new DefaultEstimatedQueryCollection(new DefaultQueryCollection()));