Repository: lens
Updated Branches:
  refs/heads/master 9d21940be -> 3ab732acc


LENS-1286: Handle Restart cases for Scheduler


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/3ab732ac
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/3ab732ac
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/3ab732ac

Branch: refs/heads/master
Commit: 3ab732accdfb19b87da0bf5c72e301b0c2a42b84
Parents: 9d21940
Author: Lavkesh Lahngir <lavk...@linux.com>
Authored: Wed Sep 14 17:02:00 2016 +0530
Committer: Rajat Khandelwal <rajatgupt...@gmail.com>
Committed: Wed Sep 14 17:02:00 2016 +0530

----------------------------------------------------------------------
 .../lens/server/scheduler/SchedulerDAO.java     | 180 +++++++++++--------
 .../scheduler/SchedulerQueryEventListener.java  |  10 +-
 .../server/scheduler/SchedulerServiceImpl.java  | 139 ++++++++++++--
 .../server/scheduler/SchedulerRestartTest.java  | 124 +++++++++++++
 .../scheduler/TestSchedulerServiceImpl.java     |  43 +----
 .../scheduler/util/SchedulerTestUtils.java      |  46 +++++
 6 files changed, 408 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/3ab732ac/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java
index 966a64e..b924167 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java
@@ -53,7 +53,7 @@ public class SchedulerDAO {
     this.conf = conf;
     try {
       Class dbStoreClass = Class
-          .forName(conf.get(LensConfConstants.SCHEDULER_STORE_CLASS, 
SchedulerHsqlDBStore.class.getName()));
+        .forName(conf.get(LensConfConstants.SCHEDULER_STORE_CLASS, 
SchedulerHsqlDBStore.class.getName()));
       this.store = (SchedulerDBStore) dbStoreClass.newInstance();
       this.store.init(UtilityMethods.getDataSourceFromConfForScheduler(conf));
       this.store.createJobTable();
@@ -81,7 +81,7 @@ public class SchedulerDAO {
     try {
       return store.insertIntoJobTable(jobInfo);
     } catch (SQLException e) {
-      log.error("Error while storing the jobInfo for " + 
jobInfo.getId().getHandleIdString(), e);
+      log.error("Error while storing the jobInfo for {}", 
jobInfo.getId().getHandleIdString(), e);
       return 0;
     }
   }
@@ -96,7 +96,7 @@ public class SchedulerDAO {
     try {
       return store.getSchedulerJobInfo(id.getHandleIdString());
     } catch (SQLException e) {
-      log.error("Error while getting the job detail for " + 
id.getHandleIdString(), e);
+      log.error("Error while getting the job detail for {}", 
id.getHandleIdString(), e);
       return null;
     }
   }
@@ -111,7 +111,7 @@ public class SchedulerDAO {
     try {
       return store.getJob(id.getHandleIdString());
     } catch (SQLException e) {
-      log.error("Error while getting the job for " + id.getHandleIdString(), 
e);
+      log.error("Error while getting the job for {}", id.getHandleIdString(), 
e);
       return null;
     }
   }
@@ -126,7 +126,7 @@ public class SchedulerDAO {
     try {
       return store.getUser(id.getHandleIdString());
     } catch (SQLException e) {
-      log.error("Error while getting the user for the job with handle " + 
id.getHandleIdString(), e);
+      log.error("Error while getting the user for the job with handle {}", 
id.getHandleIdString(), e);
       return null;
     }
   }
@@ -141,7 +141,7 @@ public class SchedulerDAO {
     try {
       return store.getJobState(id.getHandleIdString());
     } catch (SQLException e) {
-      log.error("Error while getting the job status for " + 
id.getHandleIdString(), e);
+      log.error("Error while getting the job status for {}", 
id.getHandleIdString(), e);
       return null;
     }
   }
@@ -156,7 +156,7 @@ public class SchedulerDAO {
     try {
       return store.updateJob(info.getId().getHandleIdString(), info.getJob(), 
info.getModifiedOn());
     } catch (SQLException e) {
-      log.error("Error while updating job for " + 
info.getId().getHandleIdString(), e);
+      log.error("Error while updating job for {}", 
info.getId().getHandleIdString(), e);
       return 0;
     }
   }
@@ -171,7 +171,7 @@ public class SchedulerDAO {
     try {
       return store.updateJobStatus(info.getId().getHandleIdString(), 
info.getJobState().name(), info.getModifiedOn());
     } catch (SQLException e) {
-      log.error("Error while updating job status for " + 
info.getId().getHandleIdString(), e);
+      log.error("Error while updating job status for {}", 
info.getId().getHandleIdString(), e);
       return 0;
     }
   }
@@ -180,7 +180,7 @@ public class SchedulerDAO {
     try {
       return store.insertIntoJobInstanceTable(instanceInfo);
     } catch (SQLException e) {
-      log.error("Error while storing job instance for " + 
instanceInfo.getId());
+      log.error("Error while storing job instance for {}", 
instanceInfo.getId(), e);
       return 0;
     }
   }
@@ -189,9 +189,9 @@ public class SchedulerDAO {
     try {
       return store.insertIntoJobInstanceRunTable(instanceRun);
     } catch (SQLException e) {
-      log.error(
-          "Error while storing job instance run for " + instanceRun.getRunId() 
+ " and instance handle " + instanceRun
-              .getHandle().getHandleIdString(), e);
+      log.error("Error while storing job instance run for {} with run id {} ",
+        instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId(), 
e);
+
       return 0;
     }
   }
@@ -206,7 +206,7 @@ public class SchedulerDAO {
     try {
       return store.getJobInstanceInfo(id.getHandleIdString());
     } catch (SQLException e) {
-      log.error("Error while getting the job instance info for " + 
id.getHandleIdString(), e);
+      log.error("Error while getting the job instance info for {}", 
id.getHandleIdString(), e);
       return null;
     }
   }
@@ -221,8 +221,8 @@ public class SchedulerDAO {
     try {
       return store.updateJobInstanceRun(instanceRun);
     } catch (SQLException e) {
-      log.error("Error while updating the job instance status for " + 
instanceRun.getHandle().getHandleIdString()
-          + " and run: " + instanceRun.getRunId(), e);
+      log.error("Error while updating the job instance status for {} and run 
id {}",
+        instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId(), 
e);
       return 0;
     }
   }
@@ -238,7 +238,7 @@ public class SchedulerDAO {
     try {
       return store.getAllJobInstances(id.getHandleIdString());
     } catch (SQLException e) {
-      log.error("Error while getting instances of a job with id " + 
id.getHandleIdString(), e);
+      log.error("Error while getting instances of a job with id {}" , 
id.getHandleIdString(), e);
       return null;
     }
   }
@@ -276,6 +276,21 @@ public class SchedulerDAO {
     }
   }
 
+  /**
+   * Get all instances matching one of the states
+   *
+   * @param states States to be consider for filter
+   * @return A list of SchedulerJobInstanceRun
+   */
+  public List<SchedulerJobInstanceRun> 
getInstanceRuns(SchedulerJobInstanceState... states) {
+    try {
+      return store.getInstanceRuns(states);
+    } catch (SQLException e) {
+      log.error("Error while getting jobs ", e);
+      return null;
+    }
+  }
+
   public abstract static class SchedulerDBStore {
     protected static final String JOB_TABLE = "job_table";
     protected static final String JOB_INSTANCE_TABLE = "job_instance_table";
@@ -356,7 +371,7 @@ public class SchedulerDAO {
       String insertSQL = "INSERT INTO " + JOB_TABLE + " VALUES(?,?,?,?,?,?,?)";
       JAXBElement<XJob> xmlJob = jobFactory.createJob(jobInfo.getJob());
       return runner.update(insertSQL, jobInfo.getId().toString(), 
ToXMLString.toString(xmlJob), jobInfo.getUserName(),
-          jobInfo.getJobState().name(), jobInfo.getCreatedOn(), 
jobInfo.getModifiedOn(), jobInfo.getJob().getName());
+        jobInfo.getJobState().name(), jobInfo.getCreatedOn(), 
jobInfo.getModifiedOn(), jobInfo.getJob().getName());
     }
 
     /**
@@ -369,17 +384,17 @@ public class SchedulerDAO {
     public int insertIntoJobInstanceTable(SchedulerJobInstanceInfo 
instanceInfo) throws SQLException {
       String insertSQL = "INSERT INTO " + JOB_INSTANCE_TABLE + " 
VALUES(?,?,?)";
       return runner
-          .update(insertSQL, instanceInfo.getId().getHandleIdString(), 
instanceInfo.getJobId().getHandleIdString(),
-              instanceInfo.getScheduleTime());
+        .update(insertSQL, instanceInfo.getId().getHandleIdString(), 
instanceInfo.getJobId().getHandleIdString(),
+          instanceInfo.getScheduleTime());
     }
 
     public int insertIntoJobInstanceRunTable(SchedulerJobInstanceRun 
instanceRun) throws SQLException {
       String insetSQL = "INSERT INTO " + JOB_INSTANCE_RUN_TABLE + " 
VALUES(?,?,?,?,?,?,?,?)";
       return runner.update(insetSQL, 
instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId(),
-          instanceRun.getSessionHandle() == null ? "" : 
instanceRun.getSessionHandle().toString(),
-          instanceRun.getStartTime(), instanceRun.getEndTime(), 
instanceRun.getResultPath(),
-          instanceRun.getQueryHandle() == null ? "" : 
instanceRun.getQueryHandle().getHandleIdString(),
-          instanceRun.getInstanceState().name());
+        instanceRun.getSessionHandle() == null ? "" : 
instanceRun.getSessionHandle().toString(),
+        instanceRun.getStartTime(), instanceRun.getEndTime(), 
instanceRun.getResultPath(),
+        instanceRun.getQueryHandle() == null ? "" : 
instanceRun.getQueryHandle().getHandleIdString(),
+        instanceRun.getInstanceState().name());
     }
 
     /**
@@ -505,8 +520,8 @@ public class SchedulerDAO {
      */
     public int updateJob(String id, XJob job, long modifiedOn) throws 
SQLException {
       String updateSQL =
-          "UPDATE " + JOB_TABLE + " SET " + COLUMN_JOB + "=?, " + 
COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID
-              + "=?";
+        "UPDATE " + JOB_TABLE + " SET " + COLUMN_JOB + "=?, " + 
COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID
+          + "=?";
       JAXBElement<XJob> xmlJob = jobFactory.createJob(job);
       return runner.update(updateSQL, ToXMLString.toString(xmlJob), 
modifiedOn, id);
     }
@@ -522,8 +537,8 @@ public class SchedulerDAO {
      */
     public int updateJobStatus(String id, String status, long modifiedOn) 
throws SQLException {
       String updateSQL =
-          "UPDATE " + JOB_TABLE + " SET " + COLUMN_STATUS + "=?, " + 
COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID
-              + "=?";
+        "UPDATE " + JOB_TABLE + " SET " + COLUMN_STATUS + "=?, " + 
COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID
+          + "=?";
       return runner.update(updateSQL, status, modifiedOn, id);
     }
 
@@ -552,24 +567,7 @@ public class SchedulerDAO {
       // Get the Runs
       String fetchSQL = "SELECT * FROM " + JOB_INSTANCE_RUN_TABLE + " WHERE " 
+ COLUMN_ID + "=?";
       List<Object[]> instanceRuns = runner.query(fetchSQL, 
multipleRowsHandler, (String) instanceInfo[0]);
-      List<SchedulerJobInstanceRun> runList = new ArrayList<>();
-      for (Object[] run : instanceRuns) {
-        // run[0] will contain the instanceID
-        int runId = (Integer) run[1];
-        LensSessionHandle sessionHandle = LensSessionHandle.valueOf((String) 
run[2]);
-        long starttime = (Long) run[3];
-        long endtime = (Long) run[4];
-        String resultPath = (String) run[5];
-        String queryHandleString = (String) run[6];
-        QueryHandle queryHandle = null;
-        if (!queryHandleString.isEmpty()) {
-          queryHandle = QueryHandle.fromString((String) run[6]);
-        }
-        SchedulerJobInstanceState instanceStatus = 
SchedulerJobInstanceState.valueOf((String) run[7]);
-        SchedulerJobInstanceRun instanceRun = new SchedulerJobInstanceRun(id, 
runId, sessionHandle, starttime, endtime,
-            resultPath, queryHandle, instanceStatus);
-        runList.add(instanceRun);
-      }
+      List<SchedulerJobInstanceRun> runList = processInstanceRun(instanceRuns);
       return new SchedulerJobInstanceInfo(id, jobId, createdOn, runList);
     }
 
@@ -582,13 +580,13 @@ public class SchedulerDAO {
      */
     public int updateJobInstanceRun(SchedulerJobInstanceRun instanceRun) 
throws SQLException {
       String updateSQL =
-          "UPDATE " + JOB_INSTANCE_RUN_TABLE + " SET " + COLUMN_END_TIME + 
"=?, " + COLUMN_RESULT_PATH + "=?, "
-              + COLUMN_QUERY_HANDLE + "=?, " + COLUMN_STATUS + "=?" + " WHERE 
" + COLUMN_ID + "=? AND " + COLUMN_RUN_ID
-              + "=?";
+        "UPDATE " + JOB_INSTANCE_RUN_TABLE + " SET " + COLUMN_END_TIME + "=?, 
" + COLUMN_RESULT_PATH + "=?, "
+          + COLUMN_QUERY_HANDLE + "=?, " + COLUMN_STATUS + "=?" + " WHERE " + 
COLUMN_ID + "=? AND " + COLUMN_RUN_ID
+          + "=?";
 
       return runner.update(updateSQL, instanceRun.getEndTime(), 
instanceRun.getResultPath(),
-          instanceRun.getQueryHandle() == null ? "" : 
instanceRun.getQueryHandle().getHandleIdString(),
-          instanceRun.getInstanceState().name(), 
instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId());
+        instanceRun.getQueryHandle() == null ? "" : 
instanceRun.getQueryHandle().getHandleIdString(),
+        instanceRun.getInstanceState().name(), 
instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId());
     }
 
     /**
@@ -625,6 +623,39 @@ public class SchedulerDAO {
         return (String) result.get(0)[0];
       }
     }
+
+    private List<SchedulerJobInstanceRun> processInstanceRun(List<Object[]> 
instanceRuns) throws SQLException {
+      List<SchedulerJobInstanceRun> runList = new ArrayList<>();
+      for (Object[] run : instanceRuns) {
+        // run[0] will contain the instanceID
+        SchedulerJobInstanceHandle id = 
SchedulerJobInstanceHandle.fromString((String) run[0]);
+        int runId = (Integer) run[1];
+        LensSessionHandle sessionHandle = LensSessionHandle.valueOf((String) 
run[2]);
+        long starttime = (Long) run[3];
+        long endtime = (Long) run[4];
+        String resultPath = (String) run[5];
+        String queryHandleString = (String) run[6];
+        QueryHandle queryHandle = null;
+        if (!queryHandleString.isEmpty()) {
+          queryHandle = QueryHandle.fromString((String) run[6]);
+        }
+        SchedulerJobInstanceState instanceStatus = 
SchedulerJobInstanceState.valueOf((String) run[7]);
+        SchedulerJobInstanceRun instanceRun = new SchedulerJobInstanceRun(id, 
runId, sessionHandle, starttime, endtime,
+          resultPath, queryHandle, instanceStatus);
+        runList.add(instanceRun);
+      }
+      return runList;
+    }
+
+    public List<SchedulerJobInstanceRun> 
getInstanceRuns(SchedulerJobInstanceState[] states) throws SQLException {
+      String whereClause = "";
+      for (SchedulerJobInstanceState state : states) {
+        whereClause += ((whereClause.isEmpty()) ? " WHERE " : " OR ") + 
COLUMN_STATUS + " = '" + state + "'";
+      }
+      String fetchSQL = "SELECT * FROM " + JOB_INSTANCE_RUN_TABLE + 
whereClause;
+      List<Object[]> instanceRuns = runner.query(fetchSQL, 
multipleRowsHandler);
+      return processInstanceRun(instanceRuns);
+    }
   }
 
   /**
@@ -637,10 +668,10 @@ public class SchedulerDAO {
     @Override
     public void createJobTable() throws SQLException {
       String createSQL =
-          "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " 
VARCHAR(255) NOT NULL," + COLUMN_JOB
-              + " TEXT," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATUS + " 
VARCHAR(20)," + COLUMN_CREATED_ON
-              + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + 
COLUMN_JOB_NAME + " VARCHAR(255), " + " PRIMARY KEY ( "
-              + COLUMN_ID + ")" + ")";
+        "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " 
VARCHAR(255) NOT NULL," + COLUMN_JOB + " TEXT,"
+          + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATUS + " VARCHAR(20)," + 
COLUMN_CREATED_ON + " BIGINT, "
+          + COLUMN_MODIFIED_ON + " BIGINT, " + COLUMN_JOB_NAME + " 
VARCHAR(255), " + " PRIMARY KEY ( " + COLUMN_ID + ")"
+          + ")";
       runner.update(createSQL);
     }
 
@@ -650,9 +681,9 @@ public class SchedulerDAO {
     @Override
     public void createJobInstanceTable() throws SQLException {
       String createSQL =
-          "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + 
COLUMN_ID + " VARCHAR(255) NOT NULL, "
-              + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + 
COLUMN_SCHEDULE_TIME + " BIGINT, " + " PRIMARY KEY ( "
-              + COLUMN_ID + ")" + ")";
+        "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + COLUMN_ID 
+ " VARCHAR(255) NOT NULL, "
+          + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + COLUMN_SCHEDULE_TIME 
+ " BIGINT, " + " PRIMARY KEY ( "
+          + COLUMN_ID + ")" + ")";
       runner.update(createSQL);
     }
 
@@ -662,12 +693,11 @@ public class SchedulerDAO {
     @Override
     public void createJobInstanceRunTable() throws SQLException {
       String createSQL =
-          "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + 
COLUMN_ID + " VARCHAR(255) NOT NULL, "
-              + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " 
VARCHAR(255), " + COLUMN_START_TIME
-              + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + 
COLUMN_RESULT_PATH + " TEXT, " + COLUMN_QUERY_HANDLE
-              + " VARCHAR(255), " + COLUMN_STATUS + " VARCHAR(20), " + " 
PRIMARY KEY ( " + COLUMN_ID + ", "
-              + COLUMN_RUN_ID
-              + ")" + ")";
+        "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + 
COLUMN_ID + " VARCHAR(255) NOT NULL, "
+          + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " 
VARCHAR(255), " + COLUMN_START_TIME
+          + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + 
" TEXT, " + COLUMN_QUERY_HANDLE
+          + " VARCHAR(255), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY 
KEY ( " + COLUMN_ID + ", " + COLUMN_RUN_ID
+          + ")" + ")";
       runner.update(createSQL);
     }
   }
@@ -682,10 +712,10 @@ public class SchedulerDAO {
     @Override
     public void createJobTable() throws SQLException {
       String createSQL =
-          "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " 
VARCHAR(255) NOT NULL," + COLUMN_JOB
-              + " VARCHAR(1024)," + COLUMN_USER + " VARCHAR(255)," + 
COLUMN_STATUS + " VARCHAR(20)," + COLUMN_CREATED_ON
-              + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + 
COLUMN_JOB_NAME + " VARCHAR(255), " + " PRIMARY KEY ( "
-              + COLUMN_ID + ")" + ")";
+        "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " 
VARCHAR(255) NOT NULL," + COLUMN_JOB
+          + " VARCHAR(1024)," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATUS 
+ " VARCHAR(20)," + COLUMN_CREATED_ON
+          + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + COLUMN_JOB_NAME + 
" VARCHAR(255), " + " PRIMARY KEY ( "
+          + COLUMN_ID + ")" + ")";
       runner.update(createSQL);
     }
 
@@ -695,9 +725,9 @@ public class SchedulerDAO {
     @Override
     public void createJobInstanceTable() throws SQLException {
       String createSQL =
-          "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + 
COLUMN_ID + " VARCHAR(255) NOT NULL, "
-              + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + 
COLUMN_SCHEDULE_TIME + " BIGINT, " + " PRIMARY KEY ( "
-              + COLUMN_ID + ")" + ")";
+        "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + COLUMN_ID 
+ " VARCHAR(255) NOT NULL, "
+          + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + COLUMN_SCHEDULE_TIME 
+ " BIGINT, " + " PRIMARY KEY ( "
+          + COLUMN_ID + ")" + ")";
       runner.update(createSQL);
     }
 
@@ -707,11 +737,11 @@ public class SchedulerDAO {
     @Override
     public void createJobInstanceRunTable() throws SQLException {
       String createSQL =
-          "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + 
COLUMN_ID + " VARCHAR(255) NOT NULL, "
-              + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " 
VARCHAR(255), " + COLUMN_START_TIME
-              + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + 
COLUMN_RESULT_PATH + " VARCHAR(1024),"
-              + COLUMN_QUERY_HANDLE + " VARCHAR(255), " + COLUMN_STATUS + " 
VARCHAR(20), " + " PRIMARY KEY ( "
-              + COLUMN_ID + ", " + COLUMN_RUN_ID + " )" + ")";
+        "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + 
COLUMN_ID + " VARCHAR(255) NOT NULL, "
+          + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " 
VARCHAR(255), " + COLUMN_START_TIME
+          + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + 
" VARCHAR(1024)," + COLUMN_QUERY_HANDLE
+          + " VARCHAR(255), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY 
KEY ( " + COLUMN_ID + ", " + COLUMN_RUN_ID
+          + " )" + ")";
       runner.update(createSQL);
     }
   }

http://git-wip-us.apache.org/repos/asf/lens/blob/3ab732ac/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
index 077d531..4192134 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
@@ -79,9 +79,13 @@ public class SchedulerQueryEventListener extends 
AsyncEventListener<QueryEnded>
       latestRun.setEndTime(System.currentTimeMillis());
       latestRun.setInstanceState(state);
       latestRun.setResultPath(queryContext.getResultSetPath());
-      schedulerDAO.updateJobInstanceRun(latestRun);
-      log.info("Updated instance run {} for instance {} for job {} to {}", 
latestRun.getRunId(), info.getId(),
-        info.getJobId(), state);
+      if (schedulerDAO.updateJobInstanceRun(latestRun) == 1) {
+        log.info("Updated instance run {} for instance {} for job {} to {}", 
latestRun.getRunId(), info.getId(),
+          info.getJobId(), state);
+      } else {
+        log.error("Failed to update instance run {} for instance {} for job {} 
to {}", latestRun.getRunId(),
+          info.getId(), info.getJobId(), state);
+      }
     } catch (InvalidStateTransitionException e) {
       log.error("Instance Transition Failed ", e);
     }

http://git-wip-us.apache.org/repos/asf/lens/blob/3ab732ac/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
index 9cee0c2..969d740 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
@@ -29,7 +29,9 @@ import org.apache.lens.api.LensConf;
 import org.apache.lens.api.LensSessionHandle;
 import org.apache.lens.api.error.InvalidStateTransitionException;
 import org.apache.lens.api.error.LensCommonErrorCode;
+import org.apache.lens.api.query.LensQuery;
 import org.apache.lens.api.query.QueryHandle;
+import org.apache.lens.api.query.QueryStatus;
 import org.apache.lens.api.scheduler.*;
 import org.apache.lens.cube.parse.CubeQueryConfUtil;
 import org.apache.lens.server.BaseLensService;
@@ -86,13 +88,13 @@ public class SchedulerServiceImpl extends BaseLensService 
implements SchedulerSe
     super(NAME, cliService);
   }
 
+  @Override
   public synchronized void init(HiveConf hiveConf) {
     super.init(hiveConf);
     try {
       schedulerDAO = new SchedulerDAO(hiveConf);
       alarmService = LensServices.get().getService(AlarmService.NAME);
       queryService = LensServices.get().getService(QueryExecutionService.NAME);
-      // Get the listeners' classes from the configuration.
       this.schedulerEventListener = new SchedulerEventListener(schedulerDAO);
       this.schedulerQueryEventListener = new 
SchedulerQueryEventListener(schedulerDAO);
       getEventService().addListenerForType(schedulerEventListener, 
SchedulerAlarmEvent.class);
@@ -105,14 +107,112 @@ public class SchedulerServiceImpl extends 
BaseLensService implements SchedulerSe
   private void doesSessionBelongToUser(LensSessionHandle sessionHandle, String 
user) throws LensException {
     LensSessionImpl session = getSession(sessionHandle);
     if (!session.getLoggedInUser().equals(user)) {
+      log.warn("Session User {} is not equal to Job owner {}", 
session.getLoggedInUser(), user);
       throw new 
LensException(LensSchedulerErrorCode.CURRENT_USER_IS_NOT_SAME_AS_OWNER.getLensErrorInfo(),
 null,
         session.getLoggedInUser(), sessionHandle.getPublicId().toString(), 
user);
     }
   }
 
+  /**
+   * How the restarts are handled?
+   * Get all the instances with state Running or New.
+   * If They are running then check the query status. If Query is finished, 
take query parameters and update the
+   * instance. If query is still running then do nothing.
+   * If the state is New then Kill the instance and rerun it.
+   */
   @Override
   public synchronized void start() {
     super.start();
+    List<SchedulerJobInstanceRun> instanceRuns = schedulerDAO
+      .getInstanceRuns(SchedulerJobInstanceState.WAITING, 
SchedulerJobInstanceState.LAUNCHED,
+        SchedulerJobInstanceState.RUNNING);
+    for (SchedulerJobInstanceRun run : instanceRuns) {
+      LensSessionHandle sessionHandle = null;
+      try {
+        SchedulerJobInstanceInfo instanceInfo = 
schedulerDAO.getSchedulerJobInstanceInfo(run.getHandle());
+        log.info("Recovering instance {} of job {} ", instanceInfo.getId(), 
instanceInfo.getJobId());
+        switch (run.getInstanceState()) {
+        case WAITING:
+        case LAUNCHED:
+          // Kill and rerun
+          if (updateInstanceRun(run, SchedulerJobInstanceState.KILLED)) {
+            notifyRerun(instanceInfo);
+            log.info("Re-running instance {} of job {}", instanceInfo.getId(), 
instanceInfo.getJobId());
+          } else {
+            log.error("Not able to recover instance {} of job {}", 
instanceInfo.getId(), instanceInfo.getJobId());
+          }
+          break;
+        case RUNNING:
+          sessionHandle = 
openSessionAsUser(schedulerDAO.getUser(instanceInfo.getJobId()));
+          if (!checkQueryState(sessionHandle, run)) {
+            log.info("Re-running instance {} of job {}", instanceInfo.getId(), 
instanceInfo.getJobId());
+            notifyRerun(instanceInfo);
+          }
+          break;
+        }
+      } catch (LensException e) {
+        log.error("Not able to recover instance {} ", 
run.getHandle().getHandleIdString(), e);
+      } finally {
+        try {
+          if (sessionHandle != null) {
+            closeSession(sessionHandle);
+          }
+        } catch (Exception e) {
+          log.error("Error closing session ", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * If query is not found of is invalid then rerun again else get the status 
and update correspondingly.
+   *
+   * @param sessionHandle
+   * @param run
+   * @return
+   * @throws LensException
+   */
+  private boolean checkQueryState(LensSessionHandle sessionHandle, 
SchedulerJobInstanceRun run) throws LensException {
+    QueryHandle queryHandle = run.getQueryHandle();
+    LensQuery query = null;
+    try {
+      query = this.queryService.getQuery(sessionHandle, queryHandle);
+    } catch (Exception e) {
+      updateInstanceRun(run, SchedulerJobInstanceState.KILLED);
+      return false;
+    }
+    if (query == null) {
+      // This means we have no idea what happened to query
+      // Mark it as Killed.
+      updateInstanceRun(run, SchedulerJobInstanceState.KILLED);
+      return false;
+    }
+    QueryStatus.Status status = query.getStatus().getStatus();
+    SchedulerJobInstanceState state = run.getInstanceState();
+    switch (status) {
+    case NEW:
+    case QUEUED:
+    case LAUNCHED:
+    case RUNNING:
+    case EXECUTED:
+      break;
+    case CANCELED:
+      state = SchedulerJobInstanceState.KILLED;
+      break;
+    case SUCCESSFUL:
+      state = SchedulerJobInstanceState.SUCCEEDED;
+      break;
+    case FAILED:
+      state = SchedulerJobInstanceState.FAILED;
+      break;
+    default:
+      // This should not happen
+      log.warn("Unexpected status {} for the query id {}", status, 
queryHandle);
+      state = SchedulerJobInstanceState.KILLED;
+    }
+    run.setResultPath(query.getResultSetPath());
+    updateInstanceRun(run, state);
+    return true;
   }
 
   /**
@@ -150,9 +250,8 @@ public class SchedulerServiceImpl extends BaseLensService 
implements SchedulerSe
     validateJob(job);
     SchedulerJobHandle handle = UtilityMethods.generateSchedulerJobHandle();
     long createdOn = System.currentTimeMillis();
-    long modifiedOn = createdOn;
     SchedulerJobInfo info = new SchedulerJobInfo(handle, job, 
session.getLoggedInUser(), SchedulerJobState.NEW,
-      createdOn, modifiedOn);
+      createdOn, createdOn);
     if (schedulerDAO.storeJob(info) == 1) {
       log.info("Successfully submitted job with handle {}", handle);
       return handle;
@@ -335,9 +434,7 @@ public class SchedulerServiceImpl extends BaseLensService 
implements SchedulerSe
     SchedulerJobInstanceRun latestRun = runList.get(runList.size() - 1);
     try {
       latestRun.getInstanceState().nextTransition(ON_RERUN);
-      getEventService().notifyEvent(
-        new SchedulerAlarmEvent(instanceInfo.getJobId(), new 
DateTime(instanceInfo.getScheduleTime()),
-          SchedulerAlarmEvent.EventType.SCHEDULE, instanceHandle));
+      notifyRerun(instanceInfo);
       log.info("Rerunning the instance with {} for job {} ", instanceHandle, 
instanceInfo.getJobId());
     } catch (InvalidStateTransitionException e) {
       throw new 
LensException(LensSchedulerErrorCode.INVALID_EVENT_FOR_JOB_INSTANCE.getLensErrorInfo(),
 e,
@@ -379,16 +476,22 @@ public class SchedulerServiceImpl extends BaseLensService 
implements SchedulerSe
     }
     QueryHandle handle = latestRun.getQueryHandle();
     if (handle == null || handle.getHandleIdString().isEmpty()) {
-      latestRun.setEndTime(System.currentTimeMillis());
-      latestRun.setInstanceState(state);
-      schedulerDAO.updateJobInstanceRun(latestRun);
-      log.info("Killing instance with {} for job {} ", instanceHandle, 
instanceInfo.getJobId());
-      return true;
+      log.info("Killing instance {} for job {} ", instanceInfo.getId(), 
instanceInfo.getJobId());
+      return updateInstanceRun(latestRun, state);
+    } else {
+      log.info("Killing instance {} for job {} with query handle {} ", 
instanceInfo.getId(),
+        instanceInfo.getJobId(), handle);
+      // This will cause the QueryEnd event which will set the status of the 
instance to KILLED.
+      return queryService.cancelQuery(sessionHandle, handle);
     }
-    log.info("Killing instance with {} for job {} with query handle {} ", 
instanceHandle, instanceInfo.getJobId(),
-      handle);
-    // This will cause the QueryEnd event which will set the status of the 
instance to KILLED.
-    return queryService.cancelQuery(sessionHandle, handle);
+
+  }
+
+  private boolean updateInstanceRun(SchedulerJobInstanceRun latestRun, 
SchedulerJobInstanceState state)
+    throws LensException {
+    latestRun.setEndTime(System.currentTimeMillis());
+    latestRun.setInstanceState(state);
+    return schedulerDAO.updateJobInstanceRun(latestRun) == 1;
   }
 
   /**
@@ -418,4 +521,10 @@ public class SchedulerServiceImpl extends BaseLensService 
implements SchedulerSe
         currentState.name(), info.getId().getHandleIdString());
     }
   }
+
+  private void notifyRerun(SchedulerJobInstanceInfo instanceInfo) throws 
LensException {
+    getEventService().notifyEvent(
+      new SchedulerAlarmEvent(instanceInfo.getJobId(), new 
DateTime(instanceInfo.getScheduleTime()),
+        SchedulerAlarmEvent.EventType.SCHEDULE, instanceInfo.getId()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/3ab732ac/lens-server/src/test/java/org/apache/lens/server/scheduler/SchedulerRestartTest.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/scheduler/SchedulerRestartTest.java
 
b/lens-server/src/test/java/org/apache/lens/server/scheduler/SchedulerRestartTest.java
new file mode 100644
index 0000000..de36499
--- /dev/null
+++ 
b/lens-server/src/test/java/org/apache/lens/server/scheduler/SchedulerRestartTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.scheduler;
+
+import static 
org.apache.lens.server.scheduler.util.SchedulerTestUtils.getTestJob;
+import static 
org.apache.lens.server.scheduler.util.SchedulerTestUtils.setupQueryService;
+
+import static org.mockito.Matchers.any;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.lens.api.LensSessionHandle;
+import org.apache.lens.api.query.LensQuery;
+import org.apache.lens.api.query.QueryHandle;
+import org.apache.lens.api.query.QueryStatus;
+import org.apache.lens.api.scheduler.*;
+import org.apache.lens.server.LensServerConf;
+import org.apache.lens.server.LensServices;
+import org.apache.lens.server.api.LensConfConstants;
+import org.apache.lens.server.api.metrics.LensMetricsUtil;
+import org.apache.lens.server.api.scheduler.SchedulerService;
+import org.apache.lens.server.model.LogSegregationContext;
+import org.apache.lens.server.model.MappedDiagnosticLogSegregationContext;
+import org.apache.lens.server.util.UtilityMethods;
+
+import org.joda.time.DateTime;
+import org.powermock.api.mockito.PowerMockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "scheduler-restart", dependsOnGroups = "duplicate-query")
+public class SchedulerRestartTest {
+
+  SchedulerServiceImpl scheduler;
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    System.setProperty(LensConfConstants.CONFIG_LOCATION, 
"target/test-classes/");
+    LensServices.get().init(LensServerConf.getHiveConf());
+    scheduler = LensServices.get().getService(SchedulerService.NAME);
+    setupQueryService(scheduler);
+    LensServices.get().start();
+  }
+
+  @AfterClass
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public void testRestart() throws Exception {
+    long start = new DateTime().getMillis() - 3600000 * 24;
+    // One day
+    long end = start + 2 * 3600000 * 24;
+    XJob testJob = getTestJob("0 0 0 * * ?", "test query", start, end);
+    long currentTime = System.currentTimeMillis();
+    LensSessionHandle sessionHandle = scheduler.openSessionAsUser("admin");
+    SchedulerJobHandle jobHandle = 
scheduler.submitAndScheduleJob(sessionHandle, testJob);
+    Thread.sleep(5000);
+    List<SchedulerJobInstanceInfo> instanceInfoList = 
scheduler.getJobInstances(jobHandle, 10L);
+    Assert.assertEquals(instanceInfoList.size(), 1);
+
+    // Store new instance
+    SchedulerJobInstanceHandle instanceHandle = 
UtilityMethods.generateSchedulerJobInstanceHandle();
+    SchedulerJobInstanceInfo instance = new 
SchedulerJobInstanceInfo(instanceHandle, jobHandle, currentTime,
+      new ArrayList<SchedulerJobInstanceRun>());
+    SchedulerDAO store = scheduler.getSchedulerDAO();
+    // Manually Store instance
+    store.storeJobInstance(instance);
+    SchedulerJobInstanceRun run = new SchedulerJobInstanceRun(instanceHandle, 
instance.getInstanceRunList().size() + 1,
+      null, currentTime, 0, "N/A", null, SchedulerJobInstanceState.WAITING);
+    instance.getInstanceRunList().add(run);
+    store.storeJobInstanceRun(run);
+
+    // Restart Lens Services
+    LensServices.get().stop();
+    LensMetricsUtil.clearRegistry();
+    LogSegregationContext logSegregationContext = new 
MappedDiagnosticLogSegregationContext();
+    LensServices.setInstance(new LensServices(LensServices.LENS_SERVICES_NAME, 
logSegregationContext));
+    LensServices.get().init(LensServerConf.getHiveConf());
+    scheduler = LensServices.get().getService(SchedulerService.NAME);
+    setupQueryService(scheduler);
+    LensQuery mockedQuery = PowerMockito.mock(LensQuery.class);
+    QueryStatus mockStatus = PowerMockito.mock(QueryStatus.class);
+    
PowerMockito.when(mockStatus.getStatus()).thenReturn(QueryStatus.Status.SUCCESSFUL);
+    PowerMockito.when(mockedQuery.getStatus()).thenReturn(mockStatus);
+    
PowerMockito.when(mockedQuery.getStatus().getStatus()).thenReturn(QueryStatus.Status.SUCCESSFUL);
+    PowerMockito.when(mockedQuery.getResultSetPath()).thenReturn("/tmp/path");
+    
PowerMockito.when(scheduler.getQueryService().getQuery(any(LensSessionHandle.class),
 any(QueryHandle.class)))
+      .thenReturn(mockedQuery);
+    LensServices.get().start();
+
+    // Sleep for some time to let the event get processed
+    Thread.sleep(5000);
+    // This should have 2 instance Run
+    SchedulerJobInstanceInfo storedInfo = 
scheduler.getInstanceDetails(instanceHandle);
+    Assert.assertEquals(storedInfo.getInstanceRunList().size(), 2);
+    // The first instance  will be killed state after restart.
+    SchedulerJobInstanceInfo previousInstanceInfo = 
scheduler.getInstanceDetails(instanceInfoList.get(0).getId());
+    //Because we mocked the query, It should not rerun.
+    Assert.assertEquals(previousInstanceInfo.getInstanceRunList().size(), 1);
+    
Assert.assertEquals(previousInstanceInfo.getInstanceRunList().get(0).getResultPath(),
 "/tmp/path");
+    
Assert.assertEquals(previousInstanceInfo.getInstanceRunList().get(0).getInstanceState(),
+      SchedulerJobInstanceState.SUCCEEDED);
+  }
+}

http://git-wip-us.apache.org/repos/asf/lens/blob/3ab732ac/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java
 
b/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java
index 130df5f..aa5c897 100644
--- 
a/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java
+++ 
b/lens-server/src/test/java/org/apache/lens/server/scheduler/TestSchedulerServiceImpl.java
@@ -18,31 +18,19 @@
  */
 package org.apache.lens.server.scheduler;
 
-import static 
org.apache.lens.server.scheduler.util.SchedulerTestUtils.getTestJob;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
+import static org.apache.lens.server.scheduler.util.SchedulerTestUtils.*;
 
 import java.util.List;
-import java.util.UUID;
 
-import org.apache.lens.api.LensConf;
 import org.apache.lens.api.LensSessionHandle;
-import org.apache.lens.api.query.QueryHandle;
 import org.apache.lens.api.query.QueryStatus;
 import org.apache.lens.api.scheduler.*;
 import org.apache.lens.server.EventServiceImpl;
 import org.apache.lens.server.LensServerConf;
 import org.apache.lens.server.LensServices;
 import org.apache.lens.server.api.LensConfConstants;
-import org.apache.lens.server.api.query.QueryContext;
-import org.apache.lens.server.api.query.QueryEnded;
-import org.apache.lens.server.api.query.QueryExecutionService;
 import org.apache.lens.server.api.scheduler.SchedulerService;
 
-import org.apache.hadoop.conf.Configuration;
-
-import org.powermock.api.mockito.PowerMockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -63,34 +51,7 @@ public class TestSchedulerServiceImpl {
     System.setProperty(LensConfConstants.CONFIG_LOCATION, 
"target/test-classes/");
   }
 
-  private void setupQueryService() throws Exception {
-    QueryExecutionService queryExecutionService = 
PowerMockito.mock(QueryExecutionService.class);
-    scheduler.setQueryService(queryExecutionService);
-    PowerMockito.when(
-      scheduler.getQueryService().estimate(anyString(), 
any(LensSessionHandle.class), anyString(), any(LensConf.class)))
-      .thenReturn(null);
-    PowerMockito.when(scheduler.getQueryService()
-      .executeAsync(any(LensSessionHandle.class), anyString(), 
any(LensConf.class), anyString()))
-      .thenReturn(new QueryHandle(UUID.randomUUID()));
-    
PowerMockito.when(scheduler.getQueryService().cancelQuery(any(LensSessionHandle.class),
 any(QueryHandle.class)))
-      .thenReturn(true);
-    
scheduler.getSchedulerEventListener().setQueryService(queryExecutionService);
-  }
 
-  private QueryEnded mockQueryEnded(SchedulerJobInstanceHandle instanceHandle, 
QueryStatus.Status status) {
-    QueryContext mockContext = PowerMockito.mock(QueryContext.class);
-    
PowerMockito.when(mockContext.getResultSetPath()).thenReturn("/tmp/query1/result");
-    Configuration conf = new Configuration();
-    // set the instance handle
-    conf.set("job_instance_key", instanceHandle.getHandleIdString());
-    PowerMockito.when(mockContext.getConf()).thenReturn(conf);
-    // Get the queryHandle.
-    PowerMockito.when(mockContext.getQueryHandle()).thenReturn(new 
QueryHandle(UUID.randomUUID()));
-    QueryEnded queryEnded = PowerMockito.mock(QueryEnded.class);
-    PowerMockito.when(queryEnded.getQueryContext()).thenReturn(mockContext);
-    PowerMockito.when(queryEnded.getCurrentValue()).thenReturn(status);
-    return queryEnded;
-  }
 
   @Test(priority = 1)
   public void testScheduler() throws Exception {
@@ -98,7 +59,7 @@ public class TestSchedulerServiceImpl {
     LensServices.get().start();
     scheduler = LensServices.get().getService(SchedulerService.NAME);
     eventService = LensServices.get().getService(EventServiceImpl.NAME);
-    setupQueryService();
+    setupQueryService(scheduler);
     LensSessionHandle sessionHandle = scheduler.openSessionAsUser(user);
     long currentTime = System.currentTimeMillis();
     XJob job = getTestJob("0/5 * * * * ?", queryString, currentTime, 
currentTime + 15000);

http://git-wip-us.apache.org/repos/asf/lens/blob/3ab732ac/lens-server/src/test/java/org/apache/lens/server/scheduler/util/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/scheduler/util/SchedulerTestUtils.java
 
b/lens-server/src/test/java/org/apache/lens/server/scheduler/util/SchedulerTestUtils.java
index a36b2aa..d50474a 100644
--- 
a/lens-server/src/test/java/org/apache/lens/server/scheduler/util/SchedulerTestUtils.java
+++ 
b/lens-server/src/test/java/org/apache/lens/server/scheduler/util/SchedulerTestUtils.java
@@ -18,18 +18,35 @@
  */
 package org.apache.lens.server.scheduler.util;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+
 import java.util.GregorianCalendar;
+import java.util.UUID;
 
 import javax.xml.datatype.DatatypeFactory;
 import javax.xml.datatype.XMLGregorianCalendar;
 
+import org.apache.lens.api.LensConf;
+import org.apache.lens.api.LensSessionHandle;
+import org.apache.lens.api.query.QueryHandle;
+import org.apache.lens.api.query.QueryStatus;
 import org.apache.lens.api.scheduler.*;
+import org.apache.lens.server.api.query.QueryContext;
+import org.apache.lens.server.api.query.QueryEnded;
+import org.apache.lens.server.api.query.QueryExecutionService;
+import org.apache.lens.server.scheduler.SchedulerServiceImpl;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.powermock.api.mockito.PowerMockito;
 
 public class SchedulerTestUtils {
 
   private SchedulerTestUtils() {
 
   }
+
   private static XTrigger getTestTrigger(String cron) {
     XTrigger trigger = new XTrigger();
     XFrequency frequency = new XFrequency();
@@ -67,4 +84,33 @@ public class SchedulerTestUtils {
     job.setExecution(getTestExecution(query));
     return job;
   }
+
+  public static void setupQueryService(SchedulerServiceImpl scheduler) throws 
Exception {
+    QueryExecutionService queryExecutionService = 
PowerMockito.mock(QueryExecutionService.class);
+    scheduler.setQueryService(queryExecutionService);
+    PowerMockito
+      .when(queryExecutionService.estimate(anyString(), 
any(LensSessionHandle.class), anyString(), any(LensConf.class)))
+      .thenReturn(null);
+    PowerMockito.when(
+      queryExecutionService.executeAsync(any(LensSessionHandle.class), 
anyString(), any(LensConf.class), anyString()))
+      .thenReturn(new QueryHandle(UUID.randomUUID()));
+    
PowerMockito.when(queryExecutionService.cancelQuery(any(LensSessionHandle.class),
 any(QueryHandle.class)))
+      .thenReturn(true);
+    
scheduler.getSchedulerEventListener().setQueryService(queryExecutionService);
+  }
+
+  public static QueryEnded mockQueryEnded(SchedulerJobInstanceHandle 
instanceHandle, QueryStatus.Status status) {
+    QueryContext mockContext = PowerMockito.mock(QueryContext.class);
+    
PowerMockito.when(mockContext.getResultSetPath()).thenReturn("/tmp/query1/result");
+    Configuration conf = new Configuration();
+    // set the instance handle
+    conf.set("job_instance_key", instanceHandle.getHandleIdString());
+    PowerMockito.when(mockContext.getConf()).thenReturn(conf);
+    // Get the queryHandle.
+    PowerMockito.when(mockContext.getQueryHandle()).thenReturn(new 
QueryHandle(UUID.randomUUID()));
+    QueryEnded queryEnded = PowerMockito.mock(QueryEnded.class);
+    PowerMockito.when(queryEnded.getQueryContext()).thenReturn(mockContext);
+    PowerMockito.when(queryEnded.getCurrentValue()).thenReturn(status);
+    return queryEnded;
+  }
 }

Reply via email to