Repository: lens Updated Branches: refs/heads/master f88cf9bc3 -> 58d863643
LENS-901 : Add option to stream results from execute timeout api, along with persisting the result Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/58d86364 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/58d86364 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/58d86364 Branch: refs/heads/master Commit: 58d8636433f2166e4428a7a1c933e76aeadf2fde Parents: f88cf9b Author: Puneet Gupta <puneet.k.gu...@gmail.com> Authored: Fri Feb 5 08:31:15 2016 +0530 Committer: Amareshwari Sriramadasu <amareshw...@apache.org> Committed: Fri Feb 5 08:31:15 2016 +0530 ---------------------------------------------------------------------- .../api/query/QueryHandleWithResultSet.java | 7 + .../org/apache/lens/api/query/QueryStatus.java | 5 + .../lens/driver/es/client/ESResultSet.java | 6 - .../org/apache/lens/driver/hive/HiveDriver.java | 21 +- .../lens/driver/hive/HiveInMemoryResultSet.java | 8 +- .../org/apache/lens/driver/jdbc/JDBCDriver.java | 18 +- .../apache/lens/driver/jdbc/JDBCResultSet.java | 13 -- .../apache/lens/driver/jdbc/TestJdbcDriver.java | 84 ++++++++ .../lens/server/api/LensConfConstants.java | 20 ++ .../server/api/driver/AbstractLensDriver.java | 34 +++- .../server/api/driver/InMemoryResultSet.java | 4 +- .../PartiallyFetchedInMemoryResultSet.java | 177 +++++++++++++++++ .../lens/server/api/query/QueryContext.java | 86 +++++++-- .../lens/server/api/driver/MockDriver.java | 5 - .../server/query/QueryExecutionServiceImpl.java | 191 +++++++++++++------ .../src/main/resources/lenssession-default.xml | 21 ++ .../apache/lens/server/query/TestLensDAO.java | 2 +- .../lens/server/query/TestQueryService.java | 174 +++++++++++++---- .../lens/server/query/TestResultFormatting.java | 2 +- src/site/apt/admin/session-config.apt | 28 +-- 20 files changed, 733 insertions(+), 173 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-api/src/main/java/org/apache/lens/api/query/QueryHandleWithResultSet.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/query/QueryHandleWithResultSet.java b/lens-api/src/main/java/org/apache/lens/api/query/QueryHandleWithResultSet.java index a5da867..bf8967e 100644 --- a/lens-api/src/main/java/org/apache/lens/api/query/QueryHandleWithResultSet.java +++ b/lens-api/src/main/java/org/apache/lens/api/query/QueryHandleWithResultSet.java @@ -54,6 +54,13 @@ public class QueryHandleWithResultSet extends QuerySubmitResult { private QueryResult result; /** + * The result metadata + */ + @Getter + @Setter + private QueryResultSetMetadata resultMetadata; + + /** * The status. */ @Getter http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java ---------------------------------------------------------------------- diff --git a/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java b/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java index 915dac7..40e5d87 100644 --- a/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java +++ b/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java @@ -212,6 +212,11 @@ public class QueryStatus implements Serializable { return status.equals(Status.SUCCESSFUL) || status.equals(Status.FAILED) || status.equals(Status.CANCELED); } + public boolean successful() { + return status.equals(Status.SUCCESSFUL); + } + + public boolean launched() { return status.equals(Status.LAUNCHED); } http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java ---------------------------------------------------------------------- diff --git a/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java index 464b535..b59949b 100644 --- a/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java +++ b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java @@ -23,7 +23,6 @@ import java.util.Iterator; import org.apache.lens.api.query.ResultRow; import org.apache.lens.server.api.driver.InMemoryResultSet; import org.apache.lens.server.api.driver.LensResultSetMetadata; -import org.apache.lens.server.api.error.LensException; import lombok.NonNull; @@ -68,9 +67,4 @@ public class ESResultSet extends InMemoryResultSet { public LensResultSetMetadata getMetadata() { return resultSetMetadata; } - - @Override - public boolean seekToStart() throws LensException { - return false; - } } http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java index 149c6ab..f422543 100644 --- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java +++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java @@ -445,7 +445,7 @@ public class HiveDriver extends AbstractLensDriver { explainCtx.getSubmittedUser(), new LensConf(), explainConf, this, explainCtx.getLensSessionIdentifier(), false); // Get result set of explain - HiveInMemoryResultSet inMemoryResultSet = (HiveInMemoryResultSet) execute(explainQueryCtx); + InMemoryResultSet inMemoryResultSet = (InMemoryResultSet) execute(explainQueryCtx); List<String> explainOutput = new ArrayList<>(); while (inMemoryResultSet.hasNext()) { explainOutput.add((String) inMemoryResultSet.next().getValues().get(0)); @@ -529,7 +529,7 @@ public class HiveDriver extends AbstractLensDriver { } result = createResultSet(ctx, true); // close the query immediately if the result is not inmemory result set - if (result == null || !(result instanceof HiveInMemoryResultSet)) { + if (result == null || !(result instanceof InMemoryResultSet)) { closeQuery(ctx.getQueryHandle()); } // remove query handle from hiveHandles even in case of inmemory result set @@ -707,18 +707,6 @@ public class HiveDriver extends AbstractLensDriver { /* * (non-Javadoc) * - * @see org.apache.lens.server.api.driver.LensDriver#fetchResultSet(org.apache.lens.server.api.query.QueryContext) - */ - @Override - public LensResultSet fetchResultSet(QueryContext ctx) throws LensException { - log.info("FetchResultSet: {}", ctx.getQueryHandle()); - // This should be applicable only for a async query - return createResultSet(ctx, false); - } - - /* - * (non-Javadoc) - * * @see org.apache.lens.server.api.driver.LensDriver#closeResultSet(org.apache.lens.api.query.QueryHandle) */ @Override @@ -874,6 +862,11 @@ public class HiveDriver extends AbstractLensDriver { } } + @Override + protected LensResultSet createResultSet(QueryContext context) throws LensException { + return createResultSet(context, false); + } + /** * Creates the result set. * http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveInMemoryResultSet.java ---------------------------------------------------------------------- diff --git a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveInMemoryResultSet.java b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveInMemoryResultSet.java index f8abd78..4d52e22 100644 --- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveInMemoryResultSet.java +++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveInMemoryResultSet.java @@ -81,7 +81,7 @@ public class HiveInMemoryResultSet extends InMemoryResultSet { this.closeAfterFecth = closeAfterFecth; this.metadata = client.getResultSetMetadata(opHandle); this.numColumns = metadata.getColumnDescriptors().size(); - this.seekToStart(); + this.orientation = FetchOrientation.FETCH_FIRST; } /* @@ -103,12 +103,6 @@ public class HiveInMemoryResultSet extends InMemoryResultSet { return hrsMeta; } - @Override - public boolean seekToStart() { - orientation = FetchOrientation.FETCH_FIRST; - return true; - } - /* * (non-Javadoc) * http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java index 82d7513..eef4464 100644 --- a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java +++ b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java @@ -63,7 +63,7 @@ import org.apache.lens.server.model.MappedDiagnosticLogSegregationContext; import org.apache.commons.lang3.StringUtils; - import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.parse.ASTNode; import org.apache.hadoop.hive.ql.parse.HiveParser; @@ -235,7 +235,7 @@ public class JDBCDriver extends AbstractLensDriver { private boolean isClosed; /** The lens result set. */ - private JDBCResultSet lensResultSet; + private InMemoryResultSet lensResultSet; /** * Close. @@ -996,16 +996,13 @@ public class JDBCDriver extends AbstractLensDriver { } } - /** - * Fetch the results of the query, specified by the handle. - * - * @param context the context - * @return returns the {@link LensResultSet}. - * @throws LensException the lens exception - */ @Override - public LensResultSet fetchResultSet(QueryContext context) throws LensException { + protected LensResultSet createResultSet(QueryContext ctx) throws LensException { checkConfigured(); + return getDriverResult(ctx); + } + + private LensResultSet getDriverResult(QueryContext context) throws LensException { JdbcQueryContext ctx = getQueryContext(context.getQueryHandle()); if (ctx.isCancelled()) { throw new LensException("Result set not available for cancelled query " + context.getQueryHandle()); @@ -1147,6 +1144,5 @@ public class JDBCDriver extends AbstractLensDriver { @Override public void writeExternal(ObjectOutput arg0) throws IOException { // TODO Auto-generated method stub - } } http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCResultSet.java ---------------------------------------------------------------------- diff --git a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCResultSet.java b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCResultSet.java index 8b4da3f..9e1a0c0 100644 --- a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCResultSet.java +++ b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCResultSet.java @@ -79,7 +79,6 @@ public class JDBCResultSet extends InMemoryResultSet { this.queryResult = queryResult; this.resultSet = resultSet; this.closeAfterFetch = closeAfterFetch; - seekToStart(); } private ResultSetMetaData getRsMetadata() throws LensException { @@ -294,18 +293,6 @@ public class JDBCResultSet extends InMemoryResultSet { } } - @Override - public boolean seekToStart() throws LensException { - try { - if (!resultSet.isClosed() && !resultSet.isBeforeFirst()) { - resultSet.beforeFirst(); - } - return true; - } catch (SQLException e) { - throw new LensException(e); - } - } - /* * (non-Javadoc) * http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java ---------------------------------------------------------------------- diff --git a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java index b96cf88..81a9552 100644 --- a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java +++ b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java @@ -46,6 +46,7 @@ import org.apache.lens.server.api.util.LensUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.session.SessionState; + import org.apache.hive.service.cli.ColumnDescriptor; import org.testng.Assert; @@ -54,6 +55,7 @@ import org.testng.annotations.*; import com.codahale.metrics.MetricRegistry; import com.google.common.collect.Lists; import com.mchange.v2.c3p0.ComboPooledDataSource; + import lombok.extern.slf4j.Slf4j; /** @@ -411,6 +413,88 @@ public class TestJdbcDriver { } } + /** + * Data provider for test case {@link #testExecuteWithPreFetch()} + * @return + */ + @DataProvider + public Object[][] executeWithPreFetchDP() { + return new Object[][] { + //int rowsToPreFecth, boolean isComplteleyFetched, int rowsPreFetched, boolean createTable, long executeTimeout + {10, true, 10, true, 20000}, //result has 10 rows and all 10 rows are pre fetched + {5, false, 6, false, 8000}, //result has 10 rows and 5 rows are pre fetched. (Extra row is fetched = 5+1 = 6) + {15, true, 10, false, 8000}, //result has 10 rows and 15 rows are requested to be pre fetched + {10, false, 0, false, 10}, //similar to case 1 but executeTimeout is very less. + }; + } + + /** + * @param rowsToPreFecth : requested number of rows to be pre-fetched + * @param isComplteleyFetched : whether the wrapped in memory result has been completely accessed due to pre fetch + * @param rowsPreFetched : actual rows pre-fetched + * @param createTable : whether to create a table before the test case is run + * @param executeTimeoutMillis :If the query does not finish with in this time pre fetch is ignored. + * @throws Exception + */ + @Test(dataProvider = "executeWithPreFetchDP") + public void testExecuteWithPreFetch(int rowsToPreFecth, boolean isComplteleyFetched, int rowsPreFetched, + boolean createTable, long executeTimeoutMillis) throws Exception { + if (createTable) { + createTable("execute_prefetch_test"); + insertData("execute_prefetch_test"); + } + + // Query + final String query = "SELECT * FROM execute_prefetch_test"; + Configuration conf = new Configuration(baseConf); + conf.setBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, true); + conf.setBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, false); + conf.setBoolean(LensConfConstants.PREFETCH_INMEMORY_RESULTSET, true); + conf.setInt(LensConfConstants.PREFETCH_INMEMORY_RESULTSET_ROWS, rowsToPreFecth); + QueryContext context = createQueryContext(query, conf); + context.setExecuteTimeoutMillis(executeTimeoutMillis); + driver.executeAsync(context); + LensResultSet resultSet = driver.fetchResultSet(context); + assertNotNull(resultSet); + + //Check Type + if (executeTimeoutMillis > 1000) { //enough time to execute the query + assertTrue(resultSet instanceof PartiallyFetchedInMemoryResultSet); + } else { + assertFalse(resultSet instanceof PartiallyFetchedInMemoryResultSet); + return; // NO need to check further in this case + } + + PartiallyFetchedInMemoryResultSet prs = (PartiallyFetchedInMemoryResultSet) resultSet; + assertEquals(prs.isComplteleyFetched(), isComplteleyFetched); + + //Check Streaming flow + if (isComplteleyFetched) { + assertTrue(prs.isComplteleyFetched()); + prs.getPreFetchedRows(); //This will be called while streaming + assertEquals(prs.size().intValue(), rowsPreFetched); + } else { + assertFalse(prs.isComplteleyFetched()); + assertEquals(prs.getPreFetchedRows().size(), rowsPreFetched); + } + + assertEquals(prs.getMetadata().getColumns().size(), 1); + assertEquals(prs.getMetadata().getColumns().get(0).getName(), "ID"); + + // Check Persistence flow + int rowCount = 0; + while (prs.hasNext()) { + ResultRow row = prs.next(); + assertEquals(row.getValues().get(0), rowCount); + rowCount++; + } + assertEquals(rowCount, 10); + prs.setFullyAccessed(true); + + //Check Purge + assertEquals(prs.canBePurged() , true); + } + @Test public void testJdbcSqlException() throws Exception { final String query = "SELECT invalid_column FROM execute_test"; http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java index 8df389b..1b7d0f9 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java @@ -998,6 +998,26 @@ public final class LensConfConstants { public static final String DEFAULT_HDFS_OUTPUT_RETENTION = "1 day"; /** + * Pre Fetch results in case of in memory result sets. + */ + public static final String PREFETCH_INMEMORY_RESULTSET = QUERY_PFX + "prefetch.inmemory.resultset"; + + /** + * Pre Fetch results in case of in memory result sets is enabled by default + */ + public static final boolean DEFAULT_PREFETCH_INMEMORY_RESULTSET = true; + + /** + * Pre-Fetch size for in memory results. Makes sense only if {@link #PREFETCH_INMEMORY_RESULTSET} set to true + */ + public static final String PREFETCH_INMEMORY_RESULTSET_ROWS = QUERY_PFX + "prefetch.inmemory.resultset.rows"; + + /** + * Default Pre-Fetch size for in memory results. + */ + public static final int DEFAULT_PREFETCH_INMEMORY_RESULTSET_ROWS = 100; + + /** * The Constant EXCLUDE_CUBE_TABLES. */ public static final String EXCLUDE_CUBE_TABLES = SESSION_PFX + "metastore.exclude.cubetables.from.nativetables"; http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java index ed1fc43..d447417 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java @@ -18,21 +18,23 @@ */ package org.apache.lens.server.api.driver; - import org.apache.lens.api.Priority; import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.query.QueryContext; import org.apache.commons.lang.StringUtils; + import org.apache.hadoop.conf.Configuration; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; /** * Abstract class for Lens Driver Implementations. Provides default * implementations and some utility methods for drivers */ +@Slf4j public abstract class AbstractLensDriver implements LensDriver { /** * Separator used for constructing fully qualified name and driver resource path @@ -54,6 +56,36 @@ public abstract class AbstractLensDriver implements LensDriver { } /** + * Default implementation for fetchResultSet for all drivers. Should hold good in most cases. + * Note : If a driver is sticking to this default implementation, it should + * override {@link #createResultSet(QueryContext)} + */ + @Override + public LensResultSet fetchResultSet(QueryContext ctx) throws LensException { + log.info("FetchResultSet: {}", ctx.getQueryHandle()); + synchronized (ctx) { + if (!ctx.isDriverResultRegistered()) { + ctx.registerDriverResult(createResultSet(ctx)); + } + } + return ctx.getDriverResult(); + } + + /** + * This method should create ResultSet for the query represented by the context. + * Specific driver should override this method to return driver specific LensResultSet whenever the + * driver relies on default implementation of {@link #fetchResultSet(QueryContext)} + * + * Note: Default Implementation throw exception. + * + * @param ctx + * @return + */ + protected LensResultSet createResultSet(QueryContext ctx) throws LensException { + throw new LensException(this.getClass().getSimpleName() + " should override method createResultSet(QueryContext)"); + } + + /** * Gets the path (relative to lens server's conf location) for the driver resource in the system. This is a utility * method that can be used by extending driver implementations to build path for their resources. * http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java index 0d64471..535065d 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java @@ -39,8 +39,6 @@ public abstract class InMemoryResultSet extends LensResultSet { @Getter private long creationTime = System.currentTimeMillis();; - public abstract boolean seekToStart() throws LensException; - @Override public boolean canBePurged() { return fullyAccessed; @@ -84,7 +82,7 @@ public abstract class InMemoryResultSet extends LensResultSet { while (hasNext()) { rows.add(next()); } - fullyAccessed = true; + this.setFullyAccessed(true); return new InMemoryQueryResult(rows); } public boolean isHttpResultAvailable() throws LensException { http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java new file mode 100644 index 0000000..59918bd --- /dev/null +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.lens.server.api.driver; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.lens.api.query.ResultRow; +import org.apache.lens.server.api.error.LensException; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * This is a wrapper over InMemoryResultSet which pre-fetches requested number of rows in memory. All calls are + * delegated to the underlying InMemoryResultSet except for the calls that access pre-fetched rows. + * + * This wrapper was created to support partial streaming of big result sets and complete streaming of SMALL result sets + * along with persistence. The pre-fetched result available via {@link #getPreFetchedRows()} can be used for streaming + * while the persistence logic can iterate over complete result set using {@link #hasNext()} and {@link #next()}. + * + * Note + * Streaming and persistence can occur concurrently irrespective of the underlying InMemoryResultSet + * implementation. + * Streaming of partial results is not supported at server level as of now. + */ +@Slf4j +public class PartiallyFetchedInMemoryResultSet extends InMemoryResultSet { + /** + * Underlying in-memory result set + */ + private InMemoryResultSet inMemoryRS; + + /** + *Number for rows pre-fetched and kept in memory. + */ + private int numOfPreFetchedRows; + + /** + *Cursor for the pre-fetched in memory result. + */ + private int cursor; + + /** + * Indicates whether the underlying in-memory result has been completely pre-fetched and kept in memory. + */ + @Getter + private boolean isComplteleyFetched; + + /** + * The pre-fteched in memory result cache. + */ + private List<ResultRow> preFetchedRows; + + /** + * This is set to true once preFetchedRows have been consumed. + */ + private boolean preFetchedRowsConsumed; + + /** + * Constructor + * @param inMemoryRS : Underlying in-memory result set + * @param reqPreFetchSize : requested number of rows to be pre-fetched and cached. + * @throws LensException + */ + public PartiallyFetchedInMemoryResultSet(InMemoryResultSet inMemoryRS, int reqPreFetchSize) throws LensException { + this.inMemoryRS = inMemoryRS; + if (reqPreFetchSize <= 0) { + throw new IllegalArgumentException("Invalid pre fetch size " + reqPreFetchSize); + } + preFetchRows(reqPreFetchSize); + log.info("Pre-Fetched {} rows of result and isComplteleyFetched = {} and doNotPurgeUntilTimeMillis ={}", + numOfPreFetchedRows, isComplteleyFetched); + } + + private void preFetchRows(int reqPreFetchSize) throws LensException { + //rows fetched = reqPreFetchSize+1. One extra row is read to check if underlying inMemoryRS result is completely + //or partially read. + preFetchedRows = new ArrayList<ResultRow>(reqPreFetchSize + 1); + boolean hasNext = inMemoryRS.hasNext(); + while (hasNext) { + if (numOfPreFetchedRows >= reqPreFetchSize) { + break; + } + preFetchedRows.add(inMemoryRS.next()); + numOfPreFetchedRows++; + hasNext = inMemoryRS.hasNext(); + } + + if (!hasNext) { + isComplteleyFetched = true; // No more rows to be read form inMemory result. + } else { + isComplteleyFetched = false; + //we have accessed ( hasNext() for ) one extra row. Lets cache it too. + preFetchedRows.add(inMemoryRS.next()); + numOfPreFetchedRows++; + } + } + + @Override + public boolean hasNext() throws LensException { + cursor++; + if (cursor <= numOfPreFetchedRows) { + return true; + } else if (isComplteleyFetched) { + return false; + } else { + return inMemoryRS.hasNext(); + } + } + + @Override + public ResultRow next() throws LensException { + if (cursor <= numOfPreFetchedRows) { + return preFetchedRows.get(cursor-1); + } else { + return inMemoryRS.next(); + } + } + + @Override + public void setFetchSize(int size) throws LensException { + inMemoryRS.setFetchSize(size); + } + + @Override + public Integer size() throws LensException { + if (isComplteleyFetched) { + return numOfPreFetchedRows; + } else { + return inMemoryRS.size(); + } + } + + @Override + public LensResultSetMetadata getMetadata() throws LensException { + return inMemoryRS.getMetadata(); + } + + @Override + public boolean canBePurged() { + //If the result is completely pre-fetched, defer the purging until preFetchedRows have been consumed. + //In Case not consumed, it should be cleared based on lens.server.inmemory.resultset.ttl.secs + if (isComplteleyFetched && !preFetchedRowsConsumed) { + return false; + } else { + return inMemoryRS.canBePurged(); + } + } + + @Override + public void setFullyAccessed(boolean fullyAccessed) { + inMemoryRS.setFullyAccessed(fullyAccessed); + } + + public List<ResultRow> getPreFetchedRows() { + preFetchedRowsConsumed = true; + return preFetchedRows; + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java index 1269e45..96846c1 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java @@ -18,6 +18,11 @@ */ package org.apache.lens.server.api.query; +import static org.apache.lens.server.api.LensConfConstants.DEFAULT_PREFETCH_INMEMORY_RESULTSET; +import static org.apache.lens.server.api.LensConfConstants.DEFAULT_PREFETCH_INMEMORY_RESULTSET_ROWS; +import static org.apache.lens.server.api.LensConfConstants.PREFETCH_INMEMORY_RESULTSET; +import static org.apache.lens.server.api.LensConfConstants.PREFETCH_INMEMORY_RESULTSET_ROWS; + import java.util.Collection; import java.util.Map; import java.util.UUID; @@ -30,7 +35,10 @@ import org.apache.lens.api.query.QueryStatus; import org.apache.lens.api.query.QueryStatus.Status; import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.driver.DriverQueryStatus; +import org.apache.lens.server.api.driver.InMemoryResultSet; import org.apache.lens.server.api.driver.LensDriver; +import org.apache.lens.server.api.driver.LensResultSet; +import org.apache.lens.server.api.driver.PartiallyFetchedInMemoryResultSet; import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy; import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint; @@ -41,15 +49,14 @@ import org.apache.hadoop.fs.Path; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; + import lombok.Getter; import lombok.Setter; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; /** * The Class QueryContext. */ -@ToString @Slf4j public class QueryContext extends AbstractQueryContext { @@ -166,6 +173,28 @@ public class QueryContext extends AbstractQueryContext { private String queryName; /** + * This is the timeout that the client may have provided while initiating query execution + * This value is used for pre fetching in-memory result if applicable + * + * Note: in case the timeout is not provided, this value will not be set. + */ + @Setter + @Getter + private transient long executeTimeoutMillis; + + /** + * Query result registered by driver + */ + @Getter + private transient LensResultSet driverResult; + + /** + * True if driver has registered the result + */ + @Getter + private transient boolean isDriverResultRegistered; + + /** * Creates context from query * * @param query the query @@ -187,7 +216,7 @@ public class QueryContext extends AbstractQueryContext { */ public QueryContext(PreparedQueryContext prepared, String user, LensConf qconf, Configuration conf) { this(prepared.getUserQuery(), user, qconf, mergeConf(prepared.getConf(), conf), prepared.getDriverContext() - .getDriverQueryContextMap().keySet(), prepared.getDriverContext().getSelectedDriver(), true); + .getDriverQueryContextMap().keySet(), prepared.getDriverContext().getSelectedDriver(), true); setDriverContext(prepared.getDriverContext()); setSelectedDriverQuery(prepared.getSelectedDriverQuery()); setSelectedDriverQueryCost(prepared.getSelectedDriverQueryCost()); @@ -203,8 +232,8 @@ public class QueryContext extends AbstractQueryContext { * @param drivers All the drivers * @param selectedDriver SelectedDriver */ - QueryContext(String userQuery, String user, LensConf qconf, Configuration conf, - Collection<LensDriver> drivers, LensDriver selectedDriver, boolean mergeDriverConf) { + QueryContext(String userQuery, String user, LensConf qconf, Configuration conf, Collection<LensDriver> drivers, + LensDriver selectedDriver, boolean mergeDriverConf) { this(userQuery, user, qconf, conf, drivers, selectedDriver, System.currentTimeMillis(), mergeDriverConf); } @@ -219,8 +248,8 @@ public class QueryContext extends AbstractQueryContext { * @param selectedDriver the selected driver * @param submissionTime the submission time */ - QueryContext(String userQuery, String user, LensConf qconf, Configuration conf, - Collection<LensDriver> drivers, LensDriver selectedDriver, long submissionTime, boolean mergeDriverConf) { + QueryContext(String userQuery, String user, LensConf qconf, Configuration conf, Collection<LensDriver> drivers, + LensDriver selectedDriver, long submissionTime, boolean mergeDriverConf) { super(userQuery, user, qconf, conf, drivers, mergeDriverConf); this.submissionTime = submissionTime; this.queryHandle = new QueryHandle(UUID.randomUUID()); @@ -228,9 +257,9 @@ public class QueryContext extends AbstractQueryContext { this.lensConf = qconf; this.conf = conf; this.isPersistent = conf.getBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, - LensConfConstants.DEFAULT_PERSISTENT_RESULT_SET); + LensConfConstants.DEFAULT_PERSISTENT_RESULT_SET); this.isDriverPersistent = conf.getBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, - LensConfConstants.DEFAULT_DRIVER_PERSISTENT_RESULT_SET); + LensConfConstants.DEFAULT_DRIVER_PERSISTENT_RESULT_SET); this.userQuery = userQuery; if (selectedDriver != null) { this.setSelectedDriver(selectedDriver); @@ -252,7 +281,7 @@ public class QueryContext extends AbstractQueryContext { * @return QueryContext object */ public static QueryContext createContextWithSingleDriver(String query, String user, LensConf qconf, - Configuration conf, LensDriver driver, String lensSessionPublicId, boolean mergeDriverConf) { + Configuration conf, LensDriver driver, String lensSessionPublicId, boolean mergeDriverConf) { QueryContext ctx = new QueryContext(query, user, qconf, conf, Lists.newArrayList(driver), driver, mergeDriverConf); ctx.setLensSessionIdentifier(lensSessionPublicId); return ctx; @@ -336,7 +365,6 @@ public class QueryContext extends AbstractQueryContext { } public synchronized void setStatus(final QueryStatus newStatus) throws LensException { - validateTransition(newStatus); log.info("Updating status of {} from {} to {}", getQueryHandle(), this.status, newStatus); this.status = newStatus; } @@ -406,6 +434,10 @@ public class QueryContext extends AbstractQueryContext { return this.status.finished(); } + public boolean successful() { + return this.status.successful(); + } + public boolean launched() { return this.status.launched(); } @@ -440,4 +472,36 @@ public class QueryContext extends AbstractQueryContext { setDriverCost(driver, driver.estimate(this)); } } + + public synchronized void registerDriverResult(LensResultSet result) throws LensException { + if (isDriverResultRegistered) { + return; //already registered + } + this.isDriverResultRegistered = true; + /* + * Check if results needs to be streamed to client in which case driver result needs to be wrapped in + * PartiallyFetchedInMemoryResultSet + * + * 1. Driver Result should be of type InMemory (for streaming) as only such results can be streamed fast + * 2. Query result should be server persistent. Only in this case, an early streaming is required by client + * that starts even before server level result persistence/formatting finishes. + * 3. When execiteTimeout is 0, it refers to an async query. In this case streaming result does not make sense + * 4. PREFETCH_INMEMORY_RESULTSET = true, implies client intends to get early streamed result + * 5. rowsToPreFetch should be > 0 + */ + if (isPersistent && executeTimeoutMillis > 0 + && result instanceof InMemoryResultSet + && conf.getBoolean(PREFETCH_INMEMORY_RESULTSET, DEFAULT_PREFETCH_INMEMORY_RESULTSET)) { + int rowsToPreFetch = conf.getInt(PREFETCH_INMEMORY_RESULTSET_ROWS, DEFAULT_PREFETCH_INMEMORY_RESULTSET_ROWS); + if (rowsToPreFetch > 0) { + long executeTimeOutTime = submissionTime + executeTimeoutMillis; + if (System.currentTimeMillis() < executeTimeOutTime) { + this.driverResult = new PartiallyFetchedInMemoryResultSet((InMemoryResultSet) result, rowsToPreFetch); + return; + } + } + } + this.driverResult = result; + } + } http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java b/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java index a20cf47..7f39da1 100644 --- a/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java +++ b/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java @@ -337,11 +337,6 @@ public class MockDriver extends AbstractLensDriver { } @Override - public boolean seekToStart() throws LensException { - return false; - } - - @Override public boolean hasNext() throws LensException { // TODO Auto-generated method stub return false; http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java index e61398b..49ab241 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java @@ -74,7 +74,6 @@ import org.apache.lens.server.util.UtilityMethods; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -521,6 +520,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE */ private final Date finishTime; + private LensResultSet driverRS; + /** * Instantiates a new finished query. * @@ -534,22 +535,29 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } else { this.finishTime = new Date(ctx.getEndTime()); } + if (ctx.isResultAvailableInDriver()) { + try { + driverRS = ctx.getSelectedDriver().fetchResultSet(getCtx()); + } catch (LensException e) { + log.error( + "Error while getting result ser form driver {}. Driver result set based purging logic will be ignored", + ctx.getSelectedDriver(), e); + } + } } public boolean canBePurged() { try { - if (getCtx().getStatus().getStatus().equals(SUCCESSFUL)) { - if (getCtx().getStatus().isResultSetAvailable()) { - LensResultSet rs = getResultset(); - log.info("Resultset for {} is {}", getQueryHandle(), rs.getClass().getSimpleName()); - if (rs instanceof InMemoryResultSet - && System.currentTimeMillis() - > ((InMemoryResultSet) rs).getCreationTime() + inMemoryResultsetTTLMillis) { - log.info("InMemoryResultSet for query {} has exceeded its TTL and is eligible for purging now", - getQueryHandle()); - return true; - } - return rs.canBePurged(); + if (getCtx().getStatus().getStatus().equals(SUCCESSFUL) && getCtx().getStatus().isResultSetAvailable()) { + LensResultSet serverRS = getResultset(); + log.info("Server Resultset for {} is {}", getQueryHandle(), serverRS.getClass().getSimpleName()); + // driverRS and serverRS will not match when server persistence is enabled. Check for purgability of both + // result sets in this case + if (driverRS != null && driverRS != serverRS) { + log.info("Driver Resultset for {} is {}", getQueryHandle(), driverRS.getClass().getSimpleName()); + return serverRS.canBePurged() && (driverRS.canBePurged() || hasResultSetExceededTTL(driverRS)); + } else { + return serverRS.canBePurged() || hasResultSetExceededTTL(serverRS); } } return true; @@ -560,6 +568,23 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } } + /** + * Checks the TTL for ResultSet. TTL is applicable to In Memory ResultSets only. + * + * @param resultSet + * @return + */ + private boolean hasResultSetExceededTTL(LensResultSet resultSet) { + if (resultSet instanceof InMemoryResultSet + && System.currentTimeMillis() > ((InMemoryResultSet) resultSet).getCreationTime() + + inMemoryResultsetTTLMillis) { + log.info("InMemoryResultSet for query {} has exceeded its TTL and is eligible for purging now", + getQueryHandle()); + return true; + } + return false; + } + private LensResultSet getResultset() throws LensException { return QueryExecutionServiceImpl.this.getResultset(getQueryHandle()); } @@ -1514,22 +1539,24 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE if (ctx.isFinishedQueryPersisted()) { return getResultsetFromDAO(queryHandle); } - LensResultSet resultSet = resultSets.get(queryHandle); - if (resultSet == null) { - if (ctx.isPersistent() && ctx.getQueryOutputFormatter() != null) { - resultSets.put(queryHandle, new LensPersistentResult(ctx, conf)); - } else if (allQueries.get(queryHandle).isResultAvailableInDriver()) { - resultSet = getDriverResultset(queryHandle); - resultSets.put(queryHandle, resultSet); - } else { - throw new NotFoundException("Result set not available for query:" + queryHandle); + if (ctx.successful()) { // Do not return any result set for queries that have not finished successfully. + LensResultSet resultSet = resultSets.get(queryHandle); + if (resultSet == null) { + if (ctx.isPersistent() && ctx.getQueryOutputFormatter() != null) { + resultSets.put(queryHandle, new LensPersistentResult(ctx, conf)); + } else if (allQueries.get(queryHandle).isResultAvailableInDriver()) { + resultSet = getDriverResultset(queryHandle); + resultSets.put(queryHandle, resultSet); + } } } } - if (resultSets.get(queryHandle) instanceof InMemoryResultSet) { - ((InMemoryResultSet) resultSets.get(queryHandle)).seekToStart(); + + LensResultSet result = resultSets.get(queryHandle); + if (result == null) { + throw new NotFoundException("Result set not available for query:" + queryHandle); } - return resultSets.get(queryHandle); + return result; } } @@ -1645,7 +1672,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE PreparedQueryContext pctx = getPreparedQueryContext(sessionHandle, prepareHandle); Configuration qconf = getLensConf(sessionHandle, conf); accept(pctx.getUserQuery(), qconf, SubmitOp.EXECUTE); - QueryContext ctx = createContext(pctx, getSession(sessionHandle).getLoggedInUser(), conf, qconf); + QueryContext ctx = createContext(pctx, getSession(sessionHandle).getLoggedInUser(), conf, qconf, 0); if (StringUtils.isNotBlank(queryName)) { // Override previously set query name ctx.setQueryName(queryName); @@ -1674,7 +1701,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE acquire(sessionHandle); PreparedQueryContext pctx = getPreparedQueryContext(sessionHandle, prepareHandle); Configuration qconf = getLensConf(sessionHandle, conf); - QueryContext ctx = createContext(pctx, getSession(sessionHandle).getLoggedInUser(), conf, qconf); + QueryContext ctx = createContext(pctx, getSession(sessionHandle).getLoggedInUser(), conf, qconf, timeoutMillis); if (StringUtils.isNotBlank(queryName)) { // Override previously set query name ctx.setQueryName(queryName); @@ -1701,7 +1728,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE acquire(sessionHandle); Configuration qconf = getLensConf(sessionHandle, conf); accept(query, qconf, SubmitOp.EXECUTE); - QueryContext ctx = createContext(query, getSession(sessionHandle).getLoggedInUser(), conf, qconf); + QueryContext ctx = createContext(query, getSession(sessionHandle).getLoggedInUser(), conf, qconf, 0); ctx.setQueryName(queryName); return executeAsyncInternal(sessionHandle, ctx); } finally { @@ -1719,9 +1746,10 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE * @return the query context * @throws LensException the lens exception */ - protected QueryContext createContext(String query, String userName, LensConf conf, Configuration qconf) - throws LensException { + protected QueryContext createContext(String query, String userName, LensConf conf, Configuration qconf, + long timeOutMillis) throws LensException { QueryContext ctx = new QueryContext(query, userName, conf, qconf, drivers.values()); + ctx.setExecuteTimeoutMillis(timeOutMillis); return ctx; } @@ -1735,9 +1763,10 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE * @return the query context * @throws LensException the lens exception */ - protected QueryContext createContext(PreparedQueryContext pctx, String userName, LensConf conf, Configuration qconf) - throws LensException { + protected QueryContext createContext(PreparedQueryContext pctx, String userName, LensConf conf, Configuration qconf, + long timeOutMillis) throws LensException { QueryContext ctx = new QueryContext(pctx, userName, conf, qconf); + ctx.setExecuteTimeoutMillis(timeOutMillis); return ctx; } @@ -1781,7 +1810,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE try { log.info("UpdateQueryConf: session:{} queryHandle: {}", sessionHandle, queryHandle); acquire(sessionHandle); - QueryContext ctx = getQueryContext(sessionHandle, queryHandle); + QueryContext ctx = getUpdatedQueryContext(sessionHandle, queryHandle); if (ctx != null && (ctx.queued())) { ctx.updateConf(newconf.getProperties()); // TODO COnf changed event tobe raised @@ -1815,19 +1844,35 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } /** - * Gets the query context. + * Gets the query context either form memory or from DB (after query is purged) + * Note: For non-purged queries the status is updated before returning the context * * @param sessionHandle the session handle * @param queryHandle the query handle * @return the query context * @throws LensException the lens exception */ - QueryContext getQueryContext(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException { + QueryContext getUpdatedQueryContext(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException { + return getUpdatedQueryContext(sessionHandle, queryHandle, false); + } + + /** + * Gets the query context. If the query has been purged, null context is returned if returnNullIfPurged is true, else + * context is read form DB Note: For non-purged queries the status is updated before returning the context + * + * @param sessionHandle + * @param queryHandle + * @param returnNullIfPurged + * @return + * @throws LensException + */ + QueryContext getUpdatedQueryContext(LensSessionHandle sessionHandle, QueryHandle queryHandle, + boolean returnNullIfPurged) throws LensException { try { acquire(sessionHandle); QueryContext ctx = allQueries.get(queryHandle); if (ctx == null) { - return getQueryContextOfFinishedQuery(queryHandle); + return (returnNullIfPurged ? null : getQueryContextOfFinishedQuery(queryHandle)); } updateStatus(queryHandle); return ctx; @@ -1864,7 +1909,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE */ @Override public LensQuery getQuery(LensSessionHandle sessionHandle, QueryHandle queryHandle) throws LensException { - return getQueryContext(sessionHandle, queryHandle).toLensQuery(); + return getUpdatedQueryContext(sessionHandle, queryHandle).toLensQuery(); } /** @@ -1916,7 +1961,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE acquire(sessionHandle); Configuration qconf = getLensConf(sessionHandle, conf); accept(query, qconf, SubmitOp.EXECUTE); - QueryContext ctx = createContext(query, getSession(sessionHandle).getLoggedInUser(), conf, qconf); + QueryContext ctx = createContext(query, getSession(sessionHandle).getLoggedInUser(), conf, qconf, timeoutMillis); ctx.setQueryName(queryName); ctx.setLensSessionIdentifier(sessionHandle.getPublicId().toString()); rewriteAndSelect(ctx); @@ -1939,8 +1984,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE private QueryHandleWithResultSet executeTimeoutInternal(LensSessionHandle sessionHandle, QueryContext ctx, long timeoutMillis, Configuration conf) throws LensException { QueryHandle handle = submitQuery(ctx); + long timeOutTime = System.currentTimeMillis() + timeoutMillis; QueryHandleWithResultSet result = new QueryHandleWithResultSet(handle); - // getQueryContext calls updateStatus, which fires query events if there's a change in status while (isQueued(sessionHandle, handle)) { try { @@ -1949,15 +1994,17 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE log.error("Encountered Interrupted exception.", e); } } - QueryCompletionListener listener = new QueryCompletionListenerImpl(handle); - if (getQueryContext(sessionHandle, handle).getSelectedDriver() == null) { - result.setStatus(getQueryContext(sessionHandle, handle).getStatus()); + + QueryContext queryCtx = getUpdatedQueryContext(sessionHandle, handle); + if (queryCtx.getSelectedDriver() == null) { + result.setStatus(queryCtx.getStatus()); return result; } - synchronized (ctx) { - if (!ctx.getStatus().finished()) { - getQueryContext(sessionHandle, handle).getSelectedDriver() - .registerForCompletionNotification(handle, timeoutMillis, listener); + + QueryCompletionListenerImpl listener = new QueryCompletionListenerImpl(handle); + synchronized (queryCtx) { + if (!queryCtx.getStatus().finished()) { + queryCtx.getSelectedDriver().registerForCompletionNotification(handle, timeoutMillis, listener); try { synchronized (listener) { listener.wait(timeoutMillis); @@ -1968,21 +2015,53 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } } - if (getQueryContext(sessionHandle, handle).getStatus().finished()) { - if (getQueryContext(sessionHandle, handle).getStatus().isResultSetAvailable()) { - result.setResult(getResultset(handle).toQueryResult()); + // At this stage (since the listener waits only for driver completion and not server that may include result + // formatting and persistence) the query status can be RUNNING or EXECUTED or FAILED or SUCCESSFUL + LensResultSet resultSet = null; + queryCtx = getUpdatedQueryContext(sessionHandle, handle, true); // If the query is already purged queryCtx = null + if (queryCtx != null && listener.querySuccessful && queryCtx.getStatus().isResultSetAvailable()) { + resultSet = queryCtx.getSelectedDriver().fetchResultSet(queryCtx); + if (resultSet instanceof PartiallyFetchedInMemoryResultSet) { + PartiallyFetchedInMemoryResultSet partialnMemoryResult = (PartiallyFetchedInMemoryResultSet) resultSet; + if (partialnMemoryResult.isComplteleyFetched()) { // DO not stream the result if its not completely fetched + result.setResult(new InMemoryQueryResult(partialnMemoryResult.getPreFetchedRows())); + result.setResultMetadata(partialnMemoryResult.getMetadata().toQueryResultSetMetadata()); + result.setStatus(queryCtx.getStatus()); + return result; + } + } + } + + // Until timeOutTime, give this query a chance to reach FINISHED status if not already there. + queryCtx = getUpdatedQueryContext(sessionHandle, handle); + while (!queryCtx.finished() && System.currentTimeMillis() < timeOutTime) { + queryCtx = getUpdatedQueryContext(sessionHandle, handle); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // Ignore } } - result.setStatus(getQueryContext(sessionHandle, handle).getStatus()); + if (queryCtx.finished() && queryCtx.getStatus().isResultSetAvailable()) { + resultSet = getResultset(handle); + result.setResultMetadata(resultSet.getMetadata().toQueryResultSetMetadata()); + result.setResult(resultSet.toQueryResult()); + result.setStatus(queryCtx.getStatus()); + return result; + } + // Result is not available. (Explicitly setting values to null for readability) + result.setResult(null); + result.setResultMetadata(null); + result.setStatus(queryCtx.getStatus()); return result; } private boolean isQueued(final LensSessionHandle sessionHandle, final QueryHandle handle) throws LensException { // getQueryContext calls updateStatus, which fires query events if there's a change in status - QueryContext query = getQueryContext(sessionHandle, handle); + QueryContext query = getUpdatedQueryContext(sessionHandle, handle); synchronized (query) { return query.queued(); } @@ -1996,7 +2075,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE /** * The succeeded. */ - boolean succeeded = false; + boolean querySuccessful = false; /** * The handle. @@ -2021,7 +2100,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE @Override public void onCompletion(QueryHandle handle) { synchronized (this) { - succeeded = true; + querySuccessful = true; log.info("Query {} with time out succeeded", handle); this.notify(); } @@ -2036,7 +2115,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE @Override public void onError(QueryHandle handle, String error) { synchronized (this) { - succeeded = false; + querySuccessful = false; log.info("Query {} with time out failed", handle); this.notify(); } @@ -2120,7 +2199,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE try { log.info("CancelQuery: session:{} query:{}", sessionHandle, queryHandle); acquire(sessionHandle); - QueryContext ctx = getQueryContext(sessionHandle, queryHandle); + QueryContext ctx = getUpdatedQueryContext(sessionHandle, queryHandle); synchronized (ctx) { @@ -2592,7 +2671,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } catch (IOException e) { throw new LensException(e); } - final QueryContext ctx = getQueryContext(sessionHandle, queryHandle); + final QueryContext ctx = getUpdatedQueryContext(sessionHandle, queryHandle); String resultFSReadUrl = conf.get(RESULT_FS_READ_URL); if (resultFSReadUrl != null) { try { http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server/src/main/resources/lenssession-default.xml ---------------------------------------------------------------------- diff --git a/lens-server/src/main/resources/lenssession-default.xml b/lens-server/src/main/resources/lenssession-default.xml index a321c3f..ce296cc 100644 --- a/lens-server/src/main/resources/lenssession-default.xml +++ b/lens-server/src/main/resources/lenssession-default.xml @@ -213,6 +213,27 @@ </description> </property> + <property> + <name>lens.query.prefetch.inmemory.resultset</name> + <value>true</value> + <description>When set to true, specified number of rows of result set will be pre-fetched if the result set is of + type InMemoryResultSet and query execution is not asynchronous i.e. query should be launched with + operation as EXECUTE_WITH_TIMEOUT. + Suggested usage of this property: It can be used by client to stream as well as persist results in server for + queries that finish fast and produce results with fewer rows (should be less than number of rows pre-fetched). + Note that the results are streamed to the client early, without waiting for persistence to finish. + Default value of this property is true. + </description> + </property> + + <property> + <name>lens.query.prefetch.inmemory.resultset.rows</name> + <value>100</value> + <description>Specifies the number of rows to pre-fetch when lens.query.prefetch.inmemory.resultset is set to true. + Default value is 100 rows. + </description> + </property> + <!-- properties for session --> <property> <name>lens.session.aux.jars</name> http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java b/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java index 4597f9d..a5ee5cc 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java @@ -63,7 +63,7 @@ public class TestLensDAO { // Test insert query QueryContext queryContext = service.createContext("SELECT ID FROM testTable", "foo@localhost", new LensConf(), - new Configuration()); + new Configuration(), 0); long submissionTime = queryContext.getSubmissionTime(); queryContext.setQueryName("daoTestQuery1"); queryContext.getDriverContext().setSelectedDriver(new MockDriver()); http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java index d9b7679..737c99a 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java @@ -46,12 +46,14 @@ import org.apache.lens.api.result.LensErrorTO; import org.apache.lens.api.result.QueryCostTO; import org.apache.lens.cube.error.LensCubeErrorCode; import org.apache.lens.driver.hive.HiveDriver; +import org.apache.lens.lib.query.FileSerdeFormatter; import org.apache.lens.server.LensJerseyTest; import org.apache.lens.server.LensServerTestUtil; import org.apache.lens.server.LensServices; import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.driver.InMemoryResultSet; import org.apache.lens.server.api.driver.LensDriver; +import org.apache.lens.server.api.driver.LensResultSetMetadata; import org.apache.lens.server.api.error.LensDriverErrorCode; import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.metrics.LensMetricsRegistry; @@ -667,13 +669,13 @@ public class TestQueryService extends LensJerseyTest { assertTrue(lensQuery.getDriverStartTime() > 0); assertTrue(lensQuery.getDriverFinishTime() > 0); assertTrue(lensQuery.getFinishTime() > 0); - QueryContext ctx = queryService.getQueryContext(lensSessionId, lensQuery.getQueryHandle()); + QueryContext ctx = queryService.getUpdatedQueryContext(lensSessionId, lensQuery.getQueryHandle()); assertNotNull(ctx.getPhase1RewrittenQuery()); assertEquals(ctx.getPhase1RewrittenQuery(), ctx.getUserQuery()); //Since there is no rewriter in this test assertEquals(lensQuery.getStatus().getStatus(), QueryStatus.Status.SUCCESSFUL); - validatePersistedResult(handle, target(), lensSessionId, new String[][]{{"ID", "INT"}, {"IDSTR", "STRING"}}, - true, mt); + validatePersistedResult(handle, target(), lensSessionId, new String[][]{{"ID", "INT"}, {"IDSTR", "STRING"}}, true, + false, mt); // test cancel query final QueryHandle handle2 = target.request(mt).post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), @@ -723,23 +725,22 @@ public class TestQueryService extends LensJerseyTest { * @param parent the parent * @param lensSessionId the lens session id * @param isDir the is dir + * @param isCSVFormat the result format is csv. * @throws IOException Signals that an I/O exception has occurred. */ static void validatePersistedResult(QueryHandle handle, WebTarget parent, LensSessionHandle lensSessionId, - String[][] schema, boolean isDir, MediaType mt) throws IOException { + String[][] schema, boolean isDir, boolean isCSVFormat, MediaType mt) throws IOException { final WebTarget target = parent.path("queryapi/queries"); // fetch results - validateResultSetMetadata(handle, "", - schema, - parent, lensSessionId, mt); + validateResultSetMetadata(handle, "", schema, parent, lensSessionId, mt); String presultset = target.path(handle.toString()).path("resultset").queryParam("sessionid", lensSessionId) .request(mt).get(String.class); System.out.println("PERSISTED RESULT:" + presultset); PersistentQueryResult resultset = target.path(handle.toString()).path("resultset") - .queryParam("sessionid", lensSessionId).request(mt).get(PersistentQueryResult.class); - validatePersistentResult(resultset, handle, isDir); + .queryParam("sessionid", lensSessionId).request().get(PersistentQueryResult.class); + validatePersistentResult(resultset, handle, isDir, isCSVFormat); if (isDir) { validNotFoundForHttpResult(parent, lensSessionId, handle); @@ -832,12 +833,13 @@ public class TestQueryService extends LensJerseyTest { * @param resultset the resultset * @param handle the handle * @param isDir the is dir + * @param isCSVFormat the result format is csv. * @throws IOException Signals that an I/O exception has occurred. */ - static void validatePersistentResult(PersistentQueryResult resultset, QueryHandle handle, boolean isDir) - throws IOException { + static void validatePersistentResult(PersistentQueryResult resultset, QueryHandle handle, boolean isDir, + boolean isCSVFormat)throws IOException { List<String> actualRows = readResultSet(resultset, handle, isDir); - validatePersistentResult(actualRows); + validatePersistentResult(actualRows, isCSVFormat); if (!isDir) { assertEquals(resultset.getNumRows().intValue(), actualRows.size()); } @@ -845,24 +847,39 @@ public class TestQueryService extends LensJerseyTest { assertEquals(resultset.getFileSize(), fileSize); } - static void validatePersistentResult(List<String> actualRows) { - String[] expected1 = new String[]{ - "1one", - "\\Ntwo123item1item2", - "3\\Nitem1item2", - "\\N\\N", - "5nothing", - }; - String[] expected2 = new String[]{ - "1one[][]", - "\\Ntwo[1,2,3][\"item1\",\"item2\"]", - "3\\N[][\"item1\",\"item2\"]", - "\\N\\N[][]", - "5[][\"nothing\"]", - }; + static void validatePersistentResult(List<String> actualRows, boolean isCSVFormat) { + String[] expected1 = null; + String[] expected2 = null; + if (isCSVFormat) { + //This case will be hit when the result is persisted by the server (CSV result) + expected1 = new String[]{ + "\"1\",\"one\"", + "\"NULL\",\"two\"", + "\"3\",\"NULL\"", + "\"NULL\",\"NULL\"", + "\"5\",\"\"", + }; + } else { + //This is case of hive driver persistence + expected1 = new String[] { + "1one", + "\\Ntwo123item1item2", + "3\\Nitem1item2", + "\\N\\N", + "5nothing", + }; + expected2 = new String[] { + "1one[][]", + "\\Ntwo[1,2,3][\"item1\",\"item2\"]", + "3\\N[][\"item1\",\"item2\"]", + "\\N\\N[][]", + "5[][\"nothing\"]", + }; + } + for (int i = 0; i < actualRows.size(); i++) { - assertEquals( - expected1[i].indexOf(actualRows.get(i)) == 0 || expected2[i].indexOf(actualRows.get(i)) == 0, true); + assertEquals(expected1[i].indexOf(actualRows.get(i)) == 0 + || (expected2 != null && expected2[i].indexOf(actualRows.get(i)) == 0), true); } } @@ -892,7 +909,7 @@ public class TestQueryService extends LensJerseyTest { String result = new String(bos.toByteArray()); List<String> actualRows = Arrays.asList(result.split("\n")); - validatePersistentResult(actualRows); + validatePersistentResult(actualRows, false); } else { assertEquals(SEE_OTHER.getStatusCode(), response.getStatus()); assertTrue(response.getHeaderString("Location").contains(redirectUrl)); @@ -999,7 +1016,7 @@ public class TestQueryService extends LensJerseyTest { waitForQueryToFinish(target(), lensSessionId, handle, Status.SUCCESSFUL, defaultMT); // Check TTL - QueryContext ctx = queryService.getQueryContext(lensSessionId, handle); + QueryContext ctx = queryService.getUpdatedQueryContext(lensSessionId, handle); long softExpiryTime = ctx.getDriverStatus().getDriverFinishTime() + queryService.getInMemoryResultsetTTLMillis() - 1000; //Keeping buffer of 1 secs int checkCount = 0; @@ -1199,7 +1216,7 @@ public class TestQueryService extends LensJerseyTest { new GenericType<LensAPIResult<QueryHandleWithResultSet>>() {}).getData(); assertNotNull(result.getQueryHandle()); assertNotNull(result.getResult()); - validatePersistentResult((PersistentQueryResult) result.getResult(), result.getQueryHandle(), true); + validatePersistentResult((PersistentQueryResult) result.getResult(), result.getQueryHandle(), true, false); final FormDataMultiPart mp2 = new FormDataMultiPart(); LensConf conf = new LensConf(); @@ -1234,6 +1251,99 @@ public class TestQueryService extends LensJerseyTest { } } /** + * Data provider for test case {@link #testExecuteWithTimeoutAndPreFetechAndServerPersistence()} + * @return + */ + @DataProvider + public Object[][] executeWithTimeoutAndPreFetechAndServerPersistenceDP() { + //Columns: timeOutMillis, preFetchRows, isStreamingResultAvailable, deferPersistenceByMillis + return new Object[][] { + {30000, 5, true, 0}, //result has 5 rows & all 5 rows are requested to be pre-fetched + {30000, 10, true, 6000}, //result has 5 rows & 10 rows are requested to be pre-fetched. + {30000, 2, false, 4000}, //result has 5 rows & 2 rows are requested to be pre-fetched. Will not stream + {10, 5, false, 0}, //result has 5 rows & 5 rows requested. Timeout is less (10ms). Will not stream + }; + } + + /** + * @param timeOutMillis : wait time for execute with timeout api + * @param preFetchRows : number of rows to pre-fetch in case of InMemoryResultSet + * @param isStreamingResultAvailable : whether the execute call is expected to return InMemoryQueryResult + * @param ttlMillis : The time window for which pre-fetched InMemoryResultSet will be available for sure. + * @param deferPersistenceByMillis : The time in millis by which Result formatter will be deferred by. + * @throws IOException + * @throws InterruptedException + */ + @Test(dataProvider = "executeWithTimeoutAndPreFetechAndServerPersistenceDP") + public void testExecuteWithTimeoutAndPreFetechAndServerPersistence(long timeOutMillis, int preFetchRows, + boolean isStreamingResultAvailable, long deferPersistenceByMillis) throws Exception { + final WebTarget target = target().path("queryapi/queries"); + + final FormDataMultiPart mp = new FormDataMultiPart(); + mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), lensSessionId, + MediaType.APPLICATION_XML_TYPE)); + mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID, IDSTR from " + + TEST_TABLE)); + mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("operation").build(), "execute_with_timeout")); + // Set a timeout value enough for tests + mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("timeoutmillis").build(), timeOutMillis + "")); + LensConf conf = new LensConf(); + conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, "true"); + conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, "false"); + conf.addProperty(LensConfConstants.PREFETCH_INMEMORY_RESULTSET, "true"); + conf.addProperty(LensConfConstants.PREFETCH_INMEMORY_RESULTSET_ROWS, preFetchRows); + conf.addProperty(LensConfConstants.QUERY_OUTPUT_FORMATTER, DeferredFileSerdeFormatter.class.getName()); + conf.addProperty("deferPersistenceByMillis", deferPersistenceByMillis); // property used for test only + mp.bodyPart(new FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(), conf, + MediaType.APPLICATION_XML_TYPE)); + QueryHandleWithResultSet result =target.request(MediaType.APPLICATION_XML_TYPE) + .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE), + new GenericType<LensAPIResult<QueryHandleWithResultSet>>() {}).getData(); + QueryHandle handle = result.getQueryHandle(); + assertNotNull(handle); + assertNotEquals(result.getStatus().getStatus(), QueryStatus.Status.FAILED); + + if (isStreamingResultAvailable) { + // TEST streamed result + assertTrue(result.getStatus().getStatus() == QueryStatus.Status.EXECUTED + || result.getStatus().getStatus() == QueryStatus.Status.SUCCESSFUL, + "Check if timeoutmillis need to be increased based on query status " + result.getStatus()); + assertEquals(result.getResultMetadata().getColumns().size(), 2); + assertNotNull(result.getResult()); + validateInmemoryResult((InMemoryQueryResult) result.getResult()); + } else if (timeOutMillis > 20000) { // timeout is sufficient for query to finish + assertTrue(result.getResult() instanceof PersistentQueryResult); + } else { + assertNull(result.getResult()); // Query execution not finished yet + } + + waitForQueryToFinish(target(), lensSessionId, handle, Status.SUCCESSFUL, MediaType.APPLICATION_XML_TYPE); + + // Test Persistent Result + validatePersistedResult(handle, target(), lensSessionId, new String[][] { { "ID", "INT" }, { "IDSTR", "STRING" } }, + false, true, MediaType.APPLICATION_XML_TYPE); + } + + private static class DeferredFileSerdeFormatter extends FileSerdeFormatter { + /** + * Defer init so that this output formatter takes significant time. + */ + @Override + public void init(QueryContext ctx, LensResultSetMetadata metadata) throws IOException { + super.init(ctx, metadata); + long deferPersistenceByMillis = ctx.getConf().getLong("deferPersistenceByMillis", 5000); + if (deferPersistenceByMillis > 0) { + try { + log.info("Deferring result formatting by {} millis", deferPersistenceByMillis); + Thread.sleep(deferPersistenceByMillis); + } catch (InterruptedException e) { + // Ignore + } + } + } + } + + /** * Test execute with timeout query. * * @throws IOException Signals that an I/O exception has occurred. http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server/src/test/java/org/apache/lens/server/query/TestResultFormatting.java ---------------------------------------------------------------------- diff --git a/lens-server/src/test/java/org/apache/lens/server/query/TestResultFormatting.java b/lens-server/src/test/java/org/apache/lens/server/query/TestResultFormatting.java index 6db990e..f66f89d 100644 --- a/lens-server/src/test/java/org/apache/lens/server/query/TestResultFormatting.java +++ b/lens-server/src/test/java/org/apache/lens/server/query/TestResultFormatting.java @@ -243,7 +243,7 @@ public class TestResultFormatting extends LensJerseyTest { // fetch results TestQueryService.validatePersistedResult(handle, target(), lensSessionId, new String[][]{ {"ID", "INT"}, {"IDSTR", "STRING"}, {"IDARR", "ARRAY"}, {"IDSTRARR", "ARRAY"}, - }, isDir, mt); + }, isDir, false, mt); if (!isDir) { TestQueryService.validateHttpEndPoint(target(), lensSessionId, handle, reDirectUrl); } http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/src/site/apt/admin/session-config.apt ---------------------------------------------------------------------- diff --git a/src/site/apt/admin/session-config.apt b/src/site/apt/admin/session-config.apt index e108a13..206d6a4 100644 --- a/src/site/apt/admin/session-config.apt +++ b/src/site/apt/admin/session-config.apt @@ -76,28 +76,32 @@ Lens session configuration *--+--+---+--+ |26|lens.query.output.write.header|false|Whether to write header as part of query result formatting. When enabled the user given header will be added in case of driver persisted results, and column names chosen will be added as header for in-memory results.| *--+--+---+--+ -|27|lens.query.result.email.cc| |When query ends, the result/failure reason will be sent to the user via email. The mail would be cc'ed to the addresses provided in this field.| +|27|lens.query.prefetch.inmemory.resultset|true|When set to true, specified number of rows of result set will be pre-fetched if the result set is of type InMemoryResultSet and query execution is not asynchronous i.e. query should be launched with operation as EXECUTE_WITH_TIMEOUT. Suggested usage of this property: It can be used by client to stream as well as persist results in server for queries that finish fast and produce results with fewer rows (should be less than number of rows pre-fetched). Note that the results are streamed to the client early, without waiting for persistence to finish. Default value of this property is true.| *--+--+---+--+ -|28|lens.query.result.fs.read.url| |Http read URL for FileSystem on which result is present, if available. For example webhdfs as http read url should http://host:port/webhdfs/v1. Currently we support only webhdfs url as the http url for HDFS file system| +|28|lens.query.prefetch.inmemory.resultset.rows|100|Specifies the number of rows to pre-fetch when lens.query.prefetch.inmemory.resultset is set to true. Default value is 100 rows.| *--+--+---+--+ -|29|lens.query.result.output.dir.format| |The format of the output if result is persisted in hdfs. The format should be expressed in HQL.| +|29|lens.query.result.email.cc| |When query ends, the result/failure reason will be sent to the user via email. The mail would be cc'ed to the addresses provided in this field.| *--+--+---+--+ -|30|lens.query.result.output.serde|org.apache.lens.lib.query.CSVSerde|The default serde class name that should be used by org.apache.lens.lib.query.FileSerdeFormatter for formatting the output| +|30|lens.query.result.fs.read.url| |Http read URL for FileSystem on which result is present, if available. For example webhdfs as http read url should http://host:port/webhdfs/v1. Currently we support only webhdfs url as the http url for HDFS file system| *--+--+---+--+ -|31|lens.query.result.parent.dir|file:///tmp/lensreports|The directory for storing persisted result of query. This directory should exist and should have writable permissions by lens server| +|31|lens.query.result.output.dir.format| |The format of the output if result is persisted in hdfs. The format should be expressed in HQL.| *--+--+---+--+ -|32|lens.query.result.size.format.threshold|10737418240|The maximum allowed size of the query result. If exceeds, no server side formatting would be done.| +|32|lens.query.result.output.serde|org.apache.lens.lib.query.CSVSerde|The default serde class name that should be used by org.apache.lens.lib.query.FileSerdeFormatter for formatting the output| *--+--+---+--+ -|33|lens.query.result.split.multiple|false|Whether to split the result into multiple files. If enabled, each file will be restricted to max rows configured. All the files will be available as zip.| +|33|lens.query.result.parent.dir|file:///tmp/lensreports|The directory for storing persisted result of query. This directory should exist and should have writable permissions by lens server| *--+--+---+--+ -|34|lens.query.result.split.multiple.maxrows|100000|The maximum number of rows allowed in each file, when splitting the result into multiple files is enabled.| +|34|lens.query.result.size.format.threshold|10737418240|The maximum allowed size of the query result. If exceeds, no server side formatting would be done.| *--+--+---+--+ -|35|lens.session.aux.jars| |List of comma separated jar paths, which will added to the session| +|35|lens.query.result.split.multiple|false|Whether to split the result into multiple files. If enabled, each file will be restricted to max rows configured. All the files will be available as zip.| *--+--+---+--+ -|36|lens.session.cluster.user| |Session level config which will determine which cluster user will access hdfs| +|36|lens.query.result.split.multiple.maxrows|100000|The maximum number of rows allowed in each file, when splitting the result into multiple files is enabled.| *--+--+---+--+ -|37|lens.session.loggedin.user| |The username used to log in to lens. e.g. LDAP user| +|37|lens.session.aux.jars| |List of comma separated jar paths, which will added to the session| *--+--+---+--+ -|38|lens.session.metastore.exclude.cubetables.from.nativetables|true|Exclude cube related tables when fetching native tables| +|38|lens.session.cluster.user| |Session level config which will determine which cluster user will access hdfs| +*--+--+---+--+ +|39|lens.session.loggedin.user| |The username used to log in to lens. e.g. LDAP user| +*--+--+---+--+ +|40|lens.session.metastore.exclude.cubetables.from.nativetables|true|Exclude cube related tables when fetching native tables| *--+--+---+--+ The configuration parameters and their default values