Repository: lens
Updated Branches:
  refs/heads/master f88cf9bc3 -> 58d863643


LENS-901 : Add option to stream results from execute timeout api, along with 
persisting the result


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/58d86364
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/58d86364
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/58d86364

Branch: refs/heads/master
Commit: 58d8636433f2166e4428a7a1c933e76aeadf2fde
Parents: f88cf9b
Author: Puneet Gupta <puneet.k.gu...@gmail.com>
Authored: Fri Feb 5 08:31:15 2016 +0530
Committer: Amareshwari Sriramadasu <amareshw...@apache.org>
Committed: Fri Feb 5 08:31:15 2016 +0530

----------------------------------------------------------------------
 .../api/query/QueryHandleWithResultSet.java     |   7 +
 .../org/apache/lens/api/query/QueryStatus.java  |   5 +
 .../lens/driver/es/client/ESResultSet.java      |   6 -
 .../org/apache/lens/driver/hive/HiveDriver.java |  21 +-
 .../lens/driver/hive/HiveInMemoryResultSet.java |   8 +-
 .../org/apache/lens/driver/jdbc/JDBCDriver.java |  18 +-
 .../apache/lens/driver/jdbc/JDBCResultSet.java  |  13 --
 .../apache/lens/driver/jdbc/TestJdbcDriver.java |  84 ++++++++
 .../lens/server/api/LensConfConstants.java      |  20 ++
 .../server/api/driver/AbstractLensDriver.java   |  34 +++-
 .../server/api/driver/InMemoryResultSet.java    |   4 +-
 .../PartiallyFetchedInMemoryResultSet.java      | 177 +++++++++++++++++
 .../lens/server/api/query/QueryContext.java     |  86 +++++++--
 .../lens/server/api/driver/MockDriver.java      |   5 -
 .../server/query/QueryExecutionServiceImpl.java | 191 +++++++++++++------
 .../src/main/resources/lenssession-default.xml  |  21 ++
 .../apache/lens/server/query/TestLensDAO.java   |   2 +-
 .../lens/server/query/TestQueryService.java     | 174 +++++++++++++----
 .../lens/server/query/TestResultFormatting.java |   2 +-
 src/site/apt/admin/session-config.apt           |  28 +--
 20 files changed, 733 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-api/src/main/java/org/apache/lens/api/query/QueryHandleWithResultSet.java
----------------------------------------------------------------------
diff --git 
a/lens-api/src/main/java/org/apache/lens/api/query/QueryHandleWithResultSet.java
 
b/lens-api/src/main/java/org/apache/lens/api/query/QueryHandleWithResultSet.java
index a5da867..bf8967e 100644
--- 
a/lens-api/src/main/java/org/apache/lens/api/query/QueryHandleWithResultSet.java
+++ 
b/lens-api/src/main/java/org/apache/lens/api/query/QueryHandleWithResultSet.java
@@ -54,6 +54,13 @@ public class QueryHandleWithResultSet extends 
QuerySubmitResult {
   private QueryResult result;
 
   /**
+   * The result metadata
+   */
+  @Getter
+  @Setter
+  private QueryResultSetMetadata resultMetadata;
+
+  /**
    * The status.
    */
   @Getter

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java
----------------------------------------------------------------------
diff --git a/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java 
b/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java
index 915dac7..40e5d87 100644
--- a/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java
+++ b/lens-api/src/main/java/org/apache/lens/api/query/QueryStatus.java
@@ -212,6 +212,11 @@ public class QueryStatus implements Serializable {
     return status.equals(Status.SUCCESSFUL) || status.equals(Status.FAILED) || 
status.equals(Status.CANCELED);
   }
 
+  public boolean successful() {
+    return status.equals(Status.SUCCESSFUL);
+  }
+
+
   public boolean launched() {
     return status.equals(Status.LAUNCHED);
   }

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java
----------------------------------------------------------------------
diff --git 
a/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java
 
b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java
index 464b535..b59949b 100644
--- 
a/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java
+++ 
b/lens-driver-es/src/main/java/org/apache/lens/driver/es/client/ESResultSet.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import org.apache.lens.api.query.ResultRow;
 import org.apache.lens.server.api.driver.InMemoryResultSet;
 import org.apache.lens.server.api.driver.LensResultSetMetadata;
-import org.apache.lens.server.api.error.LensException;
 
 import lombok.NonNull;
 
@@ -68,9 +67,4 @@ public class ESResultSet extends InMemoryResultSet {
   public LensResultSetMetadata getMetadata() {
     return resultSetMetadata;
   }
-
-  @Override
-  public boolean seekToStart() throws LensException {
-    return false;
-  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
----------------------------------------------------------------------
diff --git 
a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java 
b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
index 149c6ab..f422543 100644
--- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
+++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
@@ -445,7 +445,7 @@ public class HiveDriver extends AbstractLensDriver {
       explainCtx.getSubmittedUser(), new LensConf(), explainConf, this, 
explainCtx.getLensSessionIdentifier(), false);
 
     // Get result set of explain
-    HiveInMemoryResultSet inMemoryResultSet = (HiveInMemoryResultSet) 
execute(explainQueryCtx);
+    InMemoryResultSet inMemoryResultSet = (InMemoryResultSet) 
execute(explainQueryCtx);
     List<String> explainOutput = new ArrayList<>();
     while (inMemoryResultSet.hasNext()) {
       explainOutput.add((String) inMemoryResultSet.next().getValues().get(0));
@@ -529,7 +529,7 @@ public class HiveDriver extends AbstractLensDriver {
       }
       result = createResultSet(ctx, true);
       // close the query immediately if the result is not inmemory result set
-      if (result == null || !(result instanceof HiveInMemoryResultSet)) {
+      if (result == null || !(result instanceof InMemoryResultSet)) {
         closeQuery(ctx.getQueryHandle());
       }
       // remove query handle from hiveHandles even in case of inmemory result 
set
@@ -707,18 +707,6 @@ public class HiveDriver extends AbstractLensDriver {
   /*
    * (non-Javadoc)
    *
-   * @see 
org.apache.lens.server.api.driver.LensDriver#fetchResultSet(org.apache.lens.server.api.query.QueryContext)
-   */
-  @Override
-  public LensResultSet fetchResultSet(QueryContext ctx) throws LensException {
-    log.info("FetchResultSet: {}", ctx.getQueryHandle());
-    // This should be applicable only for a async query
-    return createResultSet(ctx, false);
-  }
-
-  /*
-   * (non-Javadoc)
-   *
    * @see 
org.apache.lens.server.api.driver.LensDriver#closeResultSet(org.apache.lens.api.query.QueryHandle)
    */
   @Override
@@ -874,6 +862,11 @@ public class HiveDriver extends AbstractLensDriver {
     }
   }
 
+  @Override
+  protected LensResultSet createResultSet(QueryContext context) throws 
LensException {
+    return createResultSet(context, false);
+  }
+
   /**
    * Creates the result set.
    *

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveInMemoryResultSet.java
----------------------------------------------------------------------
diff --git 
a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveInMemoryResultSet.java
 
b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveInMemoryResultSet.java
index f8abd78..4d52e22 100644
--- 
a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveInMemoryResultSet.java
+++ 
b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveInMemoryResultSet.java
@@ -81,7 +81,7 @@ public class HiveInMemoryResultSet extends InMemoryResultSet {
     this.closeAfterFecth = closeAfterFecth;
     this.metadata = client.getResultSetMetadata(opHandle);
     this.numColumns = metadata.getColumnDescriptors().size();
-    this.seekToStart();
+    this.orientation = FetchOrientation.FETCH_FIRST;
   }
 
   /*
@@ -103,12 +103,6 @@ public class HiveInMemoryResultSet extends 
InMemoryResultSet {
     return hrsMeta;
   }
 
-  @Override
-  public boolean seekToStart() {
-    orientation = FetchOrientation.FETCH_FIRST;
-    return true;
-  }
-
   /*
      * (non-Javadoc)
      *

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
----------------------------------------------------------------------
diff --git 
a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java 
b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
index 82d7513..eef4464 100644
--- a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
+++ b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCDriver.java
@@ -63,7 +63,7 @@ import 
org.apache.lens.server.model.MappedDiagnosticLogSegregationContext;
 
 import org.apache.commons.lang3.StringUtils;
 
- import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.HiveParser;
@@ -235,7 +235,7 @@ public class JDBCDriver extends AbstractLensDriver {
     private boolean isClosed;
 
     /** The lens result set. */
-    private JDBCResultSet lensResultSet;
+    private InMemoryResultSet lensResultSet;
 
     /**
      * Close.
@@ -996,16 +996,13 @@ public class JDBCDriver extends AbstractLensDriver {
     }
   }
 
-  /**
-   * Fetch the results of the query, specified by the handle.
-   *
-   * @param context the context
-   * @return returns the {@link LensResultSet}.
-   * @throws LensException the lens exception
-   */
   @Override
-  public LensResultSet fetchResultSet(QueryContext context) throws 
LensException {
+  protected LensResultSet createResultSet(QueryContext ctx) throws 
LensException {
     checkConfigured();
+    return getDriverResult(ctx);
+  }
+
+  private LensResultSet getDriverResult(QueryContext context) throws 
LensException {
     JdbcQueryContext ctx = getQueryContext(context.getQueryHandle());
     if (ctx.isCancelled()) {
       throw new LensException("Result set not available for cancelled query " 
+ context.getQueryHandle());
@@ -1147,6 +1144,5 @@ public class JDBCDriver extends AbstractLensDriver {
   @Override
   public void writeExternal(ObjectOutput arg0) throws IOException {
     // TODO Auto-generated method stub
-
   }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCResultSet.java
----------------------------------------------------------------------
diff --git 
a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCResultSet.java 
b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCResultSet.java
index 8b4da3f..9e1a0c0 100644
--- 
a/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCResultSet.java
+++ 
b/lens-driver-jdbc/src/main/java/org/apache/lens/driver/jdbc/JDBCResultSet.java
@@ -79,7 +79,6 @@ public class JDBCResultSet extends InMemoryResultSet {
     this.queryResult = queryResult;
     this.resultSet = resultSet;
     this.closeAfterFetch = closeAfterFetch;
-    seekToStart();
   }
 
   private ResultSetMetaData getRsMetadata() throws LensException {
@@ -294,18 +293,6 @@ public class JDBCResultSet extends InMemoryResultSet {
     }
   }
 
-  @Override
-  public boolean seekToStart() throws LensException {
-    try {
-      if (!resultSet.isClosed() && !resultSet.isBeforeFirst()) {
-        resultSet.beforeFirst();
-      }
-      return true;
-    } catch (SQLException e) {
-      throw new LensException(e);
-    }
-  }
-
   /*
      * (non-Javadoc)
      *

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
----------------------------------------------------------------------
diff --git 
a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
 
b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
index b96cf88..81a9552 100644
--- 
a/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
+++ 
b/lens-driver-jdbc/src/test/java/org/apache/lens/driver/jdbc/TestJdbcDriver.java
@@ -46,6 +46,7 @@ import org.apache.lens.server.api.util.LensUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.session.SessionState;
+
 import org.apache.hive.service.cli.ColumnDescriptor;
 
 import org.testng.Assert;
@@ -54,6 +55,7 @@ import org.testng.annotations.*;
 import com.codahale.metrics.MetricRegistry;
 import com.google.common.collect.Lists;
 import com.mchange.v2.c3p0.ComboPooledDataSource;
+
 import lombok.extern.slf4j.Slf4j;
 
 /**
@@ -411,6 +413,88 @@ public class TestJdbcDriver {
     }
   }
 
+  /**
+   * Data provider for test case {@link #testExecuteWithPreFetch()}
+   * @return
+   */
+  @DataProvider
+  public Object[][] executeWithPreFetchDP() {
+    return new Object[][] {
+      //int rowsToPreFecth, boolean isComplteleyFetched, int rowsPreFetched, 
boolean createTable, long executeTimeout
+      {10, true, 10, true, 20000}, //result has 10 rows and all 10 rows are 
pre fetched
+      {5, false, 6, false, 8000}, //result has 10 rows and 5 rows are pre 
fetched. (Extra row is  fetched = 5+1 = 6)
+      {15, true, 10, false, 8000}, //result has 10 rows and 15 rows are 
requested to be pre fetched
+      {10, false, 0, false, 10}, //similar to case 1 but executeTimeout is 
very less.
+    };
+  }
+
+  /**
+   * @param rowsToPreFecth  : requested number of rows to be pre-fetched
+   * @param isComplteleyFetched : whether the wrapped in memory result has 
been completely accessed due to pre fetch
+   * @param rowsPreFetched : actual rows pre-fetched
+   * @param createTable : whether to create a table before the test case is run
+   * @param executeTimeoutMillis :If the query does not finish with in this 
time pre fetch is ignored.
+   * @throws Exception
+   */
+  @Test(dataProvider = "executeWithPreFetchDP")
+  public void testExecuteWithPreFetch(int rowsToPreFecth, boolean 
isComplteleyFetched, int rowsPreFetched,
+      boolean createTable, long executeTimeoutMillis) throws Exception {
+    if (createTable) {
+      createTable("execute_prefetch_test");
+      insertData("execute_prefetch_test");
+    }
+
+    // Query
+    final String query = "SELECT * FROM execute_prefetch_test";
+    Configuration conf = new Configuration(baseConf);
+    conf.setBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, true);
+    conf.setBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, false);
+    conf.setBoolean(LensConfConstants.PREFETCH_INMEMORY_RESULTSET, true);
+    conf.setInt(LensConfConstants.PREFETCH_INMEMORY_RESULTSET_ROWS, 
rowsToPreFecth);
+    QueryContext context = createQueryContext(query, conf);
+    context.setExecuteTimeoutMillis(executeTimeoutMillis);
+    driver.executeAsync(context);
+    LensResultSet resultSet = driver.fetchResultSet(context);
+    assertNotNull(resultSet);
+
+    //Check Type
+    if (executeTimeoutMillis > 1000) { //enough time to execute the query
+      assertTrue(resultSet instanceof PartiallyFetchedInMemoryResultSet);
+    } else {
+      assertFalse(resultSet instanceof PartiallyFetchedInMemoryResultSet);
+      return; // NO need to check further in this  case
+    }
+
+    PartiallyFetchedInMemoryResultSet prs = 
(PartiallyFetchedInMemoryResultSet) resultSet;
+    assertEquals(prs.isComplteleyFetched(), isComplteleyFetched);
+
+    //Check Streaming flow
+    if (isComplteleyFetched) {
+      assertTrue(prs.isComplteleyFetched());
+      prs.getPreFetchedRows(); //This will be called while streaming
+      assertEquals(prs.size().intValue(), rowsPreFetched);
+    } else {
+      assertFalse(prs.isComplteleyFetched());
+      assertEquals(prs.getPreFetchedRows().size(), rowsPreFetched);
+    }
+
+    assertEquals(prs.getMetadata().getColumns().size(), 1);
+    assertEquals(prs.getMetadata().getColumns().get(0).getName(), "ID");
+
+    // Check Persistence flow
+    int rowCount = 0;
+    while (prs.hasNext()) {
+      ResultRow row = prs.next();
+      assertEquals(row.getValues().get(0), rowCount);
+      rowCount++;
+    }
+    assertEquals(rowCount, 10);
+    prs.setFullyAccessed(true);
+
+    //Check Purge
+    assertEquals(prs.canBePurged() , true);
+  }
+
   @Test
   public void testJdbcSqlException() throws Exception {
     final String query = "SELECT invalid_column FROM execute_test";

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
index 8df389b..1b7d0f9 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
@@ -998,6 +998,26 @@ public final class LensConfConstants {
   public static final String DEFAULT_HDFS_OUTPUT_RETENTION = "1 day";
 
   /**
+   * Pre Fetch results in case of in memory result sets.
+   */
+  public static final String PREFETCH_INMEMORY_RESULTSET = QUERY_PFX + 
"prefetch.inmemory.resultset";
+
+  /**
+   * Pre Fetch results in case of in memory result sets is enabled by default
+   */
+  public static final boolean DEFAULT_PREFETCH_INMEMORY_RESULTSET = true;
+
+  /**
+   * Pre-Fetch size for in memory results. Makes sense only if {@link 
#PREFETCH_INMEMORY_RESULTSET} set to true
+   */
+  public static final String PREFETCH_INMEMORY_RESULTSET_ROWS = QUERY_PFX + 
"prefetch.inmemory.resultset.rows";
+
+  /**
+   * Default Pre-Fetch size for in memory results.
+   */
+  public static final int DEFAULT_PREFETCH_INMEMORY_RESULTSET_ROWS = 100;
+
+  /**
    * The Constant EXCLUDE_CUBE_TABLES.
    */
   public static final String EXCLUDE_CUBE_TABLES = SESSION_PFX + 
"metastore.exclude.cubetables.from.nativetables";

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
index ed1fc43..d447417 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/AbstractLensDriver.java
@@ -18,21 +18,23 @@
  */
 package org.apache.lens.server.api.driver;
 
-
 import org.apache.lens.api.Priority;
 import org.apache.lens.server.api.LensConfConstants;
 import org.apache.lens.server.api.error.LensException;
 import org.apache.lens.server.api.query.QueryContext;
 
 import org.apache.commons.lang.StringUtils;
+
 import org.apache.hadoop.conf.Configuration;
 
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Abstract class for Lens Driver Implementations. Provides default
  * implementations and some utility methods for drivers
  */
+@Slf4j
 public abstract class AbstractLensDriver implements LensDriver {
   /**
    * Separator used for constructing fully qualified name and driver resource 
path
@@ -54,6 +56,36 @@ public abstract class AbstractLensDriver implements 
LensDriver {
   }
 
   /**
+   * Default implementation for fetchResultSet for all drivers. Should hold 
good in most cases.
+   * Note : If a driver is sticking to this default implementation, it should
+   * override {@link #createResultSet(QueryContext)}
+   */
+  @Override
+  public LensResultSet fetchResultSet(QueryContext ctx) throws LensException {
+    log.info("FetchResultSet: {}", ctx.getQueryHandle());
+    synchronized (ctx) {
+      if (!ctx.isDriverResultRegistered()) {
+        ctx.registerDriverResult(createResultSet(ctx));
+      }
+    }
+    return ctx.getDriverResult();
+  }
+
+  /**
+   * This method should create ResultSet for the query represented by the 
context.
+   * Specific driver should override this method to return driver specific 
LensResultSet whenever the
+   * driver relies on default implementation of {@link 
#fetchResultSet(QueryContext)}
+   *
+   * Note: Default Implementation throw exception.
+   *
+   * @param ctx
+   * @return
+   */
+  protected LensResultSet createResultSet(QueryContext ctx) throws 
LensException {
+    throw new LensException(this.getClass().getSimpleName() + " should 
override method createResultSet(QueryContext)");
+  }
+
+  /**
    * Gets the path (relative to lens server's conf location) for the driver 
resource in the system. This is a utility
    * method that can be used by extending driver implementations to build path 
for their resources.
    *

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java
index 0d64471..535065d 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/InMemoryResultSet.java
@@ -39,8 +39,6 @@ public abstract class InMemoryResultSet extends LensResultSet 
{
   @Getter
   private long creationTime = System.currentTimeMillis();;
 
-  public abstract boolean seekToStart() throws LensException;
-
   @Override
   public boolean canBePurged() {
     return fullyAccessed;
@@ -84,7 +82,7 @@ public abstract class InMemoryResultSet extends LensResultSet 
{
     while (hasNext()) {
       rows.add(next());
     }
-    fullyAccessed = true;
+    this.setFullyAccessed(true);
     return new InMemoryQueryResult(rows);
   }
   public boolean isHttpResultAvailable() throws LensException {

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java
new file mode 100644
index 0000000..59918bd
--- /dev/null
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/driver/PartiallyFetchedInMemoryResultSet.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.lens.server.api.driver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lens.api.query.ResultRow;
+import org.apache.lens.server.api.error.LensException;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * This is a wrapper over InMemoryResultSet which pre-fetches requested number 
of rows in memory. All calls are
+ * delegated to the underlying InMemoryResultSet except for the calls that 
access pre-fetched rows.
+ *
+ * This wrapper was created to support partial streaming of big result sets 
and complete streaming of SMALL result sets
+ * along with persistence. The pre-fetched result available via {@link 
#getPreFetchedRows()} can be used for streaming
+ * while the persistence logic can iterate over complete result set using 
{@link #hasNext()} and {@link #next()}.
+ *
+ * Note
+ * Streaming and persistence can occur concurrently irrespective of the 
underlying InMemoryResultSet
+ * implementation.
+ * Streaming of partial results is not supported at server level as of now.
+ */
+@Slf4j
+public class PartiallyFetchedInMemoryResultSet extends InMemoryResultSet {
+  /**
+   * Underlying in-memory result set
+   */
+  private InMemoryResultSet inMemoryRS;
+
+  /**
+   *Number for rows pre-fetched and kept in memory.
+   */
+  private int numOfPreFetchedRows;
+
+  /**
+   *Cursor for the pre-fetched in memory result.
+   */
+  private int cursor;
+
+  /**
+   * Indicates whether the underlying in-memory result has been completely 
pre-fetched and kept in memory.
+   */
+  @Getter
+  private boolean isComplteleyFetched;
+
+  /**
+   * The pre-fteched in memory result cache.
+   */
+  private List<ResultRow> preFetchedRows;
+
+  /**
+   * This is set to true once preFetchedRows have been consumed.
+   */
+  private boolean preFetchedRowsConsumed;
+
+  /**
+   * Constructor
+   * @param inMemoryRS : Underlying in-memory result set
+   * @param reqPreFetchSize : requested number of rows to be pre-fetched and 
cached.
+   * @throws LensException
+   */
+  public PartiallyFetchedInMemoryResultSet(InMemoryResultSet inMemoryRS, int 
reqPreFetchSize) throws LensException {
+    this.inMemoryRS = inMemoryRS;
+    if (reqPreFetchSize <= 0) {
+      throw new IllegalArgumentException("Invalid pre fetch size " + 
reqPreFetchSize);
+    }
+    preFetchRows(reqPreFetchSize);
+    log.info("Pre-Fetched {} rows of result and isComplteleyFetched = {} and 
doNotPurgeUntilTimeMillis ={}",
+        numOfPreFetchedRows, isComplteleyFetched);
+  }
+
+  private void preFetchRows(int reqPreFetchSize) throws LensException {
+    //rows fetched = reqPreFetchSize+1. One extra row is read to check if 
underlying inMemoryRS result is completely
+    //or partially read.
+    preFetchedRows = new ArrayList<ResultRow>(reqPreFetchSize + 1);
+    boolean hasNext = inMemoryRS.hasNext();
+    while (hasNext) {
+      if (numOfPreFetchedRows >= reqPreFetchSize) {
+        break;
+      }
+      preFetchedRows.add(inMemoryRS.next());
+      numOfPreFetchedRows++;
+      hasNext = inMemoryRS.hasNext();
+    }
+
+    if (!hasNext) {
+      isComplteleyFetched = true; // No more rows to be read form inMemory 
result.
+    } else {
+      isComplteleyFetched = false;
+      //we have accessed ( hasNext() for ) one extra row. Lets cache it too.
+      preFetchedRows.add(inMemoryRS.next());
+      numOfPreFetchedRows++;
+    }
+  }
+
+  @Override
+  public boolean hasNext() throws LensException {
+    cursor++;
+    if (cursor <= numOfPreFetchedRows) {
+      return true;
+    } else if (isComplteleyFetched) {
+      return false;
+    } else {
+      return inMemoryRS.hasNext();
+    }
+  }
+
+  @Override
+  public ResultRow next() throws LensException {
+    if (cursor <= numOfPreFetchedRows) {
+      return preFetchedRows.get(cursor-1);
+    } else {
+      return inMemoryRS.next();
+    }
+  }
+
+  @Override
+  public void setFetchSize(int size) throws LensException {
+    inMemoryRS.setFetchSize(size);
+  }
+
+  @Override
+  public Integer size() throws LensException {
+    if (isComplteleyFetched) {
+      return numOfPreFetchedRows;
+    } else {
+      return inMemoryRS.size();
+    }
+  }
+
+  @Override
+  public LensResultSetMetadata getMetadata() throws LensException {
+    return inMemoryRS.getMetadata();
+  }
+
+  @Override
+  public boolean canBePurged() {
+    //If the result is completely pre-fetched, defer the purging until 
preFetchedRows have been consumed.
+    //In Case not consumed, it should be cleared based on 
lens.server.inmemory.resultset.ttl.secs
+    if (isComplteleyFetched && !preFetchedRowsConsumed) {
+      return false;
+    } else {
+      return inMemoryRS.canBePurged();
+    }
+  }
+
+  @Override
+  public void setFullyAccessed(boolean fullyAccessed) {
+    inMemoryRS.setFullyAccessed(fullyAccessed);
+  }
+
+  public List<ResultRow> getPreFetchedRows() {
+    preFetchedRowsConsumed = true;
+    return preFetchedRows;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
index 1269e45..96846c1 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/query/QueryContext.java
@@ -18,6 +18,11 @@
  */
 package org.apache.lens.server.api.query;
 
+import static 
org.apache.lens.server.api.LensConfConstants.DEFAULT_PREFETCH_INMEMORY_RESULTSET;
+import static 
org.apache.lens.server.api.LensConfConstants.DEFAULT_PREFETCH_INMEMORY_RESULTSET_ROWS;
+import static 
org.apache.lens.server.api.LensConfConstants.PREFETCH_INMEMORY_RESULTSET;
+import static 
org.apache.lens.server.api.LensConfConstants.PREFETCH_INMEMORY_RESULTSET_ROWS;
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
@@ -30,7 +35,10 @@ import org.apache.lens.api.query.QueryStatus;
 import org.apache.lens.api.query.QueryStatus.Status;
 import org.apache.lens.server.api.LensConfConstants;
 import org.apache.lens.server.api.driver.DriverQueryStatus;
+import org.apache.lens.server.api.driver.InMemoryResultSet;
 import org.apache.lens.server.api.driver.LensDriver;
+import org.apache.lens.server.api.driver.LensResultSet;
+import org.apache.lens.server.api.driver.PartiallyFetchedInMemoryResultSet;
 import org.apache.lens.server.api.error.LensException;
 import org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy;
 import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint;
@@ -41,15 +49,14 @@ import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+
 import lombok.Getter;
 import lombok.Setter;
-import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
 /**
  * The Class QueryContext.
  */
-@ToString
 @Slf4j
 public class QueryContext extends AbstractQueryContext {
 
@@ -166,6 +173,28 @@ public class QueryContext extends AbstractQueryContext {
   private String queryName;
 
   /**
+   * This is the timeout that the client may have provided while initiating 
query execution
+   * This value is used for pre fetching in-memory result if applicable
+   *
+   * Note: in case the timeout is not provided, this value will not be set.
+   */
+  @Setter
+  @Getter
+  private transient long executeTimeoutMillis;
+
+  /**
+   * Query result registered by driver
+   */
+  @Getter
+  private transient LensResultSet driverResult;
+
+  /**
+   * True if driver has registered the result
+   */
+  @Getter
+  private transient boolean isDriverResultRegistered;
+
+  /**
    * Creates context from query
    *
    * @param query the query
@@ -187,7 +216,7 @@ public class QueryContext extends AbstractQueryContext {
    */
   public QueryContext(PreparedQueryContext prepared, String user, LensConf 
qconf, Configuration conf) {
     this(prepared.getUserQuery(), user, qconf, mergeConf(prepared.getConf(), 
conf), prepared.getDriverContext()
-      .getDriverQueryContextMap().keySet(), 
prepared.getDriverContext().getSelectedDriver(), true);
+        .getDriverQueryContextMap().keySet(), 
prepared.getDriverContext().getSelectedDriver(), true);
     setDriverContext(prepared.getDriverContext());
     setSelectedDriverQuery(prepared.getSelectedDriverQuery());
     setSelectedDriverQueryCost(prepared.getSelectedDriverQueryCost());
@@ -203,8 +232,8 @@ public class QueryContext extends AbstractQueryContext {
    * @param drivers All the drivers
    * @param selectedDriver SelectedDriver
    */
-  QueryContext(String userQuery, String user, LensConf qconf, Configuration 
conf,
-    Collection<LensDriver> drivers, LensDriver selectedDriver, boolean 
mergeDriverConf) {
+  QueryContext(String userQuery, String user, LensConf qconf, Configuration 
conf, Collection<LensDriver> drivers,
+      LensDriver selectedDriver, boolean mergeDriverConf) {
     this(userQuery, user, qconf, conf, drivers, selectedDriver, 
System.currentTimeMillis(), mergeDriverConf);
   }
 
@@ -219,8 +248,8 @@ public class QueryContext extends AbstractQueryContext {
    * @param selectedDriver the selected driver
    * @param submissionTime the submission time
    */
-  QueryContext(String userQuery, String user, LensConf qconf, Configuration 
conf,
-    Collection<LensDriver> drivers, LensDriver selectedDriver, long 
submissionTime, boolean mergeDriverConf) {
+  QueryContext(String userQuery, String user, LensConf qconf, Configuration 
conf, Collection<LensDriver> drivers,
+      LensDriver selectedDriver, long submissionTime, boolean mergeDriverConf) 
{
     super(userQuery, user, qconf, conf, drivers, mergeDriverConf);
     this.submissionTime = submissionTime;
     this.queryHandle = new QueryHandle(UUID.randomUUID());
@@ -228,9 +257,9 @@ public class QueryContext extends AbstractQueryContext {
     this.lensConf = qconf;
     this.conf = conf;
     this.isPersistent = 
conf.getBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_SET,
-      LensConfConstants.DEFAULT_PERSISTENT_RESULT_SET);
+        LensConfConstants.DEFAULT_PERSISTENT_RESULT_SET);
     this.isDriverPersistent = 
conf.getBoolean(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER,
-      LensConfConstants.DEFAULT_DRIVER_PERSISTENT_RESULT_SET);
+        LensConfConstants.DEFAULT_DRIVER_PERSISTENT_RESULT_SET);
     this.userQuery = userQuery;
     if (selectedDriver != null) {
       this.setSelectedDriver(selectedDriver);
@@ -252,7 +281,7 @@ public class QueryContext extends AbstractQueryContext {
    * @return QueryContext object
    */
   public static QueryContext createContextWithSingleDriver(String query, 
String user, LensConf qconf,
-    Configuration conf, LensDriver driver, String lensSessionPublicId, boolean 
mergeDriverConf) {
+      Configuration conf, LensDriver driver, String lensSessionPublicId, 
boolean mergeDriverConf) {
     QueryContext ctx = new QueryContext(query, user, qconf, conf, 
Lists.newArrayList(driver), driver, mergeDriverConf);
     ctx.setLensSessionIdentifier(lensSessionPublicId);
     return ctx;
@@ -336,7 +365,6 @@ public class QueryContext extends AbstractQueryContext {
   }
 
   public synchronized void setStatus(final QueryStatus newStatus) throws 
LensException {
-    validateTransition(newStatus);
     log.info("Updating status of {} from {} to {}", getQueryHandle(), 
this.status, newStatus);
     this.status = newStatus;
   }
@@ -406,6 +434,10 @@ public class QueryContext extends AbstractQueryContext {
     return this.status.finished();
   }
 
+  public boolean successful() {
+    return this.status.successful();
+  }
+
   public boolean launched() {
     return this.status.launched();
   }
@@ -440,4 +472,36 @@ public class QueryContext extends AbstractQueryContext {
       setDriverCost(driver, driver.estimate(this));
     }
   }
+
+  public synchronized void registerDriverResult(LensResultSet result) throws 
LensException {
+    if (isDriverResultRegistered) {
+      return; //already registered
+    }
+    this.isDriverResultRegistered = true;
+    /*
+     *  Check if results needs to be streamed to client in which case driver 
result needs to be wrapped in
+     *  PartiallyFetchedInMemoryResultSet
+     *
+     *  1. Driver Result should be of type InMemory (for streaming) as only 
such results can be streamed fast
+     *  2. Query result should be server persistent. Only in this case, an 
early streaming is required by client
+     *  that starts even before server level result persistence/formatting 
finishes.
+     *  3. When execiteTimeout is 0, it refers to an async query. In this case 
streaming result does not make sense
+     *  4. PREFETCH_INMEMORY_RESULTSET = true, implies client intends to get 
early streamed result
+     *  5. rowsToPreFetch should be > 0
+     */
+    if (isPersistent && executeTimeoutMillis > 0
+        && result instanceof InMemoryResultSet
+        && conf.getBoolean(PREFETCH_INMEMORY_RESULTSET, 
DEFAULT_PREFETCH_INMEMORY_RESULTSET)) {
+      int rowsToPreFetch = conf.getInt(PREFETCH_INMEMORY_RESULTSET_ROWS, 
DEFAULT_PREFETCH_INMEMORY_RESULTSET_ROWS);
+      if (rowsToPreFetch > 0) {
+        long executeTimeOutTime = submissionTime + executeTimeoutMillis;
+        if (System.currentTimeMillis() < executeTimeOutTime) {
+          this.driverResult = new 
PartiallyFetchedInMemoryResultSet((InMemoryResultSet) result, rowsToPreFetch);
+          return;
+        }
+      }
+    }
+    this.driverResult = result;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java
 
b/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java
index a20cf47..7f39da1 100644
--- 
a/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java
+++ 
b/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java
@@ -337,11 +337,6 @@ public class MockDriver extends AbstractLensDriver {
       }
 
       @Override
-      public boolean seekToStart() throws LensException {
-        return false;
-      }
-
-      @Override
       public boolean hasNext() throws LensException {
         // TODO Auto-generated method stub
         return false;

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index e61398b..49ab241 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -74,7 +74,6 @@ import org.apache.lens.server.util.UtilityMethods;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -521,6 +520,8 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
      */
     private final Date finishTime;
 
+    private LensResultSet driverRS;
+
     /**
      * Instantiates a new finished query.
      *
@@ -534,22 +535,29 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       } else {
         this.finishTime = new Date(ctx.getEndTime());
       }
+      if (ctx.isResultAvailableInDriver()) {
+        try {
+          driverRS = ctx.getSelectedDriver().fetchResultSet(getCtx());
+        } catch (LensException e) {
+          log.error(
+              "Error while getting result ser form driver {}. Driver result 
set based purging logic will be ignored",
+              ctx.getSelectedDriver(), e);
+        }
+      }
     }
 
     public boolean canBePurged() {
       try {
-        if (getCtx().getStatus().getStatus().equals(SUCCESSFUL)) {
-          if (getCtx().getStatus().isResultSetAvailable()) {
-            LensResultSet rs = getResultset();
-            log.info("Resultset for {} is {}", getQueryHandle(), 
rs.getClass().getSimpleName());
-            if (rs instanceof InMemoryResultSet
-                && System.currentTimeMillis()
-                > ((InMemoryResultSet) rs).getCreationTime() + 
inMemoryResultsetTTLMillis) {
-              log.info("InMemoryResultSet for query {} has exceeded its TTL 
and is eligible for purging now",
-                  getQueryHandle());
-              return true;
-            }
-            return rs.canBePurged();
+        if (getCtx().getStatus().getStatus().equals(SUCCESSFUL) && 
getCtx().getStatus().isResultSetAvailable()) {
+          LensResultSet serverRS = getResultset();
+          log.info("Server Resultset for {} is {}", getQueryHandle(), 
serverRS.getClass().getSimpleName());
+          // driverRS and serverRS will not match when server persistence is 
enabled. Check for purgability of both
+          // result sets in this case
+          if (driverRS != null && driverRS != serverRS) {
+            log.info("Driver Resultset for {} is {}", getQueryHandle(), 
driverRS.getClass().getSimpleName());
+            return serverRS.canBePurged() && (driverRS.canBePurged() || 
hasResultSetExceededTTL(driverRS));
+          } else {
+            return serverRS.canBePurged() || hasResultSetExceededTTL(serverRS);
           }
         }
         return true;
@@ -560,6 +568,23 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       }
     }
 
+    /**
+     * Checks the TTL for ResultSet. TTL is applicable to In Memory ResultSets 
only.
+     *
+     * @param resultSet
+     * @return
+     */
+    private boolean hasResultSetExceededTTL(LensResultSet resultSet) {
+      if (resultSet instanceof InMemoryResultSet
+          && System.currentTimeMillis() > ((InMemoryResultSet) 
resultSet).getCreationTime()
+              + inMemoryResultsetTTLMillis) {
+        log.info("InMemoryResultSet for query {} has exceeded its TTL and is 
eligible for purging now",
+            getQueryHandle());
+        return true;
+      }
+      return false;
+    }
+
     private LensResultSet getResultset() throws LensException {
       return QueryExecutionServiceImpl.this.getResultset(getQueryHandle());
     }
@@ -1514,22 +1539,24 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
         if (ctx.isFinishedQueryPersisted()) {
           return getResultsetFromDAO(queryHandle);
         }
-        LensResultSet resultSet = resultSets.get(queryHandle);
-        if (resultSet == null) {
-          if (ctx.isPersistent() && ctx.getQueryOutputFormatter() != null) {
-            resultSets.put(queryHandle, new LensPersistentResult(ctx, conf));
-          } else if (allQueries.get(queryHandle).isResultAvailableInDriver()) {
-            resultSet = getDriverResultset(queryHandle);
-            resultSets.put(queryHandle, resultSet);
-          } else {
-            throw new NotFoundException("Result set not available for query:" 
+ queryHandle);
+        if (ctx.successful()) { // Do not return any result set for queries 
that have not finished successfully.
+          LensResultSet resultSet = resultSets.get(queryHandle);
+          if (resultSet == null) {
+            if (ctx.isPersistent() && ctx.getQueryOutputFormatter() != null) {
+              resultSets.put(queryHandle, new LensPersistentResult(ctx, conf));
+            } else if 
(allQueries.get(queryHandle).isResultAvailableInDriver()) {
+              resultSet = getDriverResultset(queryHandle);
+              resultSets.put(queryHandle, resultSet);
+            }
           }
         }
       }
-      if (resultSets.get(queryHandle) instanceof InMemoryResultSet) {
-        ((InMemoryResultSet) resultSets.get(queryHandle)).seekToStart();
+
+      LensResultSet result = resultSets.get(queryHandle);
+      if (result == null) {
+        throw new NotFoundException("Result set not available for query:" + 
queryHandle);
       }
-      return resultSets.get(queryHandle);
+      return result;
     }
   }
 
@@ -1645,7 +1672,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       PreparedQueryContext pctx = getPreparedQueryContext(sessionHandle, 
prepareHandle);
       Configuration qconf = getLensConf(sessionHandle, conf);
       accept(pctx.getUserQuery(), qconf, SubmitOp.EXECUTE);
-      QueryContext ctx = createContext(pctx, 
getSession(sessionHandle).getLoggedInUser(), conf, qconf);
+      QueryContext ctx = createContext(pctx, 
getSession(sessionHandle).getLoggedInUser(), conf, qconf, 0);
       if (StringUtils.isNotBlank(queryName)) {
         // Override previously set query name
         ctx.setQueryName(queryName);
@@ -1674,7 +1701,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       acquire(sessionHandle);
       PreparedQueryContext pctx = getPreparedQueryContext(sessionHandle, 
prepareHandle);
       Configuration qconf = getLensConf(sessionHandle, conf);
-      QueryContext ctx = createContext(pctx, 
getSession(sessionHandle).getLoggedInUser(), conf, qconf);
+      QueryContext ctx = createContext(pctx, 
getSession(sessionHandle).getLoggedInUser(), conf, qconf, timeoutMillis);
       if (StringUtils.isNotBlank(queryName)) {
         // Override previously set query name
         ctx.setQueryName(queryName);
@@ -1701,7 +1728,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       acquire(sessionHandle);
       Configuration qconf = getLensConf(sessionHandle, conf);
       accept(query, qconf, SubmitOp.EXECUTE);
-      QueryContext ctx = createContext(query, 
getSession(sessionHandle).getLoggedInUser(), conf, qconf);
+      QueryContext ctx = createContext(query, 
getSession(sessionHandle).getLoggedInUser(), conf, qconf, 0);
       ctx.setQueryName(queryName);
       return executeAsyncInternal(sessionHandle, ctx);
     } finally {
@@ -1719,9 +1746,10 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
    * @return the query context
    * @throws LensException the lens exception
    */
-  protected QueryContext createContext(String query, String userName, LensConf 
conf, Configuration qconf)
-    throws LensException {
+  protected QueryContext createContext(String query, String userName, LensConf 
conf, Configuration qconf,
+      long timeOutMillis) throws LensException {
     QueryContext ctx = new QueryContext(query, userName, conf, qconf, 
drivers.values());
+    ctx.setExecuteTimeoutMillis(timeOutMillis);
     return ctx;
   }
 
@@ -1735,9 +1763,10 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
    * @return the query context
    * @throws LensException the lens exception
    */
-  protected QueryContext createContext(PreparedQueryContext pctx, String 
userName, LensConf conf, Configuration qconf)
-    throws LensException {
+  protected QueryContext createContext(PreparedQueryContext pctx, String 
userName, LensConf conf, Configuration qconf,
+      long timeOutMillis) throws LensException {
     QueryContext ctx = new QueryContext(pctx, userName, conf, qconf);
+    ctx.setExecuteTimeoutMillis(timeOutMillis);
     return ctx;
   }
 
@@ -1781,7 +1810,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     try {
       log.info("UpdateQueryConf: session:{} queryHandle: {}", sessionHandle, 
queryHandle);
       acquire(sessionHandle);
-      QueryContext ctx = getQueryContext(sessionHandle, queryHandle);
+      QueryContext ctx = getUpdatedQueryContext(sessionHandle, queryHandle);
       if (ctx != null && (ctx.queued())) {
         ctx.updateConf(newconf.getProperties());
         // TODO COnf changed event tobe raised
@@ -1815,19 +1844,35 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
   }
 
   /**
-   * Gets the query context.
+   *  Gets the query context either form memory or from DB (after query is 
purged)
+   *  Note: For non-purged queries the status is updated before returning the 
context
    *
    * @param sessionHandle the session handle
    * @param queryHandle   the query handle
    * @return the query context
    * @throws LensException the lens exception
    */
-  QueryContext getQueryContext(LensSessionHandle sessionHandle, QueryHandle 
queryHandle) throws LensException {
+  QueryContext getUpdatedQueryContext(LensSessionHandle sessionHandle, 
QueryHandle queryHandle) throws LensException {
+    return getUpdatedQueryContext(sessionHandle, queryHandle, false);
+  }
+
+  /**
+   * Gets the query context. If the query has been purged, null context is 
returned if returnNullIfPurged is true, else
+   * context is read form DB Note: For non-purged queries the status is 
updated before returning the context
+   *
+   * @param sessionHandle
+   * @param queryHandle
+   * @param returnNullIfPurged
+   * @return
+   * @throws LensException
+   */
+  QueryContext getUpdatedQueryContext(LensSessionHandle sessionHandle, 
QueryHandle queryHandle,
+      boolean returnNullIfPurged) throws LensException {
     try {
       acquire(sessionHandle);
       QueryContext ctx = allQueries.get(queryHandle);
       if (ctx == null) {
-        return getQueryContextOfFinishedQuery(queryHandle);
+        return (returnNullIfPurged ? null : 
getQueryContextOfFinishedQuery(queryHandle));
       }
       updateStatus(queryHandle);
       return ctx;
@@ -1864,7 +1909,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
    */
   @Override
   public LensQuery getQuery(LensSessionHandle sessionHandle, QueryHandle 
queryHandle) throws LensException {
-    return getQueryContext(sessionHandle, queryHandle).toLensQuery();
+    return getUpdatedQueryContext(sessionHandle, queryHandle).toLensQuery();
   }
 
   /**
@@ -1916,7 +1961,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       acquire(sessionHandle);
       Configuration qconf = getLensConf(sessionHandle, conf);
       accept(query, qconf, SubmitOp.EXECUTE);
-      QueryContext ctx = createContext(query, 
getSession(sessionHandle).getLoggedInUser(), conf, qconf);
+      QueryContext ctx = createContext(query, 
getSession(sessionHandle).getLoggedInUser(), conf, qconf, timeoutMillis);
       ctx.setQueryName(queryName);
       ctx.setLensSessionIdentifier(sessionHandle.getPublicId().toString());
       rewriteAndSelect(ctx);
@@ -1939,8 +1984,8 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
   private QueryHandleWithResultSet executeTimeoutInternal(LensSessionHandle 
sessionHandle, QueryContext ctx,
     long timeoutMillis, Configuration conf) throws LensException {
     QueryHandle handle = submitQuery(ctx);
+    long timeOutTime = System.currentTimeMillis() + timeoutMillis;
     QueryHandleWithResultSet result = new QueryHandleWithResultSet(handle);
-    // getQueryContext calls updateStatus, which fires query events if there's 
a change in status
 
     while (isQueued(sessionHandle, handle)) {
       try {
@@ -1949,15 +1994,17 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
         log.error("Encountered Interrupted exception.", e);
       }
     }
-    QueryCompletionListener listener = new QueryCompletionListenerImpl(handle);
-    if (getQueryContext(sessionHandle, handle).getSelectedDriver() == null) {
-      result.setStatus(getQueryContext(sessionHandle, handle).getStatus());
+
+    QueryContext queryCtx = getUpdatedQueryContext(sessionHandle, handle);
+    if (queryCtx.getSelectedDriver() == null) {
+      result.setStatus(queryCtx.getStatus());
       return result;
     }
-    synchronized (ctx) {
-      if (!ctx.getStatus().finished()) {
-        getQueryContext(sessionHandle, handle).getSelectedDriver()
-          .registerForCompletionNotification(handle, timeoutMillis, listener);
+
+    QueryCompletionListenerImpl listener = new 
QueryCompletionListenerImpl(handle);
+    synchronized (queryCtx) {
+      if (!queryCtx.getStatus().finished()) {
+        queryCtx.getSelectedDriver().registerForCompletionNotification(handle, 
timeoutMillis, listener);
         try {
           synchronized (listener) {
             listener.wait(timeoutMillis);
@@ -1968,21 +2015,53 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       }
     }
 
-    if (getQueryContext(sessionHandle, handle).getStatus().finished()) {
-      if (getQueryContext(sessionHandle, 
handle).getStatus().isResultSetAvailable()) {
-        result.setResult(getResultset(handle).toQueryResult());
+    // At this stage (since the listener waits only for driver completion and 
not server that may include result
+    // formatting and persistence) the query status can be RUNNING or EXECUTED 
or FAILED or SUCCESSFUL
+    LensResultSet resultSet = null;
+    queryCtx = getUpdatedQueryContext(sessionHandle, handle, true); // If the 
query is already purged queryCtx = null
+    if (queryCtx != null && listener.querySuccessful && 
queryCtx.getStatus().isResultSetAvailable()) {
+      resultSet = queryCtx.getSelectedDriver().fetchResultSet(queryCtx);
+      if (resultSet instanceof PartiallyFetchedInMemoryResultSet) {
+        PartiallyFetchedInMemoryResultSet partialnMemoryResult = 
(PartiallyFetchedInMemoryResultSet) resultSet;
+        if (partialnMemoryResult.isComplteleyFetched()) { // DO not stream the 
result if its not completely fetched
+          result.setResult(new 
InMemoryQueryResult(partialnMemoryResult.getPreFetchedRows()));
+          
result.setResultMetadata(partialnMemoryResult.getMetadata().toQueryResultSetMetadata());
+          result.setStatus(queryCtx.getStatus());
+          return result;
+        }
+      }
+    }
+
+    // Until timeOutTime, give this query a chance to reach FINISHED status if 
not already there.
+    queryCtx = getUpdatedQueryContext(sessionHandle, handle);
+    while (!queryCtx.finished() && System.currentTimeMillis() < timeOutTime) {
+      queryCtx = getUpdatedQueryContext(sessionHandle, handle);
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        // Ignore
       }
     }
 
-    result.setStatus(getQueryContext(sessionHandle, handle).getStatus());
+    if (queryCtx.finished() && queryCtx.getStatus().isResultSetAvailable()) {
+      resultSet = getResultset(handle);
+      
result.setResultMetadata(resultSet.getMetadata().toQueryResultSetMetadata());
+      result.setResult(resultSet.toQueryResult());
+      result.setStatus(queryCtx.getStatus());
+      return result;
+    }
 
+    // Result is not available. (Explicitly setting values to null for 
readability)
+    result.setResult(null);
+    result.setResultMetadata(null);
+    result.setStatus(queryCtx.getStatus());
     return result;
   }
 
   private boolean isQueued(final LensSessionHandle sessionHandle, final 
QueryHandle handle)
     throws LensException {
     // getQueryContext calls updateStatus, which fires query events if there's 
a change in status
-    QueryContext query = getQueryContext(sessionHandle, handle);
+    QueryContext query = getUpdatedQueryContext(sessionHandle, handle);
     synchronized (query) {
       return query.queued();
     }
@@ -1996,7 +2075,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     /**
      * The succeeded.
      */
-    boolean succeeded = false;
+    boolean querySuccessful = false;
 
     /**
      * The handle.
@@ -2021,7 +2100,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     @Override
     public void onCompletion(QueryHandle handle) {
       synchronized (this) {
-        succeeded = true;
+        querySuccessful = true;
         log.info("Query {} with time out succeeded", handle);
         this.notify();
       }
@@ -2036,7 +2115,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     @Override
     public void onError(QueryHandle handle, String error) {
       synchronized (this) {
-        succeeded = false;
+        querySuccessful = false;
         log.info("Query {} with time out failed", handle);
         this.notify();
       }
@@ -2120,7 +2199,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     try {
       log.info("CancelQuery: session:{} query:{}", sessionHandle, queryHandle);
       acquire(sessionHandle);
-      QueryContext ctx = getQueryContext(sessionHandle, queryHandle);
+      QueryContext ctx = getUpdatedQueryContext(sessionHandle, queryHandle);
 
       synchronized (ctx) {
 
@@ -2592,7 +2671,7 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     } catch (IOException e) {
       throw new LensException(e);
     }
-    final QueryContext ctx = getQueryContext(sessionHandle, queryHandle);
+    final QueryContext ctx = getUpdatedQueryContext(sessionHandle, 
queryHandle);
     String resultFSReadUrl = conf.get(RESULT_FS_READ_URL);
     if (resultFSReadUrl != null) {
       try {

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server/src/main/resources/lenssession-default.xml
----------------------------------------------------------------------
diff --git a/lens-server/src/main/resources/lenssession-default.xml 
b/lens-server/src/main/resources/lenssession-default.xml
index a321c3f..ce296cc 100644
--- a/lens-server/src/main/resources/lenssession-default.xml
+++ b/lens-server/src/main/resources/lenssession-default.xml
@@ -213,6 +213,27 @@
     </description>
   </property>
 
+  <property>
+    <name>lens.query.prefetch.inmemory.resultset</name>
+    <value>true</value>
+    <description>When set to true, specified number of rows of result set will 
be pre-fetched if the result set is of
+    type InMemoryResultSet and query execution is not asynchronous i.e. query 
should be launched with
+    operation as EXECUTE_WITH_TIMEOUT.
+    Suggested usage of this property: It can be used by client to stream as 
well as persist results in server for
+    queries that finish fast and produce results with fewer rows (should be 
less than number of rows pre-fetched).
+    Note that the results are streamed to the client early, without waiting 
for persistence to finish.
+    Default value of this property is true.
+    </description>
+  </property>
+
+  <property>
+    <name>lens.query.prefetch.inmemory.resultset.rows</name>
+    <value>100</value>
+    <description>Specifies the number of rows to pre-fetch when 
lens.query.prefetch.inmemory.resultset is set to true.
+    Default value is 100 rows.
+    </description>
+  </property>
+
   <!--  properties for session -->
   <property>
     <name>lens.session.aux.jars</name>

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java 
b/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java
index 4597f9d..a5ee5cc 100644
--- a/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java
+++ b/lens-server/src/test/java/org/apache/lens/server/query/TestLensDAO.java
@@ -63,7 +63,7 @@ public class TestLensDAO {
 
     // Test insert query
     QueryContext queryContext = service.createContext("SELECT ID FROM 
testTable", "foo@localhost", new LensConf(),
-      new Configuration());
+      new Configuration(), 0);
     long submissionTime = queryContext.getSubmissionTime();
     queryContext.setQueryName("daoTestQuery1");
     queryContext.getDriverContext().setSelectedDriver(new MockDriver());

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
index d9b7679..737c99a 100644
--- 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
+++ 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryService.java
@@ -46,12 +46,14 @@ import org.apache.lens.api.result.LensErrorTO;
 import org.apache.lens.api.result.QueryCostTO;
 import org.apache.lens.cube.error.LensCubeErrorCode;
 import org.apache.lens.driver.hive.HiveDriver;
+import org.apache.lens.lib.query.FileSerdeFormatter;
 import org.apache.lens.server.LensJerseyTest;
 import org.apache.lens.server.LensServerTestUtil;
 import org.apache.lens.server.LensServices;
 import org.apache.lens.server.api.LensConfConstants;
 import org.apache.lens.server.api.driver.InMemoryResultSet;
 import org.apache.lens.server.api.driver.LensDriver;
+import org.apache.lens.server.api.driver.LensResultSetMetadata;
 import org.apache.lens.server.api.error.LensDriverErrorCode;
 import org.apache.lens.server.api.error.LensException;
 import org.apache.lens.server.api.metrics.LensMetricsRegistry;
@@ -667,13 +669,13 @@ public class TestQueryService extends LensJerseyTest {
     assertTrue(lensQuery.getDriverStartTime() > 0);
     assertTrue(lensQuery.getDriverFinishTime() > 0);
     assertTrue(lensQuery.getFinishTime() > 0);
-    QueryContext ctx = queryService.getQueryContext(lensSessionId, 
lensQuery.getQueryHandle());
+    QueryContext ctx = queryService.getUpdatedQueryContext(lensSessionId, 
lensQuery.getQueryHandle());
     assertNotNull(ctx.getPhase1RewrittenQuery());
     assertEquals(ctx.getPhase1RewrittenQuery(), ctx.getUserQuery()); //Since 
there is no rewriter in this test
     assertEquals(lensQuery.getStatus().getStatus(), 
QueryStatus.Status.SUCCESSFUL);
 
-    validatePersistedResult(handle, target(), lensSessionId, new 
String[][]{{"ID", "INT"}, {"IDSTR", "STRING"}},
-      true, mt);
+    validatePersistedResult(handle, target(), lensSessionId, new 
String[][]{{"ID", "INT"}, {"IDSTR", "STRING"}}, true,
+        false, mt);
 
     // test cancel query
     final QueryHandle handle2 = target.request(mt).post(Entity.entity(mp, 
MediaType.MULTIPART_FORM_DATA_TYPE),
@@ -723,23 +725,22 @@ public class TestQueryService extends LensJerseyTest {
    * @param parent        the parent
    * @param lensSessionId the lens session id
    * @param isDir         the is dir
+   * @param isCSVFormat   the result format is csv.
    * @throws IOException Signals that an I/O exception has occurred.
    */
   static void validatePersistedResult(QueryHandle handle, WebTarget parent, 
LensSessionHandle lensSessionId,
-    String[][] schema, boolean isDir, MediaType mt) throws IOException {
+    String[][] schema, boolean isDir, boolean isCSVFormat, MediaType mt) 
throws IOException {
     final WebTarget target = parent.path("queryapi/queries");
     // fetch results
-    validateResultSetMetadata(handle, "",
-      schema,
-      parent, lensSessionId, mt);
+    validateResultSetMetadata(handle, "", schema, parent, lensSessionId, mt);
 
     String presultset = 
target.path(handle.toString()).path("resultset").queryParam("sessionid", 
lensSessionId)
       .request(mt).get(String.class);
     System.out.println("PERSISTED RESULT:" + presultset);
 
     PersistentQueryResult resultset = 
target.path(handle.toString()).path("resultset")
-      .queryParam("sessionid", 
lensSessionId).request(mt).get(PersistentQueryResult.class);
-    validatePersistentResult(resultset, handle, isDir);
+      .queryParam("sessionid", 
lensSessionId).request().get(PersistentQueryResult.class);
+    validatePersistentResult(resultset, handle, isDir, isCSVFormat);
 
     if (isDir) {
       validNotFoundForHttpResult(parent, lensSessionId, handle);
@@ -832,12 +833,13 @@ public class TestQueryService extends LensJerseyTest {
    * @param resultset the resultset
    * @param handle    the handle
    * @param isDir     the is dir
+   * @param isCSVFormat   the result format is csv.
    * @throws IOException Signals that an I/O exception has occurred.
    */
-  static void validatePersistentResult(PersistentQueryResult resultset, 
QueryHandle handle, boolean isDir)
-    throws IOException {
+  static void validatePersistentResult(PersistentQueryResult resultset, 
QueryHandle handle, boolean isDir,
+      boolean isCSVFormat)throws IOException {
     List<String> actualRows = readResultSet(resultset, handle, isDir);
-    validatePersistentResult(actualRows);
+    validatePersistentResult(actualRows, isCSVFormat);
     if (!isDir) {
       assertEquals(resultset.getNumRows().intValue(), actualRows.size());
     }
@@ -845,24 +847,39 @@ public class TestQueryService extends LensJerseyTest {
     assertEquals(resultset.getFileSize(), fileSize);
   }
 
-  static void validatePersistentResult(List<String> actualRows) {
-    String[] expected1 = new String[]{
-      "1one",
-      "\\Ntwo123item1item2",
-      "3\\Nitem1item2",
-      "\\N\\N",
-      "5nothing",
-    };
-    String[] expected2 = new String[]{
-      "1one[][]",
-      "\\Ntwo[1,2,3][\"item1\",\"item2\"]",
-      "3\\N[][\"item1\",\"item2\"]",
-      "\\N\\N[][]",
-      "5[][\"nothing\"]",
-    };
+  static void validatePersistentResult(List<String> actualRows,  boolean 
isCSVFormat) {
+    String[] expected1 = null;
+    String[] expected2 = null;
+    if (isCSVFormat) {
+      //This case will be hit when the result is persisted by the server (CSV 
result)
+      expected1 = new String[]{
+        "\"1\",\"one\"",
+        "\"NULL\",\"two\"",
+        "\"3\",\"NULL\"",
+        "\"NULL\",\"NULL\"",
+        "\"5\",\"\"",
+      };
+    } else {
+      //This is case of hive driver persistence
+      expected1 = new String[] {
+        "1one",
+        "\\Ntwo123item1item2",
+        "3\\Nitem1item2",
+        "\\N\\N",
+        "5nothing",
+      };
+      expected2 = new String[] {
+        "1one[][]",
+        "\\Ntwo[1,2,3][\"item1\",\"item2\"]",
+        "3\\N[][\"item1\",\"item2\"]",
+        "\\N\\N[][]",
+        "5[][\"nothing\"]",
+      };
+    }
+
     for (int i = 0; i < actualRows.size(); i++) {
-      assertEquals(
-        expected1[i].indexOf(actualRows.get(i)) == 0 || 
expected2[i].indexOf(actualRows.get(i)) == 0, true);
+      assertEquals(expected1[i].indexOf(actualRows.get(i)) == 0
+          || (expected2 != null && expected2[i].indexOf(actualRows.get(i)) == 
0), true);
     }
   }
 
@@ -892,7 +909,7 @@ public class TestQueryService extends LensJerseyTest {
 
       String result = new String(bos.toByteArray());
       List<String> actualRows = Arrays.asList(result.split("\n"));
-      validatePersistentResult(actualRows);
+      validatePersistentResult(actualRows, false);
     } else {
       assertEquals(SEE_OTHER.getStatusCode(), response.getStatus());
       assertTrue(response.getHeaderString("Location").contains(redirectUrl));
@@ -999,7 +1016,7 @@ public class TestQueryService extends LensJerseyTest {
       waitForQueryToFinish(target(), lensSessionId, handle, Status.SUCCESSFUL, 
defaultMT);
 
       // Check TTL
-      QueryContext ctx = queryService.getQueryContext(lensSessionId, handle);
+      QueryContext ctx = queryService.getUpdatedQueryContext(lensSessionId, 
handle);
       long softExpiryTime = ctx.getDriverStatus().getDriverFinishTime()
           + queryService.getInMemoryResultsetTTLMillis() - 1000; //Keeping 
buffer of 1 secs
       int checkCount = 0;
@@ -1199,7 +1216,7 @@ public class TestQueryService extends LensJerseyTest {
       new GenericType<LensAPIResult<QueryHandleWithResultSet>>() {}).getData();
     assertNotNull(result.getQueryHandle());
     assertNotNull(result.getResult());
-    validatePersistentResult((PersistentQueryResult) result.getResult(), 
result.getQueryHandle(), true);
+    validatePersistentResult((PersistentQueryResult) result.getResult(), 
result.getQueryHandle(), true, false);
 
     final FormDataMultiPart mp2 = new FormDataMultiPart();
     LensConf conf = new LensConf();
@@ -1234,6 +1251,99 @@ public class TestQueryService extends LensJerseyTest {
     }
   }
   /**
+   * Data provider for test case {@link 
#testExecuteWithTimeoutAndPreFetechAndServerPersistence()}
+   * @return
+   */
+  @DataProvider
+  public Object[][] executeWithTimeoutAndPreFetechAndServerPersistenceDP() {
+    //Columns: timeOutMillis, preFetchRows, isStreamingResultAvailable, 
deferPersistenceByMillis
+    return new Object[][] {
+      {30000, 5, true, 0}, //result has 5 rows & all 5 rows are requested to 
be pre-fetched
+      {30000, 10, true, 6000}, //result has 5 rows & 10 rows are requested to 
be pre-fetched.
+      {30000, 2, false, 4000}, //result has 5 rows & 2 rows are requested to 
be pre-fetched. Will not stream
+      {10, 5, false, 0}, //result has 5 rows & 5 rows requested. Timeout is 
less (10ms). Will not stream
+    };
+  }
+
+  /**
+   * @param timeOutMillis : wait time for execute with timeout api
+   * @param preFetchRows : number of rows to pre-fetch in case of 
InMemoryResultSet
+   * @param isStreamingResultAvailable : whether the execute call is expected 
to return InMemoryQueryResult
+   * @param ttlMillis : The time window for which pre-fetched 
InMemoryResultSet will be available for sure.
+   * @param deferPersistenceByMillis : The time in millis by which Result 
formatter will be deferred by.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test(dataProvider = "executeWithTimeoutAndPreFetechAndServerPersistenceDP")
+  public void testExecuteWithTimeoutAndPreFetechAndServerPersistence(long 
timeOutMillis, int preFetchRows,
+      boolean isStreamingResultAvailable, long deferPersistenceByMillis) 
throws Exception {
+    final WebTarget target = target().path("queryapi/queries");
+
+    final FormDataMultiPart mp = new FormDataMultiPart();
+    mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("sessionid").build(), 
lensSessionId,
+        MediaType.APPLICATION_XML_TYPE));
+    mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("query").build(), "select ID, 
IDSTR from "
+        + TEST_TABLE));
+    mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("operation").build(), 
"execute_with_timeout"));
+    // Set a timeout value enough for tests
+    mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("timeoutmillis").build(), 
timeOutMillis + ""));
+    LensConf conf = new LensConf();
+    conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_SET, "true");
+    conf.addProperty(LensConfConstants.QUERY_PERSISTENT_RESULT_INDRIVER, 
"false");
+    conf.addProperty(LensConfConstants.PREFETCH_INMEMORY_RESULTSET, "true");
+    conf.addProperty(LensConfConstants.PREFETCH_INMEMORY_RESULTSET_ROWS, 
preFetchRows);
+    conf.addProperty(LensConfConstants.QUERY_OUTPUT_FORMATTER, 
DeferredFileSerdeFormatter.class.getName());
+    conf.addProperty("deferPersistenceByMillis", deferPersistenceByMillis); // 
property used for test only
+    mp.bodyPart(new 
FormDataBodyPart(FormDataContentDisposition.name("conf").fileName("conf").build(),
 conf,
+        MediaType.APPLICATION_XML_TYPE));
+    QueryHandleWithResultSet result 
=target.request(MediaType.APPLICATION_XML_TYPE)
+            .post(Entity.entity(mp, MediaType.MULTIPART_FORM_DATA_TYPE),
+                new GenericType<LensAPIResult<QueryHandleWithResultSet>>() 
{}).getData();
+    QueryHandle handle = result.getQueryHandle();
+    assertNotNull(handle);
+    assertNotEquals(result.getStatus().getStatus(), QueryStatus.Status.FAILED);
+
+    if (isStreamingResultAvailable) {
+      // TEST streamed result
+      assertTrue(result.getStatus().getStatus() == QueryStatus.Status.EXECUTED
+          || result.getStatus().getStatus() == QueryStatus.Status.SUCCESSFUL,
+          "Check if timeoutmillis need to be increased based on query status " 
+ result.getStatus());
+      assertEquals(result.getResultMetadata().getColumns().size(), 2);
+      assertNotNull(result.getResult());
+      validateInmemoryResult((InMemoryQueryResult) result.getResult());
+    } else if (timeOutMillis > 20000) { // timeout is sufficient for query to 
finish
+      assertTrue(result.getResult() instanceof PersistentQueryResult);
+    } else {
+      assertNull(result.getResult()); // Query execution not finished yet
+    }
+
+    waitForQueryToFinish(target(), lensSessionId, handle, Status.SUCCESSFUL, 
MediaType.APPLICATION_XML_TYPE);
+
+    // Test Persistent Result
+    validatePersistedResult(handle, target(), lensSessionId, new String[][] { 
{ "ID", "INT" }, { "IDSTR", "STRING" } },
+        false, true, MediaType.APPLICATION_XML_TYPE);
+  }
+
+  private static class DeferredFileSerdeFormatter extends FileSerdeFormatter {
+    /**
+     * Defer init so that this output formatter takes significant time.
+     */
+    @Override
+    public void init(QueryContext ctx, LensResultSetMetadata metadata) throws 
IOException {
+      super.init(ctx, metadata);
+      long deferPersistenceByMillis = 
ctx.getConf().getLong("deferPersistenceByMillis", 5000);
+      if (deferPersistenceByMillis > 0) {
+        try {
+          log.info("Deferring result formatting by {} millis", 
deferPersistenceByMillis);
+          Thread.sleep(deferPersistenceByMillis);
+        } catch (InterruptedException e) {
+          // Ignore
+        }
+      }
+    }
+  }
+
+  /**
    * Test execute with timeout query.
    *
    * @throws IOException          Signals that an I/O exception has occurred.

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/lens-server/src/test/java/org/apache/lens/server/query/TestResultFormatting.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/query/TestResultFormatting.java
 
b/lens-server/src/test/java/org/apache/lens/server/query/TestResultFormatting.java
index 6db990e..f66f89d 100644
--- 
a/lens-server/src/test/java/org/apache/lens/server/query/TestResultFormatting.java
+++ 
b/lens-server/src/test/java/org/apache/lens/server/query/TestResultFormatting.java
@@ -243,7 +243,7 @@ public class TestResultFormatting extends LensJerseyTest {
       // fetch results
       TestQueryService.validatePersistedResult(handle, target(), 
lensSessionId, new String[][]{
         {"ID", "INT"}, {"IDSTR", "STRING"}, {"IDARR", "ARRAY"}, {"IDSTRARR", 
"ARRAY"},
-      }, isDir, mt);
+      }, isDir, false, mt);
       if (!isDir) {
         TestQueryService.validateHttpEndPoint(target(), lensSessionId, handle, 
reDirectUrl);
       }

http://git-wip-us.apache.org/repos/asf/lens/blob/58d86364/src/site/apt/admin/session-config.apt
----------------------------------------------------------------------
diff --git a/src/site/apt/admin/session-config.apt 
b/src/site/apt/admin/session-config.apt
index e108a13..206d6a4 100644
--- a/src/site/apt/admin/session-config.apt
+++ b/src/site/apt/admin/session-config.apt
@@ -76,28 +76,32 @@ Lens session configuration
 *--+--+---+--+
 |26|lens.query.output.write.header|false|Whether to write header as part of 
query result formatting. When enabled the user given header will be added in 
case of driver persisted results, and column names chosen will be added as 
header for in-memory results.|
 *--+--+---+--+
-|27|lens.query.result.email.cc| |When query ends, the result/failure reason 
will be sent to the user via email. The mail would be cc'ed to the addresses 
provided in this field.|
+|27|lens.query.prefetch.inmemory.resultset|true|When set to true, specified 
number of rows of result set will be pre-fetched if the result set is of type 
InMemoryResultSet and query execution is not asynchronous i.e. query should be 
launched with operation as EXECUTE_WITH_TIMEOUT. Suggested usage of this 
property: It can be used by client to stream as well as persist results in 
server for queries that finish fast and produce results with fewer rows (should 
be less than number of rows pre-fetched). Note that the results are streamed to 
the client early, without waiting for persistence to finish. Default value of 
this property is true.|
 *--+--+---+--+
-|28|lens.query.result.fs.read.url| |Http read URL for FileSystem on which 
result is present, if available. For example webhdfs as http read url should 
http://host:port/webhdfs/v1. Currently we support only webhdfs url as the http 
url for HDFS file system|
+|28|lens.query.prefetch.inmemory.resultset.rows|100|Specifies the number of 
rows to pre-fetch when lens.query.prefetch.inmemory.resultset is set to true. 
Default value is 100 rows.|
 *--+--+---+--+
-|29|lens.query.result.output.dir.format| |The format of the output if result 
is persisted in hdfs. The format should be expressed in HQL.|
+|29|lens.query.result.email.cc| |When query ends, the result/failure reason 
will be sent to the user via email. The mail would be cc'ed to the addresses 
provided in this field.|
 *--+--+---+--+
-|30|lens.query.result.output.serde|org.apache.lens.lib.query.CSVSerde|The 
default serde class name that should be used by 
org.apache.lens.lib.query.FileSerdeFormatter for formatting the output|
+|30|lens.query.result.fs.read.url| |Http read URL for FileSystem on which 
result is present, if available. For example webhdfs as http read url should 
http://host:port/webhdfs/v1. Currently we support only webhdfs url as the http 
url for HDFS file system|
 *--+--+---+--+
-|31|lens.query.result.parent.dir|file:///tmp/lensreports|The directory for 
storing persisted result of query. This directory should exist and should have 
writable permissions by lens server|
+|31|lens.query.result.output.dir.format| |The format of the output if result 
is persisted in hdfs. The format should be expressed in HQL.|
 *--+--+---+--+
-|32|lens.query.result.size.format.threshold|10737418240|The maximum allowed 
size of the query result. If exceeds, no server side formatting would be done.|
+|32|lens.query.result.output.serde|org.apache.lens.lib.query.CSVSerde|The 
default serde class name that should be used by 
org.apache.lens.lib.query.FileSerdeFormatter for formatting the output|
 *--+--+---+--+
-|33|lens.query.result.split.multiple|false|Whether to split the result into 
multiple files. If enabled, each file will be restricted to max rows 
configured. All the files will be available as zip.|
+|33|lens.query.result.parent.dir|file:///tmp/lensreports|The directory for 
storing persisted result of query. This directory should exist and should have 
writable permissions by lens server|
 *--+--+---+--+
-|34|lens.query.result.split.multiple.maxrows|100000|The maximum number of rows 
allowed in each file, when splitting the result into multiple files is enabled.|
+|34|lens.query.result.size.format.threshold|10737418240|The maximum allowed 
size of the query result. If exceeds, no server side formatting would be done.|
 *--+--+---+--+
-|35|lens.session.aux.jars| |List of comma separated jar paths, which will 
added to the session|
+|35|lens.query.result.split.multiple|false|Whether to split the result into 
multiple files. If enabled, each file will be restricted to max rows 
configured. All the files will be available as zip.|
 *--+--+---+--+
-|36|lens.session.cluster.user| |Session level config which will determine 
which cluster user will access hdfs|
+|36|lens.query.result.split.multiple.maxrows|100000|The maximum number of rows 
allowed in each file, when splitting the result into multiple files is enabled.|
 *--+--+---+--+
-|37|lens.session.loggedin.user| |The username used to log in to lens. e.g. 
LDAP user|
+|37|lens.session.aux.jars| |List of comma separated jar paths, which will 
added to the session|
 *--+--+---+--+
-|38|lens.session.metastore.exclude.cubetables.from.nativetables|true|Exclude 
cube related tables when fetching native tables|
+|38|lens.session.cluster.user| |Session level config which will determine 
which cluster user will access hdfs|
+*--+--+---+--+
+|39|lens.session.loggedin.user| |The username used to log in to lens. e.g. 
LDAP user|
+*--+--+---+--+
+|40|lens.session.metastore.exclude.cubetables.from.nativetables|true|Exclude 
cube related tables when fetching native tables|
 *--+--+---+--+
 The configuration parameters and their default values

Reply via email to