Repository: lens Updated Branches: refs/heads/master 92456effb -> 1cff38768
LENS-1164: Query getting stuck in queue for inmemory queries Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/1cff3876 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/1cff3876 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/1cff3876 Branch: refs/heads/master Commit: 1cff38768f7999610b4d8ee3b636f9b4a337b076 Parents: 92456ef Author: Sushil Mohanty <sushil.k.moha...@gmail.com> Authored: Tue Jun 14 17:50:39 2016 +0530 Committer: Rajat Khandelwal <rajatgupt...@gmail.com> Committed: Tue Jun 14 17:50:39 2016 +0530 ---------------------------------------------------------------------- .../lens/server/api/query/QueryContext.java | 4 ++ .../server/query/QueryExecutionServiceImpl.java | 74 ++++++++++++-------- .../lens/server/query/TestQueryConstraints.java | 28 ++++++++ 3 files changed, 75 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/1cff3876/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java index aebb395..8ba0689 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java @@ -160,6 +160,10 @@ public class QueryContext extends AbstractQueryContext { @Setter private boolean finishedQueryPersisted = false; + @Getter + @Setter + private boolean isQueryClosedOnDriver = false; + /** * The query name. */ http://git-wip-us.apache.org/repos/asf/lens/blob/1cff3876/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java index 39984d4..1f6ec13 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java @@ -26,6 +26,7 @@ import static org.apache.lens.server.session.LensSessionImpl.ResourceEntry; import java.io.*; import java.net.URI; import java.net.URISyntaxException; +import java.sql.SQLException; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -1040,43 +1041,26 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE finished = iter.next(); if (finished.canBePurged()) { try { - FinishedLensQuery finishedQuery = new FinishedLensQuery(finished.getCtx()); - if (finished.ctx.getStatus().getStatus() == SUCCESSFUL) { - if (finished.ctx.getStatus().isResultSetAvailable()) { - try { - LensResultSet set = finished.getResultset(); - if (set != null && PersistentResultSet.class.isAssignableFrom(set.getClass())) { - LensResultSetMetadata metadata = set.getMetadata(); - String outputPath = set.getOutputPath(); - Long fileSize = ((PersistentResultSet) set).getFileSize(); - Integer rows = set.size(); - finishedQuery.setResult(outputPath); - finishedQuery.setMetadata(metadata.toJson()); - finishedQuery.setRows(rows); - finishedQuery.setFileSize(fileSize); - } - } catch (Exception e) { - log.error("Couldn't obtain result set info for the query: {}. Going ahead with purge", - finished.getQueryHandle(), e); - } - } - } - lensServerDao.insertFinishedQuery(finishedQuery); - log.info("Saved query {} to DB", finishedQuery.getHandle()); + persistQuery(finished); iter.remove(); } catch (Exception e) { log.warn("Exception while purging query {}", finished.getQueryHandle(), e); continue; + } finally { + if (!finished.getCtx().isQueryClosedOnDriver()) { + try { + if (finished.getCtx().getSelectedDriver() != null) { + finished.getCtx().getSelectedDriver().closeQuery(finished.getQueryHandle()); + } + finished.getCtx().setQueryClosedOnDriver(true); + } catch (Exception e) { + log.warn("Exception while closing query with selected driver.", e); + } + processWaitingQueriesAsync(finished.ctx); + } } synchronized (finished.ctx) { finished.ctx.setFinishedQueryPersisted(true); - try { - if (finished.getCtx().getSelectedDriver() != null) { - finished.getCtx().getSelectedDriver().closeQuery(finished.getQueryHandle()); - } - } catch (Exception e) { - log.warn("Exception while closing query with selected driver.", e); - } log.info("Purging: {}", finished.getQueryHandle()); allQueries.remove(finished.getQueryHandle()); resultSets.remove(finished.getQueryHandle()); @@ -1097,6 +1081,32 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } log.info("QueryPurger exited"); } + + private void persistQuery(FinishedQuery finished) throws SQLException { + FinishedLensQuery finishedQuery = new FinishedLensQuery(finished.getCtx()); + if (finished.ctx.getStatus().getStatus() == SUCCESSFUL) { + if (finished.ctx.getStatus().isResultSetAvailable()) { + try { + LensResultSet set = finished.getResultset(); + if (set != null && PersistentResultSet.class.isAssignableFrom(set.getClass())) { + LensResultSetMetadata metadata = set.getMetadata(); + String outputPath = set.getOutputPath(); + Long fileSize = ((PersistentResultSet) set).getFileSize(); + Integer rows = set.size(); + finishedQuery.setResult(outputPath); + finishedQuery.setMetadata(metadata.toJson()); + finishedQuery.setRows(rows); + finishedQuery.setFileSize(fileSize); + } + } catch (Exception e) { + log.error("Couldn't obtain result set info for the query: {}. Going ahead with perstsiting the query", + finished.getQueryHandle(), e); + } + } + } + lensServerDao.insertFinishedQuery(finishedQuery); + log.info("Saved query {} to DB", finishedQuery.getHandle()); + } } /** @@ -1684,6 +1694,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE */ LensResultSet getResultset(QueryHandle queryHandle) throws LensException { QueryContext ctx = allQueries.get(queryHandle); + if (ctx == null) { return getResultsetFromDAO(queryHandle); } else { @@ -1696,7 +1707,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE if (resultSet == null) { if (ctx.isPersistent() && ctx.getQueryOutputFormatter() != null) { resultSets.put(queryHandle, new LensPersistentResult(ctx, conf)); - } else if (allQueries.get(queryHandle).isResultAvailableInDriver()) { + } else if (ctx.isResultAvailableInDriver() && !ctx.isQueryClosedOnDriver()) { + //InMemory result can not be returned for a closed query resultSet = getDriverResultset(queryHandle); resultSets.put(queryHandle, resultSet); } http://git-wip-us.apache.org/repos/asf/lens/blob/1cff3876/lens-server/src/test/java/org/apache/lens/server/query/TestQueryConstraints.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryConstraints.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryConstraints.java index 6ed3770..0df436a 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryConstraints.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryConstraints.java @@ -27,6 +27,7 @@ import java.util.*; import javax.ws.rs.core.Application; import javax.ws.rs.core.MediaType; +import org.apache.lens.api.LensConf; import org.apache.lens.api.LensSessionHandle; import org.apache.lens.api.query.QueryHandle; import org.apache.lens.driver.hive.HiveDriver; @@ -37,6 +38,7 @@ import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.LensServerAPITestUtil; import org.apache.lens.server.api.driver.DriverSelector; import org.apache.lens.server.api.driver.LensDriver; +import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.metrics.MetricsService; import org.apache.lens.server.api.query.AbstractQueryContext; import org.apache.lens.server.api.query.QueryExecutionService; @@ -195,6 +197,22 @@ public class TestQueryConstraints extends LensJerseyTest { } } + @Test(dataProvider = "mediaTypeData") + public void testInmemoryQueryThrottling(MediaType mt) throws InterruptedException, LensException { + List<QueryHandle> handles = Lists.newArrayList(); + for (int j = 0; j < 5; j++) { + for (int i = 0; i < 10; i++) { + handles.add(launchInmemoryQuery(mt)); + assertValidity(); + } + } + for (QueryHandle handle : handles) { + RestAPITestUtil.waitForQueryToFinish(target(), lensSessionId, handle, mt); + queryService.closeResultSet(lensSessionId, handle); + assertValidity(); + } + } + private void assertValidity() { QueryExecutionServiceImpl.QueryCount count = queryService.getQueryCountSnapshot(); assertTrue(count.running <= 4, System.currentTimeMillis() + " " + count.running + " running queries: " @@ -207,6 +225,16 @@ public class TestQueryConstraints extends LensJerseyTest { Optional.of(LensServerAPITestUtil.getLensConf(QUERY_METRIC_UNIQUE_ID_CONF_KEY, UUID.randomUUID())), mt); } + private QueryHandle launchInmemoryQuery(MediaType mt) { + LensConf conf = LensServerAPITestUtil.getLensConf(QUERY_METRIC_UNIQUE_ID_CONF_KEY, UUID.randomUUID()); + conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, "false"); + conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "false"); + conf.addProperty(LensConfConstants.INMEMORY_RESULT_SET_TTL_SECS, 600); + return RestAPITestUtil.executeAndGetHandle(target(), Optional.of(lensSessionId), + Optional.of("select ID from " + TEST_TABLE), + Optional.of(conf), mt); + } + @AfterMethod private void waitForPurge() throws InterruptedException { waitForPurge(0, queryService.finishedQueries);