http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java
index bf99fde..7a2b06a 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java
@@ -28,10 +28,11 @@ import javax.xml.bind.JAXBElement;
 
 import org.apache.lens.api.LensSessionHandle;
 import org.apache.lens.api.ToXMLString;
+import org.apache.lens.api.query.QueryHandle;
 import org.apache.lens.api.scheduler.*;
 import org.apache.lens.server.api.LensConfConstants;
 import org.apache.lens.server.api.error.LensException;
-import org.apache.lens.server.scheduler.util.UtilityMethods;
+import org.apache.lens.server.util.UtilityMethods;
 
 import org.apache.commons.dbcp.BasicDataSource;
 import org.apache.commons.dbutils.QueryRunner;
@@ -54,9 +55,10 @@ public class SchedulerDAO {
       Class dbStoreClass = Class
           .forName(conf.get(LensConfConstants.SCHEDULER_STORE_CLASS, 
SchedulerHsqlDBStore.class.getName()));
       this.store = (SchedulerDBStore) dbStoreClass.newInstance();
-      this.store.init(UtilityMethods.getDataSourceFromConf(conf));
+      this.store.init(UtilityMethods.getDataSourceFromConfForScheduler(conf));
       this.store.createJobTable();
-      this.store.createJobInstaceTable();
+      this.store.createJobInstanceTable();
+      this.store.createJobInstanceRunTable();
     } catch (SQLException e) {
       log.error("Error creating job tables", e);
       throw new LensException("Error creating job tables ", e);
@@ -115,16 +117,31 @@ public class SchedulerDAO {
   }
 
   /**
-   * Gets the Job state
+   * Get the user of the job who submitted the job.
+   *
+   * @param id
+   * @return
+   */
+  public String getUser(SchedulerJobHandle id) {
+    try {
+      return store.getUser(id.getHandleIdString());
+    } catch (SQLException e) {
+      log.error("Error while getting the user for the job with handle " + 
id.getHandleIdString(), e);
+      return null;
+    }
+  }
+
+  /**
+   * Gets the Job status
    *
    * @param id : Job handle id.
-   * @return SchedulerJobState of the job.
+   * @return SchedulerJobStatus of the job.
    */
-  public SchedulerJobStatus getJobState(SchedulerJobHandle id) {
+  public SchedulerJobState getJobState(SchedulerJobHandle id) {
     try {
       return store.getJobState(id.getHandleIdString());
     } catch (SQLException e) {
-      log.error("Error while getting the job state for " + 
id.getHandleIdString(), e);
+      log.error("Error while getting the job status for " + 
id.getHandleIdString(), e);
       return null;
     }
   }
@@ -145,16 +162,16 @@ public class SchedulerDAO {
   }
 
   /**
-   * Updates the job state form the new SchedulerJobInfo
+   * Updates the job status form the new SchedulerJobInfo
    *
    * @param info: Updated info objects
    * @return number of rows updated.
    */
-  public int updateJobState(SchedulerJobInfo info) {
+  public int updateJobStatus(SchedulerJobInfo info) {
     try {
-      return store.updateJobState(info.getId().getHandleIdString(), 
info.getState().name(), info.getModifiedOn());
+      return store.updateJobStatus(info.getId().getHandleIdString(), 
info.getJobState().name(), info.getModifiedOn());
     } catch (SQLException e) {
-      log.error("Error while updating job state for " + 
info.getId().getHandleIdString(), e);
+      log.error("Error while updating job status for " + 
info.getId().getHandleIdString(), e);
       return 0;
     }
   }
@@ -168,6 +185,17 @@ public class SchedulerDAO {
     }
   }
 
+  public int storeJobInstanceRun(SchedulerJobInstanceRun instanceRun) {
+    try {
+      return store.insertIntoJobInstanceRunTable(instanceRun);
+    } catch (SQLException e) {
+      log.error(
+          "Error while storing job instance run for " + instanceRun.getRunId() 
+ " and instance handle " + instanceRun
+              .getHandle().getHandleIdString(), e);
+      return 0;
+    }
+  }
+
   /**
    * Gets the SchedulerJobInstanceInfo corresponding instance handle id.
    *
@@ -184,16 +212,17 @@ public class SchedulerDAO {
   }
 
   /**
-   * Updates the instance state
+   * Updates the instance status
    *
-   * @param info: Updated instance info
+   * @param instanceRun : instance Run object
    * @return number of rows updated.
    */
-  public int updateJobInstanceState(SchedulerJobInstanceInfo info) {
+  public int updateJobInstanceRun(SchedulerJobInstanceRun instanceRun) {
     try {
-      return store.updateJobInstanceState(info.getId().getHandleIdString(), 
info.getState().name());
+      return store.updateJobInstanceRun(instanceRun);
     } catch (SQLException e) {
-      log.error("Error while updating the job instance state for " + 
info.getId().getHandleIdString(), e);
+      log.error("Error while updating the job instance status for " + 
instanceRun.getHandle().getHandleIdString()
+          + " and run: " + instanceRun.getRunId(), e);
       return 0;
     }
   }
@@ -204,7 +233,7 @@ public class SchedulerDAO {
    * @param id: Job handle id.
    * @return List of instance handles.
    */
-  public List<SchedulerJobInstanceHandle> getJobInstances(SchedulerJobHandle 
id) {
+  public List<SchedulerJobInstanceInfo> getJobInstances(SchedulerJobHandle id) 
{
     // TODO: Add number of results to be fetched
     try {
       return store.getAllJobInstances(id.getHandleIdString());
@@ -218,15 +247,29 @@ public class SchedulerDAO {
    * Gets all jobs which match the filter requirements.
    *
    * @param username  : User name of the job
-   * @param state     : State of the job
+   * @param jobState  : state of the job
    * @param startTime : Created on should be greater than this start time.
    * @param endTime   : Created on should be less than the end time.
    * @return List of Job handles
    */
-  public List<SchedulerJobHandle> getJobs(String username, SchedulerJobStatus 
state, Long startTime,
-      Long endTime) {
+  public List<SchedulerJobHandle> getJobs(String username, SchedulerJobState 
jobState, Long startTime, Long endTime) {
+    try {
+      return store.getJobs(username, jobState == null ? null : 
jobState.name(), startTime, endTime);
+    } catch (SQLException e) {
+      log.error("Error while getting jobs ", e);
+      return null;
+    }
+  }
+
+  /**
+   * Get the handle given the job name.
+   *
+   * @param jobName
+   * @return
+   */
+  public List<SchedulerJobHandle> getJob(String jobName) {
     try {
-      return store.getJobs(username, state == null ? null : state.name(), 
startTime, endTime);
+      return store.getJobsByName(jobName);
     } catch (SQLException e) {
       log.error("Error while getting jobs ", e);
       return null;
@@ -236,18 +279,22 @@ public class SchedulerDAO {
   public abstract static class SchedulerDBStore {
     protected static final String JOB_TABLE = "job_table";
     protected static final String JOB_INSTANCE_TABLE = "job_instance_table";
+    protected static final String JOB_INSTANCE_RUN_TABLE = 
"job_instance_run_table";
     protected static final String COLUMN_ID = "id";
+    protected static final String COLUMN_RUN_ID = "runid";
     protected static final String COLUMN_JOB = "job";
     protected static final String COLUMN_USER = "username";
-    protected static final String COLUMN_STATE = "state";
+    protected static final String COLUMN_STATUS = "status";
     protected static final String COLUMN_CREATED_ON = "createdon";
+    protected static final String COLUMN_SCHEDULE_TIME = "schedultime";
     protected static final String COLUMN_MODIFIED_ON = "modifiedon";
     protected static final String COLUMN_JOB_ID = "jobid";
     protected static final String COLUMN_SESSION_HANDLE = "sessionhandle";
     protected static final String COLUMN_START_TIME = "starttime";
     protected static final String COLUMN_END_TIME = "endtime";
     protected static final String COLUMN_RESULT_PATH = "resultpath";
-    protected static final String COLUMN_QUERY = "query";
+    protected static final String COLUMN_QUERY_HANDLE = "queryhandle";
+    protected static final String COLUMN_JOB_NAME = "jobname";
     protected QueryRunner runner;
     protected ObjectFactory jobFactory = new ObjectFactory();
     // Generic multiple row handler for the fetch query.
@@ -289,7 +336,14 @@ public class SchedulerDAO {
      *
      * @throws SQLException
      */
-    public abstract void createJobInstaceTable() throws SQLException;
+    public abstract void createJobInstanceTable() throws SQLException;
+
+    /**
+     * Creates the job instance run table
+     *
+     * @throws SQLException
+     */
+    public abstract void createJobInstanceRunTable() throws SQLException;
 
     /**
      * Inserts the Job info object into job table
@@ -299,10 +353,10 @@ public class SchedulerDAO {
      * @throws SQLException
      */
     public int insertIntoJobTable(SchedulerJobInfo jobInfo) throws 
SQLException {
-      String insertSQL = "INSERT INTO " + JOB_TABLE + " VALUES(?,?,?,?,?,?)";
+      String insertSQL = "INSERT INTO " + JOB_TABLE + " VALUES(?,?,?,?,?,?,?)";
       JAXBElement<XJob> xmlJob = jobFactory.createJob(jobInfo.getJob());
       return runner.update(insertSQL, jobInfo.getId().toString(), 
ToXMLString.toString(xmlJob), jobInfo.getUserName(),
-          jobInfo.getState().name(), jobInfo.getCreatedOn(), 
jobInfo.getModifiedOn());
+          jobInfo.getJobState().name(), jobInfo.getCreatedOn(), 
jobInfo.getModifiedOn(), jobInfo.getJob().getName());
     }
 
     /**
@@ -313,12 +367,19 @@ public class SchedulerDAO {
      * @throws SQLException
      */
     public int insertIntoJobInstanceTable(SchedulerJobInstanceInfo 
instanceInfo) throws SQLException {
-      String insertSQL = "INSERT INTO " + JOB_INSTANCE_TABLE + " 
VALUES(?,?,?,?,?,?,?,?,?)";
+      String insertSQL = "INSERT INTO " + JOB_INSTANCE_TABLE + " 
VALUES(?,?,?)";
       return runner
           .update(insertSQL, instanceInfo.getId().getHandleIdString(), 
instanceInfo.getJobId().getHandleIdString(),
-              instanceInfo.getSessionHandle().toString(), 
instanceInfo.getStartTime(), instanceInfo.getEndTime(),
-              instanceInfo.getResultPath(), instanceInfo.getQuery(), 
instanceInfo.getState().name(),
-              instanceInfo.getCreatedOn());
+              instanceInfo.getScheduleTime());
+    }
+
+    public int insertIntoJobInstanceRunTable(SchedulerJobInstanceRun 
instanceRun) throws SQLException {
+      String insetSQL = "INSERT INTO " + JOB_INSTANCE_RUN_TABLE + " 
VALUES(?,?,?,?,?,?,?,?)";
+      return runner.update(insetSQL, 
instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId(),
+          instanceRun.getSessionHandle().toString(), 
instanceRun.getStartTime(), instanceRun.getEndTime(),
+          instanceRun.getResultPath(),
+          instanceRun.getQueryHandle() == null ? "" : 
instanceRun.getQueryHandle().getHandleIdString(),
+          instanceRun.getInstanceState().name());
     }
 
     /**
@@ -338,10 +399,11 @@ public class SchedulerDAO {
         SchedulerJobHandle id = SchedulerJobHandle.fromString((String) 
jobInfo[0]);
         XJob xJob = ToXMLString.valueOf((String) jobInfo[1], 
ObjectFactory.class);
         String userName = (String) jobInfo[2];
-        String state = (String) jobInfo[3];
+        String status = (String) jobInfo[3];
         long createdOn = (Long) jobInfo[4];
         long modifiedOn = (Long) jobInfo[5];
-        return new SchedulerJobInfo(id, xJob, userName, 
SchedulerJobStatus.valueOf(state), createdOn, modifiedOn);
+        SchedulerJobState jobState = SchedulerJobState.valueOf(status);
+        return new SchedulerJobInfo(id, xJob, userName, jobState, createdOn, 
modifiedOn);
       }
     }
 
@@ -363,19 +425,37 @@ public class SchedulerDAO {
     }
 
     /**
-     * Gets the job state
+     * Get the job handles given the job name
+     *
+     * @param jobname
+     * @return A list of handles
+     * @throws SQLException
+     */
+    public List<SchedulerJobHandle> getJobsByName(String jobname) throws 
SQLException {
+      String fetchSQL = "SELCET " + COLUMN_ID + " FROM " + JOB_TABLE + " WHERE 
" + COLUMN_JOB_NAME + "=?";
+      List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, 
jobname);
+      List<SchedulerJobHandle> resOut = new ArrayList<>();
+      for (int i = 0; i < result.size(); i++) {
+        Object[] row = result.get(i);
+        resOut.add(SchedulerJobHandle.fromString((String) row[0]));
+      }
+      return resOut;
+    }
+
+    /**
+     * Gets the job status
      *
      * @param id
-     * @return SchedulerJobState
+     * @return SchedulerJobStatus
      * @throws SQLException
      */
-    public SchedulerJobStatus getJobState(String id) throws SQLException {
-      String fetchSQL = "SELECT " + COLUMN_STATE + " FROM " + JOB_TABLE + " 
WHERE " + COLUMN_ID + "=?";
+    public SchedulerJobState getJobState(String id) throws SQLException {
+      String fetchSQL = "SELECT " + COLUMN_STATUS + " FROM " + JOB_TABLE + " 
WHERE " + COLUMN_ID + "=?";
       List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, id);
       if (result.size() == 0) {
         return null;
       } else {
-        return SchedulerJobStatus.valueOf((String) result.get(0)[0]);
+        return SchedulerJobState.valueOf((String) result.get(0)[0]);
       }
     }
 
@@ -383,20 +463,20 @@ public class SchedulerDAO {
      * Gets all the jobs which match the filter requirements.
      *
      * @param username
-     * @param state
+     * @param status
      * @param starttime
      * @param endtime
      * @return the list of job handles.
      * @throws SQLException
      */
-    public List<SchedulerJobHandle> getJobs(String username, String state, 
Long starttime, Long endtime)
+    public List<SchedulerJobHandle> getJobs(String username, String status, 
Long starttime, Long endtime)
       throws SQLException {
       String whereClause = "";
       if (username != null && !username.isEmpty()) {
-        whereClause += ((whereClause.isEmpty()) ? " WHERE " : " AND ") + 
COLUMN_USER + " = '" + username+"'";
+        whereClause += ((whereClause.isEmpty()) ? " WHERE " : " AND ") + 
COLUMN_USER + " = '" + username + "'";
       }
-      if (state != null && !state.isEmpty()) {
-        whereClause += ((whereClause.isEmpty()) ? " WHERE " : " AND ") + 
COLUMN_STATE + " = '" + state + "'";
+      if (status != null && !status.isEmpty()) {
+        whereClause += ((whereClause.isEmpty()) ? " WHERE " : " AND ") + 
COLUMN_STATUS + " = '" + status + "'";
       }
       if (starttime != null && starttime > 0) {
         whereClause += ((whereClause.isEmpty()) ? " WHERE " : " AND ") + 
COLUMN_CREATED_ON + " >= " + starttime;
@@ -432,19 +512,19 @@ public class SchedulerDAO {
     }
 
     /**
-     * Updates the job state into the job table
+     * Updates the job status into the job table
      *
      * @param id
-     * @param state
+     * @param status
      * @param modifiedOn
      * @return number of rows updated.
      * @throws SQLException
      */
-    public int updateJobState(String id, String state, long modifiedOn) throws 
SQLException {
+    public int updateJobStatus(String id, String status, long modifiedOn) 
throws SQLException {
       String updateSQL =
-          "UPDATE " + JOB_TABLE + " SET " + COLUMN_STATE + "=?, " + 
COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID
+          "UPDATE " + JOB_TABLE + " SET " + COLUMN_STATUS + "=?, " + 
COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID
               + "=?";
-      return runner.update(updateSQL, state, modifiedOn, id);
+      return runner.update(updateSQL, status, modifiedOn, id);
     }
 
     /**
@@ -461,31 +541,54 @@ public class SchedulerDAO {
         return null;
       } else {
         Object[] instanceInfo = result.get(0);
-        SchedulerJobInstanceHandle id = 
SchedulerJobInstanceHandle.fromString((String) instanceInfo[0]);
-        SchedulerJobHandle jobId = SchedulerJobHandle.fromString((String) 
instanceInfo[1]);
-        LensSessionHandle sessionHandle = LensSessionHandle.valueOf((String) 
instanceInfo[2]);
-        long starttime = (Long) instanceInfo[3];
-        long endtime = (Long) instanceInfo[4];
-        String resultPath = (String) instanceInfo[5];
-        String query = (String) instanceInfo[6];
-        SchedulerJobInstanceStatus state = 
SchedulerJobInstanceStatus.valueOf((String) instanceInfo[7]);
-        long createdOn = (Long) instanceInfo[8];
-        return new SchedulerJobInstanceInfo(id, jobId, sessionHandle, 
starttime, endtime, resultPath, query, state,
-            createdOn);
+        return parseSchedulerInstance(instanceInfo);
+      }
+    }
+
+    private SchedulerJobInstanceInfo parseSchedulerInstance(Object[] 
instanceInfo) throws SQLException {
+      SchedulerJobInstanceHandle id = 
SchedulerJobInstanceHandle.fromString((String) instanceInfo[0]);
+      SchedulerJobHandle jobId = SchedulerJobHandle.fromString((String) 
instanceInfo[1]);
+      long createdOn = (Long) instanceInfo[2];
+      // Get the Runs
+      String fetchSQL = "SELECT * FROM " + JOB_INSTANCE_RUN_TABLE + " WHERE " 
+ COLUMN_ID + "=?";
+      List<Object[]> instanceRuns = runner.query(fetchSQL, 
multipleRowsHandler, (String) instanceInfo[0]);
+      List<SchedulerJobInstanceRun> runList = new ArrayList<>();
+      for (Object[] run : instanceRuns) {
+        // run[0] will contain the instanceID
+        int runId = (Integer) run[1];
+        LensSessionHandle sessionHandle = LensSessionHandle.valueOf((String) 
run[2]);
+        long starttime = (Long) run[3];
+        long endtime = (Long) run[4];
+        String resultPath = (String) run[5];
+        String queryHandleString = (String) run[6];
+        QueryHandle queryHandle = null;
+        if (!queryHandleString.isEmpty()) {
+          queryHandle = QueryHandle.fromString((String) run[6]);
+        }
+        SchedulerJobInstanceState instanceStatus = 
SchedulerJobInstanceState.valueOf((String) run[7]);
+        SchedulerJobInstanceRun instanceRun = new SchedulerJobInstanceRun(id, 
runId, sessionHandle, starttime, endtime,
+            resultPath, queryHandle, instanceStatus);
+        runList.add(instanceRun);
       }
+      return new SchedulerJobInstanceInfo(id, jobId, createdOn, runList);
     }
 
     /**
-     * Updates the state of a job instance.
+     * Updates the status of a job instance.
      *
-     * @param id
-     * @param state
+     * @param instanceRun
      * @return number of rows updated.
      * @throws SQLException
      */
-    public int updateJobInstanceState(String id, String state) throws 
SQLException {
-      String updateSQL = "UPDATE " + JOB_INSTANCE_TABLE + " SET " + 
COLUMN_STATE + "=?" + " WHERE " + COLUMN_ID + "=?";
-      return runner.update(updateSQL, state, id);
+    public int updateJobInstanceRun(SchedulerJobInstanceRun instanceRun) 
throws SQLException {
+      String updateSQL =
+          "UPDATE " + JOB_INSTANCE_RUN_TABLE + " SET " + COLUMN_END_TIME + 
"=?, " + COLUMN_RESULT_PATH + "=?, "
+              + COLUMN_QUERY_HANDLE + "=?, " + COLUMN_STATUS + "=?" + " WHERE 
" + COLUMN_ID + "=? AND " + COLUMN_RUN_ID
+              + "=?";
+
+      return runner.update(updateSQL, instanceRun.getEndTime(), 
instanceRun.getResultPath(),
+          instanceRun.getQueryHandle() == null ? "" : 
instanceRun.getQueryHandle().getHandleIdString(),
+          instanceRun.getInstanceState().name(), 
instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId());
     }
 
     /**
@@ -495,17 +598,33 @@ public class SchedulerDAO {
      * @return List of SchedulerJobInstanceHandle
      * @throws SQLException
      */
-    public List<SchedulerJobInstanceHandle> getAllJobInstances(String jobId) 
throws SQLException {
-      String fetchSQL = "SELECT " + COLUMN_ID + " FROM " + JOB_INSTANCE_TABLE 
+ " WHERE " + COLUMN_JOB_ID + "=?";
+    public List<SchedulerJobInstanceInfo> getAllJobInstances(String jobId) 
throws SQLException {
+      String fetchSQL = "SELECT * FROM " + JOB_INSTANCE_TABLE + " WHERE " + 
COLUMN_JOB_ID + "=?";
       List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, 
jobId);
-      List<SchedulerJobInstanceHandle> resOut = new ArrayList<>();
+      List<SchedulerJobInstanceInfo> resOut = new ArrayList<>();
       for (int i = 0; i < result.size(); i++) {
         Object[] row = result.get(i);
-        resOut.add(SchedulerJobInstanceHandle.fromString((String) row[0]));
+        resOut.add(parseSchedulerInstance(row));
       }
       return resOut;
     }
 
+    /**
+     * Get the user who submitted the job.
+     *
+     * @param id
+     * @return
+     * @throws SQLException
+     */
+    public String getUser(String id) throws SQLException {
+      String fetchSQL = "SELECT " + COLUMN_USER + " FROM " + JOB_TABLE + " 
WHERE " + COLUMN_ID + "=?";
+      List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, id);
+      if (result.size() == 0) {
+        return null;
+      } else {
+        return (String) result.get(0)[0];
+      }
+    }
   }
 
   /**
@@ -519,8 +638,9 @@ public class SchedulerDAO {
     public void createJobTable() throws SQLException {
       String createSQL =
           "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " 
VARCHAR(255) NOT NULL," + COLUMN_JOB
-              + " TEXT," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATE + " 
VARCHAR(20)," + COLUMN_CREATED_ON
-              + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + " PRIMARY KEY 
( " + COLUMN_ID + ")" + ")";
+              + " TEXT," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATUS + " 
VARCHAR(20)," + COLUMN_CREATED_ON
+              + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + 
COLUMN_JOB_NAME + " VARCHAR(255), " + " PRIMARY KEY ( "
+              + COLUMN_ID + ")" + ")";
       runner.update(createSQL);
     }
 
@@ -528,13 +648,25 @@ public class SchedulerDAO {
      * {@inheritDoc}
      */
     @Override
-    public void createJobInstaceTable() throws SQLException {
+    public void createJobInstanceTable() throws SQLException {
       String createSQL =
           "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + 
COLUMN_ID + " VARCHAR(255) NOT NULL, "
-              + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + 
COLUMN_SESSION_HANDLE + " VARCHAR(255), "
-              + COLUMN_START_TIME + " BIGINT, " + COLUMN_END_TIME + " BIGINT, 
" + COLUMN_RESULT_PATH + " TEXT, "
-              + COLUMN_QUERY + " TEXT, " + COLUMN_STATE + " VARCHAR(20), " + 
COLUMN_CREATED_ON + " BIGINT, "
-              + " PRIMARY KEY ( " + COLUMN_ID + ")" + ")";
+              + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + 
COLUMN_SCHEDULE_TIME + " BIGINT, " + " PRIMARY KEY ( "
+              + COLUMN_ID + ")" + ")";
+      runner.update(createSQL);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void createJobInstanceRunTable() throws SQLException {
+      String createSQL =
+          "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + 
COLUMN_ID + " VARCHAR(255) NOT NULL, "
+              + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " 
VARCHAR(255), " + COLUMN_START_TIME
+              + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + 
COLUMN_RESULT_PATH + " TEXT, " + COLUMN_QUERY_HANDLE
+              + " TEXT, " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( 
" + COLUMN_ID + ", " + COLUMN_RUN_ID
+              + ")" + ")";
       runner.update(createSQL);
     }
   }
@@ -550,8 +682,9 @@ public class SchedulerDAO {
     public void createJobTable() throws SQLException {
       String createSQL =
           "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " 
VARCHAR(255) NOT NULL," + COLUMN_JOB
-              + " VARCHAR(1024)," + COLUMN_USER + " VARCHAR(255)," + 
COLUMN_STATE + " VARCHAR(20)," + COLUMN_CREATED_ON
-              + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + " PRIMARY KEY 
( " + COLUMN_ID + ")" + ")";
+              + " VARCHAR(1024)," + COLUMN_USER + " VARCHAR(255)," + 
COLUMN_STATUS + " VARCHAR(20)," + COLUMN_CREATED_ON
+              + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + 
COLUMN_JOB_NAME + " VARCHAR(255), " + " PRIMARY KEY ( "
+              + COLUMN_ID + ")" + ")";
       runner.update(createSQL);
     }
 
@@ -559,13 +692,25 @@ public class SchedulerDAO {
      * {@inheritDoc}
      */
     @Override
-    public void createJobInstaceTable() throws SQLException {
+    public void createJobInstanceTable() throws SQLException {
       String createSQL =
           "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + 
COLUMN_ID + " VARCHAR(255) NOT NULL, "
-              + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + 
COLUMN_SESSION_HANDLE + " VARCHAR(255), "
-              + COLUMN_START_TIME + " BIGINT, " + COLUMN_END_TIME + " BIGINT, 
" + COLUMN_RESULT_PATH + " VARCHAR(1024),"
-              + COLUMN_QUERY + " VARCHAR(1024), " + COLUMN_STATE + " 
VARCHAR(20), " + COLUMN_CREATED_ON + " BIGINT, "
-              + " PRIMARY KEY ( " + COLUMN_ID + ")" + ")";
+              + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + 
COLUMN_SCHEDULE_TIME + " BIGINT, " + " PRIMARY KEY ( "
+              + COLUMN_ID + ")" + ")";
+      runner.update(createSQL);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void createJobInstanceRunTable() throws SQLException {
+      String createSQL =
+          "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + 
COLUMN_ID + " VARCHAR(255) NOT NULL, "
+              + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " 
VARCHAR(255), " + COLUMN_START_TIME
+              + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + 
COLUMN_RESULT_PATH + " VARCHAR(1024),"
+              + COLUMN_QUERY_HANDLE + " VARCHAR(1024), " + COLUMN_STATUS + " 
VARCHAR(20), " + " PRIMARY KEY ( "
+              + COLUMN_ID + ", " + COLUMN_RUN_ID + " )" + ")";
       runner.update(createSQL);
     }
   }

http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java
 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java
new file mode 100644
index 0000000..7323add
--- /dev/null
+++ 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.scheduler;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lens.api.LensConf;
+import org.apache.lens.api.LensSessionHandle;
+import org.apache.lens.api.error.InvalidStateTransitionException;
+import org.apache.lens.api.query.QueryHandle;
+import org.apache.lens.api.scheduler.*;
+import org.apache.lens.server.LensServices;
+import org.apache.lens.server.api.LensConfConstants;
+import org.apache.lens.server.api.error.LensException;
+import org.apache.lens.server.api.events.AsyncEventListener;
+import org.apache.lens.server.api.events.SchedulerAlarmEvent;
+import org.apache.lens.server.api.query.QueryExecutionService;
+import org.apache.lens.server.api.scheduler.SchedulerService;
+import org.apache.lens.server.query.QueryExecutionServiceImpl;
+import org.apache.lens.server.util.UtilityMethods;
+
+import org.joda.time.DateTime;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class SchedulerEventListener extends 
AsyncEventListener<SchedulerAlarmEvent> {
+  private static final int CORE_POOL_SIZE = 10;
+  private static final String JOB_INSTANCE_ID_KEY = "job_instance_key";
+  @Getter
+  @Setter
+  @VisibleForTesting
+  protected QueryExecutionService queryService;
+  private SchedulerDAO schedulerDAO;
+  private SchedulerService schedulerService;
+
+  public SchedulerEventListener(SchedulerDAO schedulerDAO) {
+    super(CORE_POOL_SIZE);
+    this.queryService = 
LensServices.get().getService(QueryExecutionService.NAME);
+    this.schedulerService = 
LensServices.get().getService(SchedulerService.NAME);
+    this.schedulerDAO = schedulerDAO;
+  }
+
+  /**
+   * @param event the event
+   */
+  @Override
+  public void process(SchedulerAlarmEvent event) {
+    DateTime scheduledTime = event.getNominalTime();
+    SchedulerJobHandle jobHandle = event.getJobHandle();
+    if (event.getType() == SchedulerAlarmEvent.EventType.EXPIRE) {
+      try {
+        schedulerService.expireJob(null, jobHandle);
+      } catch (LensException e) {
+        log.error("Error while expiring the job", e);
+      }
+      return;
+    }
+    /*
+     * Get the job from the store.
+     * Create an instance.
+     * Store the instance.
+     * Try to run the instance.
+     * If successfully submitted change the status to running.
+     * Otherwise update the status to killed.
+     */
+    //TODO: Get the job status and if it is not Scheduled, don't do anything.
+    XJob job = schedulerDAO.getJob(jobHandle);
+    String user = schedulerDAO.getUser(jobHandle);
+    SchedulerJobInstanceHandle instanceHandle = event.getPreviousInstance() == 
null
+                                                ? 
UtilityMethods.generateSchedulerJobInstanceHandle()
+                                                : event.getPreviousInstance();
+    Map<String, String> conf = new HashMap<>();
+    LensSessionHandle sessionHandle = null;
+    try {
+      // Open the session with the user who submitted the job.
+      sessionHandle = ((QueryExecutionServiceImpl) 
LensServices.get().getService(QueryExecutionServiceImpl.NAME))
+          .openSession(user, "dummy", conf, false);
+    } catch (LensException e) {
+      log.error("Error occurred while opening a session ", e);
+      return;
+    }
+    SchedulerJobInstanceInfo instance = null;
+    SchedulerJobInstanceRun run = null;
+    // Session needs to be closed after the launch.
+    try {
+      long scheduledTimeMillis = scheduledTime.getMillis();
+      String query = job.getExecution().getQuery().getQuery();
+      List<MapType> jobConf = job.getExecution().getQuery().getConf();
+      LensConf queryConf = new LensConf();
+      for (MapType element : jobConf) {
+        queryConf.addProperty(element.getKey(), element.getValue());
+      }
+      queryConf.addProperty(JOB_INSTANCE_ID_KEY, instanceHandle.getHandleId());
+      // Current time is used for resolving date.
+      queryConf.addProperty(LensConfConstants.QUERY_CURRENT_TIME_IN_MILLIS, 
scheduledTime.getMillis());
+      String queryName = job.getName();
+      queryName += "-" + scheduledTime.getMillis();
+      // If the instance is new then create otherwise get from the store
+      if (event.getPreviousInstance() == null) {
+        instance = new SchedulerJobInstanceInfo(instanceHandle, jobHandle, 
scheduledTimeMillis,
+            new ArrayList<SchedulerJobInstanceRun>());
+      } else {
+        instance = schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle);
+      }
+      // Next run of the instance
+      long currentTime = System.currentTimeMillis();
+      run = new SchedulerJobInstanceRun(instanceHandle, 
instance.getInstanceRunList().size() + 1, sessionHandle,
+          currentTime, 0, "N/A", null, SchedulerJobInstanceState.WAITING);
+      instance.getInstanceRunList().add(run);
+      boolean success;
+      if (event.getPreviousInstance() == null) {
+        success = schedulerDAO.storeJobInstance(instance) == 1;
+        if (!success) {
+          log.error(
+              "Exception occurred while storing the instance for instance 
handle " + instance + " of job " + jobHandle);
+          return;
+        }
+      }
+      success = schedulerDAO.storeJobInstanceRun(run) == 1;
+      if (!success) {
+        log.error(
+            "Exception occurred while storing the instance for instance handle 
" + instance + " of job " + jobHandle);
+        return;
+      }
+
+      QueryHandle handle = queryService.executeAsync(sessionHandle, query, 
queryConf, queryName);
+      run.setQueryHandle(handle);
+      
run.setInstanceState(run.getInstanceState().nextTransition(SchedulerJobInstanceEvent.ON_RUN));
+      run.setEndTime(System.currentTimeMillis());
+      schedulerDAO.updateJobInstanceRun(run);
+    } catch (LensException | InvalidStateTransitionException e) {
+      log.error(
+          "Exception occurred while launching the job instance for " + 
jobHandle + " for nominal time " + scheduledTime
+              .getMillis(), e);
+      try {
+        
run.setInstanceState(run.getInstanceState().nextTransition(SchedulerJobInstanceEvent.ON_FAILURE));
+        run.setEndTime(System.currentTimeMillis());
+        schedulerDAO.updateJobInstanceRun(run);
+      } catch (InvalidStateTransitionException e1) {
+        log.error("Can't make transition for instance " + instance.getId() + " 
of job " + instance.getJobId(), e);
+      }
+    } finally {
+      try {
+        ((QueryExecutionServiceImpl) 
LensServices.get().getService(QueryExecutionServiceImpl.NAME))
+            .closeSession(sessionHandle);
+      } catch (LensException e) {
+        log.error("Error closing session ", e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
new file mode 100644
index 0000000..5b12720
--- /dev/null
+++ 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.scheduler;
+
+import java.util.List;
+
+import org.apache.lens.api.error.InvalidStateTransitionException;
+import org.apache.lens.api.query.QueryStatus;
+import org.apache.lens.api.scheduler.*;
+import org.apache.lens.server.api.events.AsyncEventListener;
+import org.apache.lens.server.api.query.QueryContext;
+import org.apache.lens.server.api.query.QueryEnded;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class SchedulerQueryEventListener extends 
AsyncEventListener<QueryEnded> {
+  private static final String JOB_INSTANCE_ID_KEY = "job_instance_key";
+  private static final int CORE_POOL_SIZE = 10;
+  private SchedulerDAO schedulerDAO;
+
+  public SchedulerQueryEventListener(SchedulerDAO schedulerDAO) {
+    super(CORE_POOL_SIZE);
+    this.schedulerDAO = schedulerDAO;
+  }
+
+  @Override
+  public void process(QueryEnded event) {
+    if (event.getCurrentValue() == QueryStatus.Status.CLOSED) {
+      return;
+    }
+    QueryContext queryContext = event.getQueryContext();
+    if (queryContext == null) {
+      log.warn("Could not find the context for {} for event:{}.", 
event.getQueryHandle(), event.getCurrentValue());
+      return;
+    }
+    String instanceHandle = queryContext.getConf().get(JOB_INSTANCE_ID_KEY);
+    if (instanceHandle == null) {
+      // Nothing to do
+      return;
+    }
+    SchedulerJobInstanceInfo info = schedulerDAO
+        
.getSchedulerJobInstanceInfo(SchedulerJobInstanceHandle.fromString(instanceHandle));
+    List<SchedulerJobInstanceRun> runList = info.getInstanceRunList();
+    if (runList.size() == 0) {
+      log.error("No instance run for " + instanceHandle + " with query " + 
queryContext.getQueryHandle());
+      return;
+    }
+    SchedulerJobInstanceRun latestRun = runList.get(runList.size() - 1);
+    SchedulerJobInstanceState state = latestRun.getInstanceState();
+    try {
+      switch (event.getCurrentValue()) {
+      case CANCELED:
+        state = state.nextTransition(SchedulerJobInstanceEvent.ON_KILL);
+        break;
+      case SUCCESSFUL:
+        state = state.nextTransition(SchedulerJobInstanceEvent.ON_SUCCESS);
+        break;
+      case FAILED:
+        state = state.nextTransition(SchedulerJobInstanceEvent.ON_FAILURE);
+        break;
+      }
+      latestRun.setEndTime(System.currentTimeMillis());
+      latestRun.setInstanceState(state);
+      latestRun.setResultPath(queryContext.getDriverResultPath());
+      schedulerDAO.updateJobInstanceRun(latestRun);
+    } catch (InvalidStateTransitionException e) {
+      log.error("Instance Transition Failed ", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
index 3952671..14ca32d 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,43 +20,83 @@ package org.apache.lens.server.scheduler;
 
 import java.util.Collection;
 import java.util.List;
-import java.util.UUID;
 
+import org.apache.lens.api.LensConf;
 import org.apache.lens.api.LensSessionHandle;
+import org.apache.lens.api.error.InvalidStateTransitionException;
+import org.apache.lens.api.query.QueryHandle;
 import org.apache.lens.api.scheduler.*;
+import org.apache.lens.cube.parse.CubeQueryConfUtil;
 import org.apache.lens.server.BaseLensService;
-import org.apache.lens.server.LensServerConf;
+import org.apache.lens.server.LensServices;
 import org.apache.lens.server.api.error.LensException;
+import org.apache.lens.server.api.events.SchedulerAlarmEvent;
 import org.apache.lens.server.api.health.HealthStatus;
+import org.apache.lens.server.api.query.QueryEnded;
+import org.apache.lens.server.api.query.QueryExecutionService;
 import org.apache.lens.server.api.scheduler.SchedulerService;
 import org.apache.lens.server.session.LensSessionImpl;
+import org.apache.lens.server.util.UtilityMethods;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hive.service.cli.CLIService;
 
+import org.joda.time.DateTime;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
 /**
  * This class handles all the scheduler operations.
  */
+@Slf4j
 public class SchedulerServiceImpl extends BaseLensService implements 
SchedulerService {
 
-  // get the state store
-  private SchedulerDAO schedulerDAO;
+  @Getter
+  @Setter
+  @VisibleForTesting
+  protected QueryExecutionService queryService;
+  @Getter
+  @VisibleForTesting
+  protected SchedulerEventListener schedulerEventListener;
+  @Getter
+  @VisibleForTesting
+  protected SchedulerQueryEventListener schedulerQueryEventListener;
+  @Getter
+  @VisibleForTesting
+  protected SchedulerDAO schedulerDAO;
+  private AlarmService alarmService;
 
-  private LensScheduler scheduler;
   /**
-   * The constant name for scheduler service.
+   * Instantiates a new scheduler service.
+   *
+   * @param cliService the cli service
    */
-  public static final String NAME = "scheduler";
-
   public SchedulerServiceImpl(CLIService cliService) throws LensException {
     super(NAME, cliService);
-    this.schedulerDAO = new SchedulerDAO(LensServerConf.getHiveConf());
-    this.scheduler = LensScheduler.get();
   }
 
-  public SchedulerServiceImpl(CLIService cliService, SchedulerDAO 
schedulerDAO) {
-    super(NAME, cliService);
-    this.schedulerDAO = schedulerDAO;
-    this.scheduler = LensScheduler.get();
+  public synchronized void init(HiveConf hiveConf) {
+    super.init(hiveConf);
+    try {
+      schedulerDAO = new SchedulerDAO(hiveConf);
+      alarmService = LensServices.get().getService(AlarmService.NAME);
+      queryService = LensServices.get().getService(QueryExecutionService.NAME);
+      // Get the listeners' classes from the configuration.
+      this.schedulerEventListener = new SchedulerEventListener(schedulerDAO);
+      this.schedulerQueryEventListener = new 
SchedulerQueryEventListener(schedulerDAO);
+      getEventService().addListenerForType(schedulerEventListener, 
SchedulerAlarmEvent.class);
+      getEventService().addListenerForType(schedulerQueryEventListener, 
QueryEnded.class);
+    } catch (LensException e) {
+      log.error("Error Initialising Scheduler-service", e);
+    }
+  }
+
+  @Override
+  public synchronized void start() {
+    super.start();
   }
 
   /**
@@ -65,8 +105,8 @@ public class SchedulerServiceImpl extends BaseLensService 
implements SchedulerSe
   @Override
   public HealthStatus getHealthStatus() {
     return this.getServiceState().equals(STATE.STARTED)
-        ? new HealthStatus(true, "Scheduler service is healthy.")
-        : new HealthStatus(false, "Scheduler service is down.");
+           ? new HealthStatus(true, "Scheduler service is healthy.")
+           : new HealthStatus(false, "Scheduler service is down.");
   }
 
   /**
@@ -74,27 +114,57 @@ public class SchedulerServiceImpl extends BaseLensService 
implements SchedulerSe
    */
   @Override
   public SchedulerJobHandle submitJob(LensSessionHandle sessionHandle, XJob 
job) throws LensException {
-    //TBD place holder code
     LensSessionImpl session = getSession(sessionHandle);
-    return null;
+    // Validate XJob
+    validateJob(job);
+    SchedulerJobHandle handle = UtilityMethods.generateSchedulerJobHandle();
+    long createdOn = System.currentTimeMillis();
+    long modifiedOn = createdOn;
+    SchedulerJobInfo info = new SchedulerJobInfo(handle, job, 
session.getLoggedInUser(), SchedulerJobState.NEW,
+        createdOn, modifiedOn);
+    if (schedulerDAO.storeJob(info) == 1) {
+      return handle;
+    } else {
+      throw new LensException("Could not Submit the job");
+    }
+  }
+
+  private void validateJob(XJob job) throws LensException {
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public void scheduleJob(LensSessionHandle sessionHandle, SchedulerJobHandle 
jobHandle) throws LensException {
-    //TBD place holder code
-    // send the schedule request to the scheduler.
-    UUID externalID = jobHandle.getHandleId();
-    // get the job from the database and schedule
+  public boolean scheduleJob(LensSessionHandle sessionHandle, 
SchedulerJobHandle jobHandle) throws LensException {
+    SchedulerJobInfo jobInfo = schedulerDAO.getSchedulerJobInfo(jobHandle);
+    XJob job = jobInfo.getJob();
+    DateTime start = new 
DateTime(job.getStartTime().toGregorianCalendar().getTime());
+    DateTime end = new 
DateTime(job.getEndTime().toGregorianCalendar().getTime());
+    XFrequency frequency = job.getTrigger().getFrequency();
+    // check query
+    checkQuery(sessionHandle, job);
+    alarmService.schedule(start, end, frequency, 
jobHandle.getHandleIdString());
+    return setStateOfJob(jobHandle, SchedulerJobEvent.ON_SCHEDULE) == 1;
+  }
+
+  private void checkQuery(LensSessionHandle sessionHandle, XJob job) throws 
LensException {
+    List<MapType> jobConf = job.getExecution().getQuery().getConf();
+    LensConf queryConf = new LensConf();
+    for (MapType element : jobConf) {
+      queryConf.addProperty(element.getKey(), element.getValue());
+    }
+    queryConf.addProperty(CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, false);
+    
queryService.estimate(LensServices.get().getLogSegregationContext().getLogSegragationId(),
 sessionHandle,
+        job.getExecution().getQuery().getQuery(), queryConf);
+    return;
   }
 
   @Override
   public SchedulerJobHandle submitAndScheduleJob(LensSessionHandle 
sessionHandle, XJob job) throws LensException {
-    //TBD place holder code
-    // take job, validate it, submit it(check duplicate, persist it), schedule 
it.
-    return null;
+    SchedulerJobHandle handle = submitJob(sessionHandle, job);
+    scheduleJob(sessionHandle, handle);
+    return handle;
   }
 
   /**
@@ -102,39 +172,42 @@ public class SchedulerServiceImpl extends BaseLensService 
implements SchedulerSe
    */
   @Override
   public XJob getJobDefinition(LensSessionHandle sessionHandle, 
SchedulerJobHandle jobHandle) throws LensException {
-    //TBD place holder code
-    // get the job definition from the persisted store, return it.
-    return null;
+    return schedulerDAO.getJob(jobHandle);
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public SchedulerJobInfo getJobDetails(LensSessionHandle sessionHandle,
-                                        SchedulerJobHandle jobHandle) throws 
LensException {
-    //TBD place holder code
-    return null;
+  public SchedulerJobInfo getJobDetails(LensSessionHandle sessionHandle, 
SchedulerJobHandle jobHandle)
+    throws LensException {
+    return schedulerDAO.getSchedulerJobInfo(jobHandle);
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public boolean updateJob(LensSessionHandle sessionHandle, SchedulerJobHandle 
jobHandle,
-                           XJob newJobDefinition) throws LensException {
-    //TBD place holder code
-    XJob job = schedulerDAO.getJob(jobHandle);
-    return false;
+  public boolean updateJob(LensSessionHandle sessionHandle, SchedulerJobHandle 
jobHandle, XJob newJobDefinition)
+    throws LensException {
+    SchedulerJobInfo jobInfo = schedulerDAO.getSchedulerJobInfo(jobHandle);
+    // This will allow only the job definition and configuration change.
+    // TODO: fix start and end time changes
+    jobInfo.setJob(newJobDefinition);
+    jobInfo.setModifiedOn(System.currentTimeMillis());
+    int updated = schedulerDAO.updateJob(jobInfo);
+    return updated > 0;
   }
 
   /**
-   *
    * {@inheritDoc}
    */
   @Override
-  public void expireJob(LensSessionHandle sessionHandle, SchedulerJobHandle 
jobHandle) throws LensException {
-    //TBD place holder code
+  public boolean expireJob(LensSessionHandle sessionHandle, SchedulerJobHandle 
jobHandle) throws LensException {
+    if (alarmService.checkExists(jobHandle)) {
+      alarmService.unSchedule(jobHandle);
+    }
+    return setStateOfJob(jobHandle, SchedulerJobEvent.ON_EXPIRE) == 1;
   }
 
   /**
@@ -142,8 +215,8 @@ public class SchedulerServiceImpl extends BaseLensService 
implements SchedulerSe
    */
   @Override
   public boolean suspendJob(LensSessionHandle sessionHandle, 
SchedulerJobHandle jobHandle) throws LensException {
-    //TBD place holder code
-    return false;
+    alarmService.pauseJob(jobHandle);
+    return setStateOfJob(jobHandle, SchedulerJobEvent.ON_SUSPEND) == 1;
   }
 
   /**
@@ -151,8 +224,8 @@ public class SchedulerServiceImpl extends BaseLensService 
implements SchedulerSe
    */
   @Override
   public boolean resumeJob(LensSessionHandle sessionHandle, SchedulerJobHandle 
jobHandle) throws LensException {
-    // TBD place holder code
-    return false;
+    alarmService.resumeJob(jobHandle);
+    return setStateOfJob(jobHandle, SchedulerJobEvent.ON_RESUME) == 1;
   }
 
   /**
@@ -160,20 +233,18 @@ public class SchedulerServiceImpl extends BaseLensService 
implements SchedulerSe
    */
   @Override
   public boolean deleteJob(LensSessionHandle sessionHandle, SchedulerJobHandle 
jobHandle) throws LensException {
-    // TBD place holder code
-    // it should only be a soft delete. Later on we will make a purge service 
and that service will delete
-    // all the soft delete things.
-    return false;
+    if (alarmService.checkExists(jobHandle)) {
+      alarmService.unSchedule(jobHandle);
+    }
+    return setStateOfJob(jobHandle, SchedulerJobEvent.ON_DELETE) == 1;
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public Collection<SchedulerJobStats> getAllJobStats(LensSessionHandle 
sessionHandle, String state, String userName,
-                                             long startTime, long endTime) 
throws LensException {
-    // TBD place holder code
-    // validate that the state is a valid state (enum)
+  public Collection<SchedulerJobStats> getAllJobStats(LensSessionHandle 
sessionHandle, String state, String user,
+      String jobName, long startTime, long endTime) throws LensException {
     return null;
   }
 
@@ -182,9 +253,7 @@ public class SchedulerServiceImpl extends BaseLensService 
implements SchedulerSe
    */
   @Override
   public SchedulerJobStats getJobStats(LensSessionHandle sessionHandle, 
SchedulerJobHandle handle, String state,
-                              long startTime, long endTime) throws 
LensException {
-    // TBD place holder code
-    // validate that the state is a valid state (enum)
+      long startTime, long endTime) throws LensException {
     return null;
   }
 
@@ -192,39 +261,78 @@ public class SchedulerServiceImpl extends BaseLensService 
implements SchedulerSe
    * {@inheritDoc}
    */
   @Override
-  public boolean rerunInstance(LensSessionHandle sessionHandle,
-                               SchedulerJobInstanceHandle instanceHandle) 
throws LensException {
-    //TBD place holder code
-    return false;
+  public boolean rerunInstance(LensSessionHandle sessionHandle, 
SchedulerJobInstanceHandle instanceHandle)
+    throws LensException {
+    SchedulerJobInstanceInfo instanceInfo = 
schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle);
+    if (schedulerDAO.getJobState(instanceInfo.getJobId()) != 
SchedulerJobState.SCHEDULED) {
+      throw new LensException("Job with handle " + instanceInfo.getJobId() + " 
is not scheduled");
+    }
+    // Get the latest run.
+    List<SchedulerJobInstanceRun> runList = instanceInfo.getInstanceRunList();
+    if (runList.size() == 0) {
+      throw new LensException("Job instance " + instanceHandle + " is not yet 
run");
+    }
+    SchedulerJobInstanceRun latestRun = runList.get(runList.size() - 1);
+    // This call is for the test that it can be re run.
+    try {
+      
latestRun.getInstanceState().nextTransition(SchedulerJobInstanceEvent.ON_RERUN);
+      getEventService().notifyEvent(
+          new SchedulerAlarmEvent(instanceInfo.getJobId(), new 
DateTime(instanceInfo.getScheduleTime()),
+              SchedulerAlarmEvent.EventType.SCHEDULE, instanceHandle));
+    } catch (InvalidStateTransitionException e) {
+      throw new LensException("Invalid State Transition ", e);
+    }
+    return true;
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public List<SchedulerJobInstanceInfo> getJobInstances(LensSessionHandle 
sessionHandle,
-                                      SchedulerJobHandle jobHandle, Long 
numResults) throws LensException {
-    // TBD place holder code
-    // By default return 100 results - make it configurable
-    return null;
+  public List<SchedulerJobInstanceInfo> getJobInstances(LensSessionHandle 
sessionHandle, SchedulerJobHandle jobHandle,
+      Long numResults) throws LensException {
+    return schedulerDAO.getJobInstances(jobHandle);
   }
 
   @Override
-  public boolean killInstance(LensSessionHandle sessionHandle,
-                              SchedulerJobInstanceHandle instanceHandle) 
throws LensException {
-    // TBD place holder code
-    return true;
+  public boolean killInstance(LensSessionHandle sessionHandle, 
SchedulerJobInstanceHandle instanceHandle)
+    throws LensException {
+    /**
+     * Get the query handle from the latest run.
+     */
+    SchedulerJobInstanceInfo instanceInfo = 
schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle);
+    List<SchedulerJobInstanceRun> runList = instanceInfo.getInstanceRunList();
+    if (runList.size() == 0) {
+      throw new LensException("Job instance " + instanceHandle + " is not yet 
run");
+    }
+    SchedulerJobInstanceRun latestRun = runList.get(runList.size() - 1);
+    QueryHandle handle = latestRun.getQueryHandle();
+    if (handle.getHandleIdString().isEmpty()) {
+      return false;
+    }
+    // This will cause the QueryEnd event which will set the status of the 
instance to KILLED.
+    return queryService.cancelQuery(sessionHandle, handle);
   }
 
-
   /**
    * {@inheritDoc}
    */
   @Override
   public SchedulerJobInstanceInfo getInstanceDetails(LensSessionHandle 
sessionHandle,
-                                                     
SchedulerJobInstanceHandle instanceHandle) throws LensException {
-    // TBD place holder code
-    return null;
+      SchedulerJobInstanceHandle instanceHandle) throws LensException {
+    return schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle);
   }
 
+  private int setStateOfJob(SchedulerJobHandle handle, SchedulerJobEvent 
event) throws LensException {
+    try {
+      SchedulerJobInfo info = schedulerDAO.getSchedulerJobInfo(handle);
+      SchedulerJobState currentState = info.getJobState();
+      SchedulerJobState nextState = currentState.nextTransition(event);
+      info.setJobState(nextState);
+      info.setModifiedOn(System.currentTimeMillis());
+      return schedulerDAO.updateJobStatus(info);
+    } catch (InvalidStateTransitionException e) {
+      throw new LensException("Invalid state ", e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/notification/services/AlarmService.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/notification/services/AlarmService.java
 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/notification/services/AlarmService.java
deleted file mode 100644
index a4cdd83..0000000
--- 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/notification/services/AlarmService.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.lens.server.scheduler.notification.services;
-
-import org.apache.lens.api.scheduler.SchedulerJobHandle;
-import org.apache.lens.api.scheduler.XFrequency;
-import org.apache.lens.api.scheduler.XFrequencyEnum;
-import org.apache.lens.server.LensServices;
-import org.apache.lens.server.api.LensService;
-import org.apache.lens.server.api.error.LensException;
-import org.apache.lens.server.api.events.LensEventService;
-import org.apache.lens.server.api.events.SchedulerAlarmEvent;
-import org.apache.lens.server.api.health.HealthStatus;
-
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hive.service.AbstractService;
-
-import org.joda.time.DateTime;
-import org.quartz.*;
-import org.quartz.impl.StdSchedulerFactory;
-
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * This service is used primarily by Scheduler to get alarm notifications for 
scheduled queries.
- *
- * As a schedule this service accepts start time, frequency, end time and 
timeZone. It also requires the
- * {@link SchedulerJobHandle} which it sends as part of the
- * {@link org.apache.lens.server.api.events.SchedulerAlarmEvent} to inform the 
scheduler about the job for which
- * job the notification has been generated.
- */
-@Slf4j
-public class AlarmService extends AbstractService implements LensService {
-
-  public static final String NAME = "alarm-service";
-
-  private Scheduler scheduler;
-
-  /**
-   * True if the service started properly and is running fine, false otherwise.
-   */
-  private boolean isHealthy = true;
-
-  /**
-   * Contains the reason if service is not healthy.
-   */
-  private String healthCause;
-
-  /**
-   * Creates a new instance of AlarmService.
-   *
-   * @param name       the name
-   */
-  public AlarmService(String name) {
-    super(name);
-  }
-
-  @Override
-  public HealthStatus getHealthStatus() {
-    return isHealthy
-        ? new HealthStatus(isHealthy, "Alarm service is healthy.")
-        : new HealthStatus(isHealthy, healthCause);
-  }
-
-  public synchronized void init(HiveConf hiveConf) {
-    super.init(hiveConf);
-    try {
-      this.scheduler = StdSchedulerFactory.getDefaultScheduler();
-    } catch (SchedulerException e) {
-      isHealthy = false;
-      healthCause = "Failed to initialize the Quartz Scheduler for 
AlarmService.";
-      log.error(healthCause, e);
-    }
-  }
-
-  public synchronized void start() {
-    try {
-      scheduler.start();
-      log.info("Alarm service started successfully!");
-    } catch (SchedulerException e) {
-      isHealthy = false;
-      healthCause = "Failed to start the Quartz Scheduler for AlarmService.";
-      log.error(healthCause, e);
-    }
-  }
-
-  @Override
-  public synchronized void stop() {
-    try {
-      scheduler.shutdown();
-      log.info("Alarm Service stopped successfully.");
-    } catch (SchedulerException e) {
-      log.error("Failed to shut down the Quartz Scheduler for AlarmService.", 
e);
-    }
-  }
-
-  /**
-   * This method can be used by any consumer who wants to receive 
notifications during a time range at a given
-   * frequency.
-   *
-   * This method is intended to be used by LensScheduler to subscribe for time 
based notifications to schedule queries.
-   * On receiving a job to be scheduled LensScheduler will subscribe to all 
triggers required for the job, including
-   * AlarmService for time based triggers.
-   *
-   * @param start start time for notifications
-   * @param end end time for notifications
-   * @param frequency Frequency to determine the frequency at which 
notification should be sent.
-   * @param jobHandle Must be a unique jobHanlde across all consumers
-   */
-  public void schedule(DateTime start, DateTime end, XFrequency frequency, 
String jobHandle) throws LensException {
-    // accept the schedule and then keep on sending the notifications for that 
schedule
-    JobDataMap map = new JobDataMap();
-    map.put("jobHandle", jobHandle);
-
-    JobDetail job = JobBuilder.newJob(LensJob.class).withIdentity(jobHandle, 
"LensJobs")
-      .usingJobData(map).build();
-
-    Trigger trigger;
-    if (frequency.getEnum() != null) { //for enum expression:  create a 
trigger using calendar interval
-      CalendarIntervalScheduleBuilder scheduleBuilder = 
CalendarIntervalScheduleBuilder.calendarIntervalSchedule()
-        .withInterval(getTimeInterval(frequency.getEnum()), 
getTimeUnit(frequency.getEnum()))
-        .withMisfireHandlingInstructionIgnoreMisfires();
-      trigger = TriggerBuilder.newTrigger()
-        .withIdentity(jobHandle, "AlarmService")
-        .startAt(start.toDate())
-        .endAt(end.toDate())
-        .withSchedule(scheduleBuilder)
-        .build();
-    } else { // for cron expression create a cron trigger
-      trigger = TriggerBuilder.newTrigger()
-          .withIdentity(jobHandle, "AlarmService")
-          
.withSchedule(CronScheduleBuilder.cronSchedule(frequency.getCronExpression()))
-          .build();
-    }
-
-    // Tell quartz to run the job using our trigger
-    try {
-      scheduler.scheduleJob(job, trigger);
-    } catch (SchedulerException e) {
-      log.error("Error scheduling job with jobHandle: {}", jobHandle);
-      throw new LensException("Failed to schedule job with jobHandle: " + 
jobHandle, e);
-    }
-  }
-
-
-  private int getTimeInterval(XFrequencyEnum frequencyEnum) {
-    // since quarterly is not supported natively, we express it as 3 months
-    return frequencyEnum == XFrequencyEnum.QUARTERLY ? 3 : 1;
-  }
-
-
-  // Maps the timeunit in entity specification to the one in Quartz DateBuilder
-  private DateBuilder.IntervalUnit getTimeUnit(XFrequencyEnum frequencyEnum) {
-    switch (frequencyEnum) {
-
-    case DAILY:
-      return DateBuilder.IntervalUnit.DAY;
-
-    case WEEKLY:
-      return DateBuilder.IntervalUnit.WEEK;
-
-    case MONTHLY:
-      return DateBuilder.IntervalUnit.MONTH;
-
-    case QUARTERLY:
-      return DateBuilder.IntervalUnit.MONTH;
-
-    case YEARLY:
-      return DateBuilder.IntervalUnit.YEAR;
-
-    default:
-      throw new IllegalArgumentException("Invalid frequency enum expression: " 
+ frequencyEnum.name());
-    }
-  }
-
-  public boolean unSchedule(SchedulerJobHandle jobHandle) throws LensException 
{
-    // stop sending notifications for this job handle
-    try {
-      return scheduler.deleteJob(JobKey.jobKey(jobHandle.getHandleIdString(), 
"LensScheduler"));
-    } catch (SchedulerException e) {
-      log.error("Failed to remove alarm triggers for job with jobHandle: " + 
jobHandle, e);
-      throw new LensException("Failed to remove alarm triggers for job with 
jobHandle: " + jobHandle, e);
-    }
-  }
-
-  public static class LensJob implements Job {
-
-    @Override
-    public void execute(JobExecutionContext jobExecutionContext) throws 
JobExecutionException {
-      JobDataMap data = jobExecutionContext.getMergedJobDataMap();
-      DateTime nominalTime = new 
DateTime(jobExecutionContext.getScheduledFireTime());
-      SchedulerJobHandle jobHandle = 
SchedulerJobHandle.fromString(data.getString("jobHandle"));
-      SchedulerAlarmEvent alarmEvent = new SchedulerAlarmEvent(jobHandle, 
nominalTime);
-      try {
-        LensEventService  eventService = 
LensServices.get().getService(LensEventService.NAME);
-        eventService.notifyEvent(alarmEvent);
-      } catch (LensException e) {
-        log.error("Failed to notify SchedulerAlarmEvent for jobHandle: {} and 
nominalTime: {}",
-            jobHandle.getHandleIdString(), nominalTime.toString(), e);
-        throw new JobExecutionException("Failed to notify alarmEvent", e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobInstanceState.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobInstanceState.java
 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobInstanceState.java
deleted file mode 100644
index 95057e4..0000000
--- 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobInstanceState.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.server.scheduler.state;
-
-import org.apache.lens.api.scheduler.SchedulerJobInstanceStatus;
-import org.apache.lens.server.api.error.InvalidStateTransitionException;
-import org.apache.lens.server.api.scheduler.StateMachine;
-
-import lombok.Getter;
-import lombok.Setter;
-
-/**
- * State machine for transitions on Scheduler Jobs.
- */
-public class SchedulerJobInstanceState {
-
-  public SchedulerJobInstanceState(SchedulerJobInstanceStatus status) {
-    this.currentStatus = status;
-  }
-
-  public SchedulerJobInstanceState() {
-    this.currentStatus = INITIAL_STATUS;
-  }
-
-  @Getter @Setter
-  private SchedulerJobInstanceStatus currentStatus;
-
-  private static final SchedulerJobInstanceStatus INITIAL_STATUS = 
SchedulerJobInstanceStatus.WAITING;
-
-  public SchedulerJobInstanceStatus nextTransition(EVENT event) throws 
InvalidStateTransitionException {
-    STATE currentState = STATE.valueOf(currentStatus.name());
-    return 
SchedulerJobInstanceStatus.valueOf(currentState.nextTransition(event).name());
-  }
-
-  public enum STATE implements StateMachine<STATE, EVENT> {
-    // repeating same operation will return the same state to ensure 
idempotent behavior.
-    WAITING {
-      @Override
-      public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
-        switch (event) {
-        case ON_CREATION:
-          return this;
-        case ON_CONDITIONS_MET:
-          return STATE.LAUNCHED;
-        case ON_TIME_OUT:
-          return STATE.TIMED_OUT;
-        case ON_RUN:
-          return STATE.RUNNING;
-        case ON_SUCCESS:
-          return STATE.SUCCEEDED;
-        case ON_FAILURE:
-          return STATE.FAILED;
-        case ON_KILL:
-          return STATE.KILLED;
-        default:
-          throw new InvalidStateTransitionException("Event: " + event.name() + 
" is not a valid event for state: "
-            + this.name());
-        }
-      }
-    },
-
-    LAUNCHED {
-      @Override
-      public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
-        switch (event) {
-        case ON_CONDITIONS_MET:
-          return this;
-        case ON_RUN:
-          return STATE.RUNNING;
-        case ON_SUCCESS:
-          return STATE.SUCCEEDED;
-        case ON_FAILURE:
-          return STATE.FAILED;
-        case ON_KILL:
-          return STATE.KILLED;
-        default:
-          throw new InvalidStateTransitionException("Event: " + event.name() + 
" is not a valid event for state: "
-            + this.name());
-        }
-      }
-    },
-
-    RUNNING {
-      @Override
-      public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
-        switch (event) {
-        case ON_RUN:
-          return this;
-        case ON_SUCCESS:
-          return STATE.SUCCEEDED;
-        case ON_FAILURE:
-          return STATE.FAILED;
-        case ON_KILL:
-          return STATE.KILLED;
-        default:
-          throw new InvalidStateTransitionException("Event: " + event.name() + 
" is not a valid event for state: "
-            + this.name());
-        }
-      }
-    },
-
-    FAILED {
-      @Override
-      public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
-        switch (event) {
-        case ON_FAILURE:
-          return this;
-        case ON_RERUN:
-          return STATE.LAUNCHED;
-        default:
-          throw new InvalidStateTransitionException("Event: " + event.name() + 
" is not a valid event for state: "
-            + this.name());
-        }
-      }
-    },
-
-    SUCCEEDED {
-      @Override
-      public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
-        switch (event) {
-        case ON_SUCCESS:
-          return this;
-        case ON_RERUN:
-          return STATE.LAUNCHED;
-        default:
-          throw new InvalidStateTransitionException("Event: " + event.name() + 
" is not a valid event for state: "
-            + this.name());
-        }
-      }
-    },
-
-
-    TIMED_OUT {
-      @Override
-      public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
-        switch (event) {
-        case ON_TIME_OUT:
-          return this;
-        case ON_RERUN:
-          return STATE.WAITING;
-        default:
-          throw new InvalidStateTransitionException("Event: " + event.name() + 
" is not a valid event for state: "
-            + this.name());
-        }
-      }
-    },
-
-    KILLED {
-      @Override
-      public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
-        switch (event) {
-        case ON_KILL:
-          return this;
-        case ON_RERUN:
-          return STATE.LAUNCHED;
-        default:
-          throw new InvalidStateTransitionException("Event: " + event.name() + 
" is not a valid event for state: "
-            + this.name());
-        }
-      }
-    }
-  }
-
-  /**
-   * All events(actions) which can happen on an instance of 
<code>SchedulerJob</code>.
-   */
-  public enum EVENT {
-    ON_CREATION, // an instance is first considered by the scheduler.
-    ON_TIME_OUT,
-    ON_CONDITIONS_MET,
-    ON_RUN,
-    ON_SUCCESS,
-    ON_FAILURE,
-    ON_RERUN,
-    ON_KILL
-  }
-}

http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobState.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobState.java
 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobState.java
deleted file mode 100644
index d21cd05..0000000
--- 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobState.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.server.scheduler.state;
-
-import org.apache.lens.api.scheduler.SchedulerJobStatus;
-import org.apache.lens.server.api.error.InvalidStateTransitionException;
-import org.apache.lens.server.api.scheduler.StateMachine;
-
-import lombok.Getter;
-import lombok.Setter;
-
-/**
- * This class represents current state of a SchedulerJob and provides helper 
methods
- * for handling different events and lifecycle transition for a SchedulerJob.
- */
-public class SchedulerJobState {
-
-  public SchedulerJobState(SchedulerJobStatus status) {
-    this.currentStatus = status;
-  }
-  public SchedulerJobState() {
-    this.currentStatus = INITIAL_STATUS;
-  }
-
-  @Getter @Setter
-  private SchedulerJobStatus currentStatus;
-
-  private static final SchedulerJobStatus INITIAL_STATUS = 
SchedulerJobStatus.NEW;
-
-  public SchedulerJobStatus nextTransition(EVENT event) throws 
InvalidStateTransitionException {
-    STATE currentState = STATE.valueOf(currentStatus.name());
-    return 
SchedulerJobStatus.valueOf(currentState.nextTransition(event).name());
-  }
-
-  private enum STATE implements StateMachine<STATE, EVENT> {
-    // repeating same operation will return the same state to ensure 
idempotent behavior.
-    NEW {
-      @Override
-      public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
-        switch (event) {
-        case ON_SUBMIT:
-          return this;
-        case ON_SCHEDULE:
-          return STATE.SCHEDULED;
-        case ON_EXPIRE:
-          return STATE.EXPIRED;
-        case ON_DELETE:
-          return STATE.DELETED;
-        default:
-          throw new InvalidStateTransitionException("Event: " + event.name() + 
" is not a valid event for state: "
-            + this.name());
-        }
-      }
-    },
-
-    SCHEDULED {
-      @Override
-      public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
-        switch (event) {
-        case ON_SCHEDULE:
-          return this;
-        case ON_SUSPEND:
-          return STATE.SUSPENDED;
-        case ON_EXPIRE:
-          return STATE.EXPIRED;
-        case ON_DELETE:
-          return STATE.DELETED;
-        default:
-          throw new InvalidStateTransitionException("Event: " + event.name() + 
" is not a valid event for state: "
-            + this.name());
-        }
-      }
-    },
-
-    SUSPENDED {
-      @Override
-      public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
-        switch (event) {
-        case ON_SUSPEND:
-          return this;
-        case ON_RESUME:
-          return STATE.SCHEDULED;
-        case ON_EXPIRE:
-          return STATE.EXPIRED;
-        case ON_DELETE:
-          return STATE.DELETED;
-        default:
-          throw new InvalidStateTransitionException("Event: " + event.name() + 
" is not a valid event for state: "
-            + this.name());
-        }
-      }
-    },
-
-    EXPIRED {
-      @Override
-      public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
-        switch (event) {
-        case ON_EXPIRE:
-          return this;
-        case ON_DELETE:
-          return STATE.DELETED;
-        default:
-          throw new InvalidStateTransitionException("Event: " + event.name() + 
" is not a valid event for state: "
-            + this.name());
-        }
-      }
-    },
-
-    DELETED {
-      @Override
-      public STATE nextTransition(EVENT event) throws 
InvalidStateTransitionException {
-        switch (event) {
-        case ON_DELETE:
-          return this;
-        default:
-          throw new InvalidStateTransitionException("Event: " + event.name() + 
" is not a valid event for state: "
-            + this.name());
-        }
-      }
-    }
-  }
-
-  /**
-   * All events(actions) which can happen on a Scheduler Job.
-   */
-  public enum EVENT {
-    ON_SUBMIT,
-    ON_SCHEDULE,
-    ON_SUSPEND,
-    ON_RESUME,
-    ON_EXPIRE,
-    ON_DELETE
-  }
-}

http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/util/UtilityMethods.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/util/UtilityMethods.java
 
b/lens-server/src/main/java/org/apache/lens/server/scheduler/util/UtilityMethods.java
deleted file mode 100644
index 31783ad..0000000
--- 
a/lens-server/src/main/java/org/apache/lens/server/scheduler/util/UtilityMethods.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.lens.server.scheduler.util;
-
-import org.apache.lens.server.api.LensConfConstants;
-
-import org.apache.commons.dbcp.BasicDataSource;
-import org.apache.hadoop.conf.Configuration;
-
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class UtilityMethods {
-  private UtilityMethods() {
-
-  }
-
-  /**
-   *
-   * @param conf
-   * @return
-   */
-  public static BasicDataSource getDataSourceFromConf(Configuration conf) {
-    BasicDataSource basicDataSource = new BasicDataSource();
-    basicDataSource.setDriverClassName(
-        conf.get(LensConfConstants.SERVER_DB_DRIVER_NAME, 
LensConfConstants.DEFAULT_SERVER_DB_DRIVER_NAME));
-    basicDataSource
-        .setUrl(conf.get(LensConfConstants.SERVER_DB_JDBC_URL, 
LensConfConstants.DEFAULT_SERVER_DB_JDBC_URL));
-    basicDataSource
-        .setUsername(conf.get(LensConfConstants.SERVER_DB_JDBC_USER, 
LensConfConstants.DEFAULT_SERVER_DB_USER));
-    basicDataSource
-        .setPassword(conf.get(LensConfConstants.SERVER_DB_JDBC_PASS, 
LensConfConstants.DEFAULT_SERVER_DB_PASS));
-    basicDataSource.setDefaultAutoCommit(true);
-    return basicDataSource;
-  }
-}

http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java 
b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
index db53924..3003ab7 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java
@@ -57,7 +57,7 @@ import lombok.extern.slf4j.Slf4j;
  * The Class LensSessionImpl.
  */
 @Slf4j
-public class LensSessionImpl extends HiveSessionImpl {
+public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable {
 
   /** The persist info. */
   private LensSessionPersistInfo persistInfo = new LensSessionPersistInfo();

http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/util/UtilityMethods.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/util/UtilityMethods.java 
b/lens-server/src/main/java/org/apache/lens/server/util/UtilityMethods.java
index 63a7874..42d9fe0 100644
--- a/lens-server/src/main/java/org/apache/lens/server/util/UtilityMethods.java
+++ b/lens-server/src/main/java/org/apache/lens/server/util/UtilityMethods.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,9 +26,12 @@ import java.security.NoSuchAlgorithmException;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Map;
+import java.util.UUID;
 
 import javax.sql.DataSource;
 
+import org.apache.lens.api.scheduler.SchedulerJobHandle;
+import org.apache.lens.api.scheduler.SchedulerJobInstanceHandle;
 import org.apache.lens.server.api.LensConfConstants;
 
 import org.apache.commons.dbcp.*;
@@ -41,7 +44,6 @@ import org.apache.hadoop.io.WritableUtils;
 
 import lombok.extern.slf4j.Slf4j;
 
-
 /**
  * The Class UtilityMethods.
  */
@@ -138,30 +140,29 @@ public final class UtilityMethods {
    */
   public static BasicDataSource getDataSourceFromConf(Configuration conf) {
     BasicDataSource tmp = new BasicDataSource();
-    tmp.setDriverClassName(conf.get(LensConfConstants.SERVER_DB_DRIVER_NAME,
-      LensConfConstants.DEFAULT_SERVER_DB_DRIVER_NAME));
+    tmp.setDriverClassName(
+        conf.get(LensConfConstants.SERVER_DB_DRIVER_NAME, 
LensConfConstants.DEFAULT_SERVER_DB_DRIVER_NAME));
     tmp.setUrl(conf.get(LensConfConstants.SERVER_DB_JDBC_URL, 
LensConfConstants.DEFAULT_SERVER_DB_JDBC_URL));
     tmp.setUsername(conf.get(LensConfConstants.SERVER_DB_JDBC_USER, 
LensConfConstants.DEFAULT_SERVER_DB_USER));
     tmp.setPassword(conf.get(LensConfConstants.SERVER_DB_JDBC_PASS, 
LensConfConstants.DEFAULT_SERVER_DB_PASS));
-    
tmp.setValidationQuery(conf.get(LensConfConstants.SERVER_DB_VALIDATION_QUERY,
-      LensConfConstants.DEFAULT_SERVER_DB_VALIDATION_QUERY));
+    tmp.setValidationQuery(
+        conf.get(LensConfConstants.SERVER_DB_VALIDATION_QUERY, 
LensConfConstants.DEFAULT_SERVER_DB_VALIDATION_QUERY));
     tmp.setDefaultAutoCommit(false);
     return tmp;
   }
 
   public static DataSource getPoolingDataSourceFromConf(Configuration conf) {
     final ConnectionFactory cf = new DriverManagerConnectionFactory(
-      conf.get(LensConfConstants.SERVER_DB_JDBC_URL, 
LensConfConstants.DEFAULT_SERVER_DB_JDBC_URL),
-      conf.get(LensConfConstants.SERVER_DB_JDBC_USER, 
LensConfConstants.DEFAULT_SERVER_DB_USER),
-      conf.get(LensConfConstants.SERVER_DB_JDBC_PASS, 
LensConfConstants.DEFAULT_SERVER_DB_PASS));
+        conf.get(LensConfConstants.SERVER_DB_JDBC_URL, 
LensConfConstants.DEFAULT_SERVER_DB_JDBC_URL),
+        conf.get(LensConfConstants.SERVER_DB_JDBC_USER, 
LensConfConstants.DEFAULT_SERVER_DB_USER),
+        conf.get(LensConfConstants.SERVER_DB_JDBC_PASS, 
LensConfConstants.DEFAULT_SERVER_DB_PASS));
     final GenericObjectPool connectionPool = new GenericObjectPool();
     connectionPool.setTestOnBorrow(false);
     connectionPool.setTestOnReturn(false);
     connectionPool.setTestWhileIdle(true);
-    new PoolableConnectionFactory(cf, connectionPool, null
-      , conf.get(LensConfConstants.SERVER_DB_VALIDATION_QUERY,
-        LensConfConstants.DEFAULT_SERVER_DB_VALIDATION_QUERY), false, false)
-      .setDefaultAutoCommit(true);
+    new PoolableConnectionFactory(cf, connectionPool, null,
+        conf.get(LensConfConstants.SERVER_DB_VALIDATION_QUERY, 
LensConfConstants.DEFAULT_SERVER_DB_VALIDATION_QUERY),
+        false, false).setDefaultAutoCommit(true);
     return new PoolingDataSource(connectionPool);
   }
 
@@ -204,13 +205,31 @@ public final class UtilityMethods {
   public static byte[] generateHashOfWritable(Writable writable) {
     try {
       MessageDigest md = MessageDigest.getInstance("MD5");
-      byte [] lensConfBytes = WritableUtils.toByteArray(writable);
+      byte[] lensConfBytes = WritableUtils.toByteArray(writable);
       md.update(lensConfBytes);
-      byte [] digest = md.digest();
+      byte[] digest = md.digest();
       return digest;
     } catch (NoSuchAlgorithmException e) {
       log.warn("MD5: No such method error " + writable);
       return null;
     }
   }
+
+  /**
+   * @param conf
+   * @return
+   */
+  public static BasicDataSource 
getDataSourceFromConfForScheduler(Configuration conf) {
+    BasicDataSource basicDataSource = getDataSourceFromConf(conf);
+    basicDataSource.setDefaultAutoCommit(true);
+    return basicDataSource;
+  }
+
+  public static SchedulerJobHandle generateSchedulerJobHandle() {
+    return new SchedulerJobHandle(UUID.randomUUID());
+  }
+
+  public static SchedulerJobInstanceHandle 
generateSchedulerJobInstanceHandle() {
+    return new SchedulerJobInstanceHandle(UUID.randomUUID());
+  }
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/resources/lensserver-default.xml
----------------------------------------------------------------------
diff --git a/lens-server/src/main/resources/lensserver-default.xml 
b/lens-server/src/main/resources/lensserver-default.xml
index a57cad6..e9525fd 100644
--- a/lens-server/src/main/resources/lensserver-default.xml
+++ b/lens-server/src/main/resources/lensserver-default.xml
@@ -864,9 +864,15 @@
     </description>
   </property>
   <property>
-  <name>lens.server.status.update.exponential.wait.millis</name>
-  <value>30000</value>
-  <description>Number of millis that would grow exponentially for next update, 
incase of transient failures.
-  </description>
-</property>
+    <name>lens.server.status.update.exponential.wait.millis</name>
+    <value>30000</value>
+    <description>Number of millis that would grow exponentially for next 
update, incase of transient failures.
+    </description>
+  </property>
+  <property>
+    <name>lens.query.current.time.millis</name>
+    <value>0</value>
+    <description>Query current time in millis. This is used to resolve 'now'. 
If value is set to zero, 'now' is
+     resolved to current value</description>
+  </property>
 </configuration>

Reply via email to