http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java index bf99fde..7a2b06a 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java @@ -28,10 +28,11 @@ import javax.xml.bind.JAXBElement; import org.apache.lens.api.LensSessionHandle; import org.apache.lens.api.ToXMLString; +import org.apache.lens.api.query.QueryHandle; import org.apache.lens.api.scheduler.*; import org.apache.lens.server.api.LensConfConstants; import org.apache.lens.server.api.error.LensException; -import org.apache.lens.server.scheduler.util.UtilityMethods; +import org.apache.lens.server.util.UtilityMethods; import org.apache.commons.dbcp.BasicDataSource; import org.apache.commons.dbutils.QueryRunner; @@ -54,9 +55,10 @@ public class SchedulerDAO { Class dbStoreClass = Class .forName(conf.get(LensConfConstants.SCHEDULER_STORE_CLASS, SchedulerHsqlDBStore.class.getName())); this.store = (SchedulerDBStore) dbStoreClass.newInstance(); - this.store.init(UtilityMethods.getDataSourceFromConf(conf)); + this.store.init(UtilityMethods.getDataSourceFromConfForScheduler(conf)); this.store.createJobTable(); - this.store.createJobInstaceTable(); + this.store.createJobInstanceTable(); + this.store.createJobInstanceRunTable(); } catch (SQLException e) { log.error("Error creating job tables", e); throw new LensException("Error creating job tables ", e); @@ -115,16 +117,31 @@ public class SchedulerDAO { } /** - * Gets the Job state + * Get the user of the job who submitted the job. 
+ * + * @param id + * @return + */ + public String getUser(SchedulerJobHandle id) { + try { + return store.getUser(id.getHandleIdString()); + } catch (SQLException e) { + log.error("Error while getting the user for the job with handle " + id.getHandleIdString(), e); + return null; + } + } + + /** + * Gets the Job status * * @param id : Job handle id. - * @return SchedulerJobState of the job. + * @return SchedulerJobStatus of the job. */ - public SchedulerJobStatus getJobState(SchedulerJobHandle id) { + public SchedulerJobState getJobState(SchedulerJobHandle id) { try { return store.getJobState(id.getHandleIdString()); } catch (SQLException e) { - log.error("Error while getting the job state for " + id.getHandleIdString(), e); + log.error("Error while getting the job status for " + id.getHandleIdString(), e); return null; } } @@ -145,16 +162,16 @@ public class SchedulerDAO { } /** - * Updates the job state form the new SchedulerJobInfo + * Updates the job status form the new SchedulerJobInfo * * @param info: Updated info objects * @return number of rows updated. 
*/ - public int updateJobState(SchedulerJobInfo info) { + public int updateJobStatus(SchedulerJobInfo info) { try { - return store.updateJobState(info.getId().getHandleIdString(), info.getState().name(), info.getModifiedOn()); + return store.updateJobStatus(info.getId().getHandleIdString(), info.getJobState().name(), info.getModifiedOn()); } catch (SQLException e) { - log.error("Error while updating job state for " + info.getId().getHandleIdString(), e); + log.error("Error while updating job status for " + info.getId().getHandleIdString(), e); return 0; } } @@ -168,6 +185,17 @@ public class SchedulerDAO { } } + public int storeJobInstanceRun(SchedulerJobInstanceRun instanceRun) { + try { + return store.insertIntoJobInstanceRunTable(instanceRun); + } catch (SQLException e) { + log.error( + "Error while storing job instance run for " + instanceRun.getRunId() + " and instance handle " + instanceRun + .getHandle().getHandleIdString(), e); + return 0; + } + } + /** * Gets the SchedulerJobInstanceInfo corresponding instance handle id. * @@ -184,16 +212,17 @@ public class SchedulerDAO { } /** - * Updates the instance state + * Updates the instance status * - * @param info: Updated instance info + * @param instanceRun : instance Run object * @return number of rows updated. */ - public int updateJobInstanceState(SchedulerJobInstanceInfo info) { + public int updateJobInstanceRun(SchedulerJobInstanceRun instanceRun) { try { - return store.updateJobInstanceState(info.getId().getHandleIdString(), info.getState().name()); + return store.updateJobInstanceRun(instanceRun); } catch (SQLException e) { - log.error("Error while updating the job instance state for " + info.getId().getHandleIdString(), e); + log.error("Error while updating the job instance status for " + instanceRun.getHandle().getHandleIdString() + + " and run: " + instanceRun.getRunId(), e); return 0; } } @@ -204,7 +233,7 @@ public class SchedulerDAO { * @param id: Job handle id. * @return List of instance handles. 
*/ - public List<SchedulerJobInstanceHandle> getJobInstances(SchedulerJobHandle id) { + public List<SchedulerJobInstanceInfo> getJobInstances(SchedulerJobHandle id) { // TODO: Add number of results to be fetched try { return store.getAllJobInstances(id.getHandleIdString()); @@ -218,15 +247,29 @@ public class SchedulerDAO { * Gets all jobs which match the filter requirements. * * @param username : User name of the job - * @param state : State of the job + * @param jobState : state of the job * @param startTime : Created on should be greater than this start time. * @param endTime : Created on should be less than the end time. * @return List of Job handles */ - public List<SchedulerJobHandle> getJobs(String username, SchedulerJobStatus state, Long startTime, - Long endTime) { + public List<SchedulerJobHandle> getJobs(String username, SchedulerJobState jobState, Long startTime, Long endTime) { + try { + return store.getJobs(username, jobState == null ? null : jobState.name(), startTime, endTime); + } catch (SQLException e) { + log.error("Error while getting jobs ", e); + return null; + } + } + + /** + * Get the handle given the job name. + * + * @param jobName + * @return + */ + public List<SchedulerJobHandle> getJob(String jobName) { try { - return store.getJobs(username, state == null ? 
null : state.name(), startTime, endTime); + return store.getJobsByName(jobName); } catch (SQLException e) { log.error("Error while getting jobs ", e); return null; @@ -236,18 +279,22 @@ public class SchedulerDAO { public abstract static class SchedulerDBStore { protected static final String JOB_TABLE = "job_table"; protected static final String JOB_INSTANCE_TABLE = "job_instance_table"; + protected static final String JOB_INSTANCE_RUN_TABLE = "job_instance_run_table"; protected static final String COLUMN_ID = "id"; + protected static final String COLUMN_RUN_ID = "runid"; protected static final String COLUMN_JOB = "job"; protected static final String COLUMN_USER = "username"; - protected static final String COLUMN_STATE = "state"; + protected static final String COLUMN_STATUS = "status"; protected static final String COLUMN_CREATED_ON = "createdon"; + protected static final String COLUMN_SCHEDULE_TIME = "schedultime"; protected static final String COLUMN_MODIFIED_ON = "modifiedon"; protected static final String COLUMN_JOB_ID = "jobid"; protected static final String COLUMN_SESSION_HANDLE = "sessionhandle"; protected static final String COLUMN_START_TIME = "starttime"; protected static final String COLUMN_END_TIME = "endtime"; protected static final String COLUMN_RESULT_PATH = "resultpath"; - protected static final String COLUMN_QUERY = "query"; + protected static final String COLUMN_QUERY_HANDLE = "queryhandle"; + protected static final String COLUMN_JOB_NAME = "jobname"; protected QueryRunner runner; protected ObjectFactory jobFactory = new ObjectFactory(); // Generic multiple row handler for the fetch query. 
@@ -289,7 +336,14 @@ public class SchedulerDAO { * * @throws SQLException */ - public abstract void createJobInstaceTable() throws SQLException; + public abstract void createJobInstanceTable() throws SQLException; + + /** + * Creates the job instance run table + * + * @throws SQLException + */ + public abstract void createJobInstanceRunTable() throws SQLException; /** * Inserts the Job info object into job table @@ -299,10 +353,10 @@ public class SchedulerDAO { * @throws SQLException */ public int insertIntoJobTable(SchedulerJobInfo jobInfo) throws SQLException { - String insertSQL = "INSERT INTO " + JOB_TABLE + " VALUES(?,?,?,?,?,?)"; + String insertSQL = "INSERT INTO " + JOB_TABLE + " VALUES(?,?,?,?,?,?,?)"; JAXBElement<XJob> xmlJob = jobFactory.createJob(jobInfo.getJob()); return runner.update(insertSQL, jobInfo.getId().toString(), ToXMLString.toString(xmlJob), jobInfo.getUserName(), - jobInfo.getState().name(), jobInfo.getCreatedOn(), jobInfo.getModifiedOn()); + jobInfo.getJobState().name(), jobInfo.getCreatedOn(), jobInfo.getModifiedOn(), jobInfo.getJob().getName()); } /** @@ -313,12 +367,19 @@ public class SchedulerDAO { * @throws SQLException */ public int insertIntoJobInstanceTable(SchedulerJobInstanceInfo instanceInfo) throws SQLException { - String insertSQL = "INSERT INTO " + JOB_INSTANCE_TABLE + " VALUES(?,?,?,?,?,?,?,?,?)"; + String insertSQL = "INSERT INTO " + JOB_INSTANCE_TABLE + " VALUES(?,?,?)"; return runner .update(insertSQL, instanceInfo.getId().getHandleIdString(), instanceInfo.getJobId().getHandleIdString(), - instanceInfo.getSessionHandle().toString(), instanceInfo.getStartTime(), instanceInfo.getEndTime(), - instanceInfo.getResultPath(), instanceInfo.getQuery(), instanceInfo.getState().name(), - instanceInfo.getCreatedOn()); + instanceInfo.getScheduleTime()); + } + + public int insertIntoJobInstanceRunTable(SchedulerJobInstanceRun instanceRun) throws SQLException { + String insetSQL = "INSERT INTO " + JOB_INSTANCE_RUN_TABLE + " 
VALUES(?,?,?,?,?,?,?,?)"; + return runner.update(insetSQL, instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId(), + instanceRun.getSessionHandle().toString(), instanceRun.getStartTime(), instanceRun.getEndTime(), + instanceRun.getResultPath(), + instanceRun.getQueryHandle() == null ? "" : instanceRun.getQueryHandle().getHandleIdString(), + instanceRun.getInstanceState().name()); } /** @@ -338,10 +399,11 @@ public class SchedulerDAO { SchedulerJobHandle id = SchedulerJobHandle.fromString((String) jobInfo[0]); XJob xJob = ToXMLString.valueOf((String) jobInfo[1], ObjectFactory.class); String userName = (String) jobInfo[2]; - String state = (String) jobInfo[3]; + String status = (String) jobInfo[3]; long createdOn = (Long) jobInfo[4]; long modifiedOn = (Long) jobInfo[5]; - return new SchedulerJobInfo(id, xJob, userName, SchedulerJobStatus.valueOf(state), createdOn, modifiedOn); + SchedulerJobState jobState = SchedulerJobState.valueOf(status); + return new SchedulerJobInfo(id, xJob, userName, jobState, createdOn, modifiedOn); } } @@ -363,19 +425,37 @@ public class SchedulerDAO { } /** - * Gets the job state + * Gets the handles of all jobs stored in the job table under the given job name. + * + * @param jobname name of the job to look up + * @return list of matching job handles; empty if no row has that name + * @throws SQLException if the lookup query fails + */ + public List<SchedulerJobHandle> getJobsByName(String jobname) throws SQLException { + String fetchSQL = "SELECT " + COLUMN_ID + " FROM " + JOB_TABLE + " WHERE " + COLUMN_JOB_NAME + "=?"; + List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, jobname); + List<SchedulerJobHandle> resOut = new ArrayList<>(); + for (int i = 0; i < result.size(); i++) { + Object[] row = result.get(i); + resOut.add(SchedulerJobHandle.fromString((String) row[0])); + } + return resOut; + } + + /** + * Gets the job status * * @param id - * @return SchedulerJobState + * @return SchedulerJobStatus * @throws SQLException */ - public SchedulerJobStatus getJobState(String id) throws SQLException { - String fetchSQL = "SELECT " + COLUMN_STATE + 
" FROM " + JOB_TABLE + " WHERE " + COLUMN_ID + "=?"; + public SchedulerJobState getJobState(String id) throws SQLException { + String fetchSQL = "SELECT " + COLUMN_STATUS + " FROM " + JOB_TABLE + " WHERE " + COLUMN_ID + "=?"; List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, id); if (result.size() == 0) { return null; } else { - return SchedulerJobStatus.valueOf((String) result.get(0)[0]); + return SchedulerJobState.valueOf((String) result.get(0)[0]); } } @@ -383,20 +463,20 @@ public class SchedulerDAO { * Gets all the jobs which match the filter requirements. * * @param username - * @param state + * @param status * @param starttime * @param endtime * @return the list of job handles. * @throws SQLException */ - public List<SchedulerJobHandle> getJobs(String username, String state, Long starttime, Long endtime) + public List<SchedulerJobHandle> getJobs(String username, String status, Long starttime, Long endtime) throws SQLException { String whereClause = ""; if (username != null && !username.isEmpty()) { - whereClause += ((whereClause.isEmpty()) ? " WHERE " : " AND ") + COLUMN_USER + " = '" + username+"'"; + whereClause += ((whereClause.isEmpty()) ? " WHERE " : " AND ") + COLUMN_USER + " = '" + username + "'"; } - if (state != null && !state.isEmpty()) { - whereClause += ((whereClause.isEmpty()) ? " WHERE " : " AND ") + COLUMN_STATE + " = '" + state + "'"; + if (status != null && !status.isEmpty()) { + whereClause += ((whereClause.isEmpty()) ? " WHERE " : " AND ") + COLUMN_STATUS + " = '" + status + "'"; } if (starttime != null && starttime > 0) { whereClause += ((whereClause.isEmpty()) ? " WHERE " : " AND ") + COLUMN_CREATED_ON + " >= " + starttime; @@ -432,19 +512,19 @@ public class SchedulerDAO { } /** - * Updates the job state into the job table + * Updates the job status into the job table * * @param id - * @param state + * @param status * @param modifiedOn * @return number of rows updated. 
* @throws SQLException */ - public int updateJobState(String id, String state, long modifiedOn) throws SQLException { + public int updateJobStatus(String id, String status, long modifiedOn) throws SQLException { String updateSQL = - "UPDATE " + JOB_TABLE + " SET " + COLUMN_STATE + "=?, " + COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID + "UPDATE " + JOB_TABLE + " SET " + COLUMN_STATUS + "=?, " + COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID + "=?"; - return runner.update(updateSQL, state, modifiedOn, id); + return runner.update(updateSQL, status, modifiedOn, id); } /** @@ -461,31 +541,54 @@ public class SchedulerDAO { return null; } else { Object[] instanceInfo = result.get(0); - SchedulerJobInstanceHandle id = SchedulerJobInstanceHandle.fromString((String) instanceInfo[0]); - SchedulerJobHandle jobId = SchedulerJobHandle.fromString((String) instanceInfo[1]); - LensSessionHandle sessionHandle = LensSessionHandle.valueOf((String) instanceInfo[2]); - long starttime = (Long) instanceInfo[3]; - long endtime = (Long) instanceInfo[4]; - String resultPath = (String) instanceInfo[5]; - String query = (String) instanceInfo[6]; - SchedulerJobInstanceStatus state = SchedulerJobInstanceStatus.valueOf((String) instanceInfo[7]); - long createdOn = (Long) instanceInfo[8]; - return new SchedulerJobInstanceInfo(id, jobId, sessionHandle, starttime, endtime, resultPath, query, state, - createdOn); + return parseSchedulerInstance(instanceInfo); + } + } + + private SchedulerJobInstanceInfo parseSchedulerInstance(Object[] instanceInfo) throws SQLException { + SchedulerJobInstanceHandle id = SchedulerJobInstanceHandle.fromString((String) instanceInfo[0]); + SchedulerJobHandle jobId = SchedulerJobHandle.fromString((String) instanceInfo[1]); + long createdOn = (Long) instanceInfo[2]; + // Get the Runs + String fetchSQL = "SELECT * FROM " + JOB_INSTANCE_RUN_TABLE + " WHERE " + COLUMN_ID + "=?"; + List<Object[]> instanceRuns = runner.query(fetchSQL, multipleRowsHandler, (String) 
instanceInfo[0]); + List<SchedulerJobInstanceRun> runList = new ArrayList<>(); + for (Object[] run : instanceRuns) { + // run[0] will contain the instanceID + int runId = (Integer) run[1]; + LensSessionHandle sessionHandle = LensSessionHandle.valueOf((String) run[2]); + long starttime = (Long) run[3]; + long endtime = (Long) run[4]; + String resultPath = (String) run[5]; + String queryHandleString = (String) run[6]; + QueryHandle queryHandle = null; + if (!queryHandleString.isEmpty()) { + queryHandle = QueryHandle.fromString((String) run[6]); + } + SchedulerJobInstanceState instanceStatus = SchedulerJobInstanceState.valueOf((String) run[7]); + SchedulerJobInstanceRun instanceRun = new SchedulerJobInstanceRun(id, runId, sessionHandle, starttime, endtime, + resultPath, queryHandle, instanceStatus); + runList.add(instanceRun); } + return new SchedulerJobInstanceInfo(id, jobId, createdOn, runList); } /** - * Updates the state of a job instance. + * Updates the status of a job instance. * - * @param id - * @param state + * @param instanceRun * @return number of rows updated. * @throws SQLException */ - public int updateJobInstanceState(String id, String state) throws SQLException { - String updateSQL = "UPDATE " + JOB_INSTANCE_TABLE + " SET " + COLUMN_STATE + "=?" + " WHERE " + COLUMN_ID + "=?"; - return runner.update(updateSQL, state, id); + public int updateJobInstanceRun(SchedulerJobInstanceRun instanceRun) throws SQLException { + String updateSQL = + "UPDATE " + JOB_INSTANCE_RUN_TABLE + " SET " + COLUMN_END_TIME + "=?, " + COLUMN_RESULT_PATH + "=?, " + + COLUMN_QUERY_HANDLE + "=?, " + COLUMN_STATUS + "=?" + " WHERE " + COLUMN_ID + "=? AND " + COLUMN_RUN_ID + + "=?"; + + return runner.update(updateSQL, instanceRun.getEndTime(), instanceRun.getResultPath(), + instanceRun.getQueryHandle() == null ? 
"" : instanceRun.getQueryHandle().getHandleIdString(), + instanceRun.getInstanceState().name(), instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId()); } /** @@ -495,17 +598,33 @@ public class SchedulerDAO { * @return List of SchedulerJobInstanceHandle * @throws SQLException */ - public List<SchedulerJobInstanceHandle> getAllJobInstances(String jobId) throws SQLException { - String fetchSQL = "SELECT " + COLUMN_ID + " FROM " + JOB_INSTANCE_TABLE + " WHERE " + COLUMN_JOB_ID + "=?"; + public List<SchedulerJobInstanceInfo> getAllJobInstances(String jobId) throws SQLException { + String fetchSQL = "SELECT * FROM " + JOB_INSTANCE_TABLE + " WHERE " + COLUMN_JOB_ID + "=?"; List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, jobId); - List<SchedulerJobInstanceHandle> resOut = new ArrayList<>(); + List<SchedulerJobInstanceInfo> resOut = new ArrayList<>(); for (int i = 0; i < result.size(); i++) { Object[] row = result.get(i); - resOut.add(SchedulerJobInstanceHandle.fromString((String) row[0])); + resOut.add(parseSchedulerInstance(row)); } return resOut; } + /** + * Get the user who submitted the job. 
+ * + * @param id + * @return + * @throws SQLException + */ + public String getUser(String id) throws SQLException { + String fetchSQL = "SELECT " + COLUMN_USER + " FROM " + JOB_TABLE + " WHERE " + COLUMN_ID + "=?"; + List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, id); + if (result.size() == 0) { + return null; + } else { + return (String) result.get(0)[0]; + } + } } /** @@ -519,8 +638,9 @@ public class SchedulerDAO { public void createJobTable() throws SQLException { String createSQL = "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL," + COLUMN_JOB - + " TEXT," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATE + " VARCHAR(20)," + COLUMN_CREATED_ON - + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + " PRIMARY KEY ( " + COLUMN_ID + ")" + ")"; + + " TEXT," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATUS + " VARCHAR(20)," + COLUMN_CREATED_ON + + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + COLUMN_JOB_NAME + " VARCHAR(255), " + " PRIMARY KEY ( " + + COLUMN_ID + ")" + ")"; runner.update(createSQL); } @@ -528,13 +648,25 @@ public class SchedulerDAO { * {@inheritDoc} */ @Override - public void createJobInstaceTable() throws SQLException { + public void createJobInstanceTable() throws SQLException { String createSQL = "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " - + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " - + COLUMN_START_TIME + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " TEXT, " - + COLUMN_QUERY + " TEXT, " + COLUMN_STATE + " VARCHAR(20), " + COLUMN_CREATED_ON + " BIGINT, " - + " PRIMARY KEY ( " + COLUMN_ID + ")" + ")"; + + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + COLUMN_SCHEDULE_TIME + " BIGINT, " + " PRIMARY KEY ( " + + COLUMN_ID + ")" + ")"; + runner.update(createSQL); + } + + /** + * {@inheritDoc} + */ + @Override + public void createJobInstanceRunTable() throws SQLException { + 
String createSQL = + "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " + + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " + COLUMN_START_TIME + + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " TEXT, " + COLUMN_QUERY_HANDLE + + " TEXT, " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( " + COLUMN_ID + ", " + COLUMN_RUN_ID + + ")" + ")"; runner.update(createSQL); } } @@ -550,8 +682,9 @@ public class SchedulerDAO { public void createJobTable() throws SQLException { String createSQL = "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL," + COLUMN_JOB - + " VARCHAR(1024)," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATE + " VARCHAR(20)," + COLUMN_CREATED_ON - + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + " PRIMARY KEY ( " + COLUMN_ID + ")" + ")"; + + " VARCHAR(1024)," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATUS + " VARCHAR(20)," + COLUMN_CREATED_ON + + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + COLUMN_JOB_NAME + " VARCHAR(255), " + " PRIMARY KEY ( " + + COLUMN_ID + ")" + ")"; runner.update(createSQL); } @@ -559,13 +692,25 @@ public class SchedulerDAO { * {@inheritDoc} */ @Override - public void createJobInstaceTable() throws SQLException { + public void createJobInstanceTable() throws SQLException { String createSQL = "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " - + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " - + COLUMN_START_TIME + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " VARCHAR(1024)," - + COLUMN_QUERY + " VARCHAR(1024), " + COLUMN_STATE + " VARCHAR(20), " + COLUMN_CREATED_ON + " BIGINT, " - + " PRIMARY KEY ( " + COLUMN_ID + ")" + ")"; + + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + COLUMN_SCHEDULE_TIME + " BIGINT, " + " PRIMARY KEY ( " + + COLUMN_ID + ")" + ")"; + 
runner.update(createSQL); + } + + /** + * {@inheritDoc} + */ + @Override + public void createJobInstanceRunTable() throws SQLException { + String createSQL = + "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " + + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " + COLUMN_START_TIME + + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " VARCHAR(1024)," + + COLUMN_QUERY_HANDLE + " VARCHAR(1024), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( " + + COLUMN_ID + ", " + COLUMN_RUN_ID + " )" + ")"; runner.update(createSQL); } }
http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java
new file mode 100644
index 0000000..7323add
--- /dev/null
+++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerEventListener.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.scheduler;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.lens.api.LensConf;
+import org.apache.lens.api.LensSessionHandle;
+import org.apache.lens.api.error.InvalidStateTransitionException;
+import org.apache.lens.api.query.QueryHandle;
+import org.apache.lens.api.scheduler.*;
+import org.apache.lens.server.LensServices;
+import org.apache.lens.server.api.LensConfConstants;
+import org.apache.lens.server.api.error.LensException;
+import org.apache.lens.server.api.events.AsyncEventListener;
+import org.apache.lens.server.api.events.SchedulerAlarmEvent;
+import org.apache.lens.server.api.query.QueryExecutionService;
+import org.apache.lens.server.api.scheduler.SchedulerService;
+import org.apache.lens.server.query.QueryExecutionServiceImpl;
+import org.apache.lens.server.util.UtilityMethods;
+
+import org.joda.time.DateTime;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Listens for {@link SchedulerAlarmEvent}s. Each alarm either expires the job or launches one run of a job
+ * instance through the query service, recording the instance and the run in the {@link SchedulerDAO}.
+ */
+@Slf4j
+public class SchedulerEventListener extends AsyncEventListener<SchedulerAlarmEvent> {
+  private static final int CORE_POOL_SIZE = 10;
+  private static final String JOB_INSTANCE_ID_KEY = "job_instance_key";
+  @Getter
+  @Setter
+  @VisibleForTesting
+  protected QueryExecutionService queryService;
+  private SchedulerDAO schedulerDAO;
+  private SchedulerService schedulerService;
+
+  public SchedulerEventListener(SchedulerDAO schedulerDAO) {
+    super(CORE_POOL_SIZE);
+    this.queryService = LensServices.get().getService(QueryExecutionService.NAME);
+    this.schedulerService = LensServices.get().getService(SchedulerService.NAME);
+    this.schedulerDAO = schedulerDAO;
+  }
+
+  /**
+   * Handles one scheduler alarm. An EXPIRE alarm expires the job; any other alarm stores a new run of the job
+   * instance in WAITING state, submits the job's query asynchronously and records the launch outcome on that run.
+   * The session opened for the launch is closed once the launch attempt is over.
+   *
+   * @param event the event
+   */
+  @Override
+  public void process(SchedulerAlarmEvent event) {
+    DateTime scheduledTime = event.getNominalTime();
+    SchedulerJobHandle jobHandle = event.getJobHandle();
+    if (event.getType() == SchedulerAlarmEvent.EventType.EXPIRE) {
+      try {
+        schedulerService.expireJob(null, jobHandle);
+      } catch (LensException e) {
+        log.error("Error while expiring the job", e);
+      }
+      return;
+    }
+    /*
+     * Get the job from the store.
+     * Create an instance.
+     * Store the instance.
+     * Try to run the instance.
+     * If successfully submitted change the status to running.
+     * Otherwise update the status to killed.
+     */
+    //TODO: Get the job status and if it is not Scheduled, don't do anything.
+    XJob job = schedulerDAO.getJob(jobHandle);
+    if (job == null) {
+      log.error("Could not find the job " + jobHandle + " for nominal time " + scheduledTime.getMillis());
+      return;
+    }
+    String user = schedulerDAO.getUser(jobHandle);
+    SchedulerJobInstanceHandle instanceHandle = event.getPreviousInstance() == null
+      ? UtilityMethods.generateSchedulerJobInstanceHandle()
+      : event.getPreviousInstance();
+    Map<String, String> conf = new HashMap<>();
+    LensSessionHandle sessionHandle = null;
+    try {
+      // Open the session with the user who submitted the job.
+      sessionHandle = ((QueryExecutionServiceImpl) LensServices.get().getService(QueryExecutionServiceImpl.NAME))
+        .openSession(user, "dummy", conf, false);
+    } catch (LensException e) {
+      log.error("Error occurred while opening a session ", e);
+      return;
+    }
+    SchedulerJobInstanceInfo instance = null;
+    SchedulerJobInstanceRun run = null;
+    // Session needs to be closed after the launch.
+    try {
+      long scheduledTimeMillis = scheduledTime.getMillis();
+      String query = job.getExecution().getQuery().getQuery();
+      List<MapType> jobConf = job.getExecution().getQuery().getConf();
+      LensConf queryConf = new LensConf();
+      for (MapType element : jobConf) {
+        queryConf.addProperty(element.getKey(), element.getValue());
+      }
+      queryConf.addProperty(JOB_INSTANCE_ID_KEY, instanceHandle.getHandleId());
+      // Current time is used for resolving date.
+      queryConf.addProperty(LensConfConstants.QUERY_CURRENT_TIME_IN_MILLIS, scheduledTime.getMillis());
+      String queryName = job.getName();
+      queryName += "-" + scheduledTime.getMillis();
+      // If the instance is new then create otherwise get from the store
+      if (event.getPreviousInstance() == null) {
+        instance = new SchedulerJobInstanceInfo(instanceHandle, jobHandle, scheduledTimeMillis,
+          new ArrayList<SchedulerJobInstanceRun>());
+      } else {
+        instance = schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle);
+      }
+      if (instance == null) {
+        log.error("Could not find the previous instance " + instanceHandle + " of job " + jobHandle);
+        return;
+      }
+      // Next run of the instance
+      long currentTime = System.currentTimeMillis();
+      run = new SchedulerJobInstanceRun(instanceHandle, instance.getInstanceRunList().size() + 1, sessionHandle,
+        currentTime, 0, "N/A", null, SchedulerJobInstanceState.WAITING);
+      instance.getInstanceRunList().add(run);
+      boolean success;
+      if (event.getPreviousInstance() == null) {
+        success = schedulerDAO.storeJobInstance(instance) == 1;
+        if (!success) {
+          log.error(
+            "Exception occurred while storing the instance for instance handle " + instance + " of job " + jobHandle);
+          return;
+        }
+      }
+      success = schedulerDAO.storeJobInstanceRun(run) == 1;
+      if (!success) {
+        log.error(
+          "Exception occurred while storing the instance run for instance handle " + instance + " of job " + jobHandle);
+        return;
+      }
+
+      QueryHandle handle = queryService.executeAsync(sessionHandle, query, queryConf, queryName);
+      run.setQueryHandle(handle);
+      run.setInstanceState(run.getInstanceState().nextTransition(SchedulerJobInstanceEvent.ON_RUN));
+      run.setEndTime(System.currentTimeMillis());
+      schedulerDAO.updateJobInstanceRun(run);
+    } catch (LensException | InvalidStateTransitionException e) {
+      log.error(
+        "Exception occurred while launching the job instance for " + jobHandle + " for nominal time " + scheduledTime
+          .getMillis(), e);
+      try {
+        run.setInstanceState(run.getInstanceState().nextTransition(SchedulerJobInstanceEvent.ON_FAILURE));
+        run.setEndTime(System.currentTimeMillis());
+        schedulerDAO.updateJobInstanceRun(run);
+      } catch (InvalidStateTransitionException e1) {
+        log.error("Can't make transition for instance " + instance.getId() + " of job " + instance.getJobId(), e1);
+      }
+    } finally {
+      try {
+        ((QueryExecutionServiceImpl) LensServices.get().getService(QueryExecutionServiceImpl.NAME))
+          .closeSession(sessionHandle);
+      } catch (LensException e) {
+        log.error("Error closing session ", e);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
new file mode 100644
index 0000000..5b12720
--- /dev/null
+++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.server.scheduler;
+
+import java.util.List;
+
+import org.apache.lens.api.error.InvalidStateTransitionException;
+import org.apache.lens.api.query.QueryStatus;
+import org.apache.lens.api.scheduler.*;
+import org.apache.lens.server.api.events.AsyncEventListener;
+import org.apache.lens.server.api.query.QueryContext;
+import org.apache.lens.server.api.query.QueryEnded;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Listens for {@link QueryEnded} events. For a query whose conf carries the job instance key, it updates the
+ * latest run of that job instance with the resulting state, the end time and the result path.
+ */
+@Slf4j
+public class SchedulerQueryEventListener extends AsyncEventListener<QueryEnded> {
+  private static final String JOB_INSTANCE_ID_KEY = "job_instance_key";
+  private static final int CORE_POOL_SIZE = 10;
+  private SchedulerDAO schedulerDAO;
+
+  public SchedulerQueryEventListener(SchedulerDAO schedulerDAO) {
+    super(CORE_POOL_SIZE);
+    this.schedulerDAO = schedulerDAO;
+  }
+
+  /**
+   * Updates the latest run of the job instance named in the ended query's conf. Events for CLOSED queries, events
+   * without a query context and queries that carry no job instance key are ignored.
+   *
+   * @param event the query ended event
+   */
+  @Override
+  public void process(QueryEnded event) {
+    if (event.getCurrentValue() == QueryStatus.Status.CLOSED) {
+      return;
+    }
+    QueryContext queryContext = event.getQueryContext();
+    if (queryContext == null) {
+      log.warn("Could not find the context for {} for event:{}.", event.getQueryHandle(), event.getCurrentValue());
+      return;
+    }
+    String instanceHandle = queryContext.getConf().get(JOB_INSTANCE_ID_KEY);
+    if (instanceHandle == null) {
+      // Nothing to do
+      return;
+    }
+    SchedulerJobInstanceInfo info = schedulerDAO
+      .getSchedulerJobInstanceInfo(SchedulerJobInstanceHandle.fromString(instanceHandle));
+    if (info == null) {
+      log.error("No job instance found for " + instanceHandle + " with query " + queryContext.getQueryHandle());
+      return;
+    }
+    List<SchedulerJobInstanceRun> runList = info.getInstanceRunList();
+    if (runList.isEmpty()) {
+      log.error("No instance run for " + instanceHandle + " with query " + queryContext.getQueryHandle());
+      return;
+    }
+    SchedulerJobInstanceRun latestRun = runList.get(runList.size() - 1);
+    SchedulerJobInstanceState state = latestRun.getInstanceState();
+    try {
+      switch (event.getCurrentValue()) {
+      case CANCELED:
+        state = state.nextTransition(SchedulerJobInstanceEvent.ON_KILL);
+        break;
+      case SUCCESSFUL:
+        state = state.nextTransition(SchedulerJobInstanceEvent.ON_SUCCESS);
+        break;
+      case FAILED:
+        state = state.nextTransition(SchedulerJobInstanceEvent.ON_FAILURE);
+        break;
+      default:
+        // Any other status leaves the instance state as it is; only end time and result path are recorded.
+        break;
+      }
+      latestRun.setEndTime(System.currentTimeMillis());
+      latestRun.setInstanceState(state);
+      latestRun.setResultPath(queryContext.getDriverResultPath());
+      schedulerDAO.updateJobInstanceRun(latestRun);
+    } catch (InvalidStateTransitionException e) {
+      log.error("Instance Transition Failed ", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
----------------------------------------------------------------------
diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
index 3952671..14ca32d 100644
--- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
+++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerServiceImpl.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,43 +20,83 @@ package org.apache.lens.server.scheduler; import java.util.Collection; import java.util.List; -import java.util.UUID; +import org.apache.lens.api.LensConf; import org.apache.lens.api.LensSessionHandle; +import org.apache.lens.api.error.InvalidStateTransitionException; +import org.apache.lens.api.query.QueryHandle; import org.apache.lens.api.scheduler.*; +import org.apache.lens.cube.parse.CubeQueryConfUtil; import org.apache.lens.server.BaseLensService; -import org.apache.lens.server.LensServerConf; +import org.apache.lens.server.LensServices; import org.apache.lens.server.api.error.LensException; +import org.apache.lens.server.api.events.SchedulerAlarmEvent; import org.apache.lens.server.api.health.HealthStatus; +import org.apache.lens.server.api.query.QueryEnded; +import org.apache.lens.server.api.query.QueryExecutionService; import org.apache.lens.server.api.scheduler.SchedulerService; import org.apache.lens.server.session.LensSessionImpl; +import org.apache.lens.server.util.UtilityMethods; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hive.service.cli.CLIService; +import org.joda.time.DateTime; + +import com.google.common.annotations.VisibleForTesting; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + /** * This class handles all the scheduler operations. 
*/ +@Slf4j public class SchedulerServiceImpl extends BaseLensService implements SchedulerService { - // get the state store - private SchedulerDAO schedulerDAO; + @Getter + @Setter + @VisibleForTesting + protected QueryExecutionService queryService; + @Getter + @VisibleForTesting + protected SchedulerEventListener schedulerEventListener; + @Getter + @VisibleForTesting + protected SchedulerQueryEventListener schedulerQueryEventListener; + @Getter + @VisibleForTesting + protected SchedulerDAO schedulerDAO; + private AlarmService alarmService; - private LensScheduler scheduler; /** - * The constant name for scheduler service. + * Instantiates a new scheduler service. + * + * @param cliService the cli service */ - public static final String NAME = "scheduler"; - public SchedulerServiceImpl(CLIService cliService) throws LensException { super(NAME, cliService); - this.schedulerDAO = new SchedulerDAO(LensServerConf.getHiveConf()); - this.scheduler = LensScheduler.get(); } - public SchedulerServiceImpl(CLIService cliService, SchedulerDAO schedulerDAO) { - super(NAME, cliService); - this.schedulerDAO = schedulerDAO; - this.scheduler = LensScheduler.get(); + public synchronized void init(HiveConf hiveConf) { + super.init(hiveConf); + try { + schedulerDAO = new SchedulerDAO(hiveConf); + alarmService = LensServices.get().getService(AlarmService.NAME); + queryService = LensServices.get().getService(QueryExecutionService.NAME); + // Get the listeners' classes from the configuration. 
+ this.schedulerEventListener = new SchedulerEventListener(schedulerDAO); + this.schedulerQueryEventListener = new SchedulerQueryEventListener(schedulerDAO); + getEventService().addListenerForType(schedulerEventListener, SchedulerAlarmEvent.class); + getEventService().addListenerForType(schedulerQueryEventListener, QueryEnded.class); + } catch (LensException e) { + log.error("Error Initialising Scheduler-service", e); + } + } + + @Override + public synchronized void start() { + super.start(); } /** @@ -65,8 +105,8 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe @Override public HealthStatus getHealthStatus() { return this.getServiceState().equals(STATE.STARTED) - ? new HealthStatus(true, "Scheduler service is healthy.") - : new HealthStatus(false, "Scheduler service is down."); + ? new HealthStatus(true, "Scheduler service is healthy.") + : new HealthStatus(false, "Scheduler service is down."); } /** @@ -74,27 +114,57 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe */ @Override public SchedulerJobHandle submitJob(LensSessionHandle sessionHandle, XJob job) throws LensException { - //TBD place holder code LensSessionImpl session = getSession(sessionHandle); - return null; + // Validate XJob + validateJob(job); + SchedulerJobHandle handle = UtilityMethods.generateSchedulerJobHandle(); + long createdOn = System.currentTimeMillis(); + long modifiedOn = createdOn; + SchedulerJobInfo info = new SchedulerJobInfo(handle, job, session.getLoggedInUser(), SchedulerJobState.NEW, + createdOn, modifiedOn); + if (schedulerDAO.storeJob(info) == 1) { + return handle; + } else { + throw new LensException("Could not Submit the job"); + } + } + + private void validateJob(XJob job) throws LensException { } /** * {@inheritDoc} */ @Override - public void scheduleJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { - //TBD place holder code - // send the schedule request to the 
scheduler. - UUID externalID = jobHandle.getHandleId(); - // get the job from the database and schedule + public boolean scheduleJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { + SchedulerJobInfo jobInfo = schedulerDAO.getSchedulerJobInfo(jobHandle); + XJob job = jobInfo.getJob(); + DateTime start = new DateTime(job.getStartTime().toGregorianCalendar().getTime()); + DateTime end = new DateTime(job.getEndTime().toGregorianCalendar().getTime()); + XFrequency frequency = job.getTrigger().getFrequency(); + // check query + checkQuery(sessionHandle, job); + alarmService.schedule(start, end, frequency, jobHandle.getHandleIdString()); + return setStateOfJob(jobHandle, SchedulerJobEvent.ON_SCHEDULE) == 1; + } + + private void checkQuery(LensSessionHandle sessionHandle, XJob job) throws LensException { + List<MapType> jobConf = job.getExecution().getQuery().getConf(); + LensConf queryConf = new LensConf(); + for (MapType element : jobConf) { + queryConf.addProperty(element.getKey(), element.getValue()); + } + queryConf.addProperty(CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, false); + queryService.estimate(LensServices.get().getLogSegregationContext().getLogSegragationId(), sessionHandle, + job.getExecution().getQuery().getQuery(), queryConf); + return; } @Override public SchedulerJobHandle submitAndScheduleJob(LensSessionHandle sessionHandle, XJob job) throws LensException { - //TBD place holder code - // take job, validate it, submit it(check duplicate, persist it), schedule it. - return null; + SchedulerJobHandle handle = submitJob(sessionHandle, job); + scheduleJob(sessionHandle, handle); + return handle; } /** @@ -102,39 +172,42 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe */ @Override public XJob getJobDefinition(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { - //TBD place holder code - // get the job definition from the persisted store, return it. 
- return null; + return schedulerDAO.getJob(jobHandle); } /** * {@inheritDoc} */ @Override - public SchedulerJobInfo getJobDetails(LensSessionHandle sessionHandle, - SchedulerJobHandle jobHandle) throws LensException { - //TBD place holder code - return null; + public SchedulerJobInfo getJobDetails(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) + throws LensException { + return schedulerDAO.getSchedulerJobInfo(jobHandle); } /** * {@inheritDoc} */ @Override - public boolean updateJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, - XJob newJobDefinition) throws LensException { - //TBD place holder code - XJob job = schedulerDAO.getJob(jobHandle); - return false; + public boolean updateJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, XJob newJobDefinition) + throws LensException { + SchedulerJobInfo jobInfo = schedulerDAO.getSchedulerJobInfo(jobHandle); + // This will allow only the job definition and configuration change. + // TODO: fix start and end time changes + jobInfo.setJob(newJobDefinition); + jobInfo.setModifiedOn(System.currentTimeMillis()); + int updated = schedulerDAO.updateJob(jobInfo); + return updated > 0; } /** - * * {@inheritDoc} */ @Override - public void expireJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { - //TBD place holder code + public boolean expireJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { + if (alarmService.checkExists(jobHandle)) { + alarmService.unSchedule(jobHandle); + } + return setStateOfJob(jobHandle, SchedulerJobEvent.ON_EXPIRE) == 1; } /** @@ -142,8 +215,8 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe */ @Override public boolean suspendJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { - //TBD place holder code - return false; + alarmService.pauseJob(jobHandle); + return setStateOfJob(jobHandle, 
SchedulerJobEvent.ON_SUSPEND) == 1; } /** @@ -151,8 +224,8 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe */ @Override public boolean resumeJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { - // TBD place holder code - return false; + alarmService.resumeJob(jobHandle); + return setStateOfJob(jobHandle, SchedulerJobEvent.ON_RESUME) == 1; } /** @@ -160,20 +233,18 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe */ @Override public boolean deleteJob(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle) throws LensException { - // TBD place holder code - // it should only be a soft delete. Later on we will make a purge service and that service will delete - // all the soft delete things. - return false; + if (alarmService.checkExists(jobHandle)) { + alarmService.unSchedule(jobHandle); + } + return setStateOfJob(jobHandle, SchedulerJobEvent.ON_DELETE) == 1; } /** * {@inheritDoc} */ @Override - public Collection<SchedulerJobStats> getAllJobStats(LensSessionHandle sessionHandle, String state, String userName, - long startTime, long endTime) throws LensException { - // TBD place holder code - // validate that the state is a valid state (enum) + public Collection<SchedulerJobStats> getAllJobStats(LensSessionHandle sessionHandle, String state, String user, + String jobName, long startTime, long endTime) throws LensException { return null; } @@ -182,9 +253,7 @@ public class SchedulerServiceImpl extends BaseLensService implements SchedulerSe */ @Override public SchedulerJobStats getJobStats(LensSessionHandle sessionHandle, SchedulerJobHandle handle, String state, - long startTime, long endTime) throws LensException { - // TBD place holder code - // validate that the state is a valid state (enum) + long startTime, long endTime) throws LensException { return null; } @@ -192,39 +261,78 @@ public class SchedulerServiceImpl extends BaseLensService implements 
SchedulerSe * {@inheritDoc} */ @Override - public boolean rerunInstance(LensSessionHandle sessionHandle, - SchedulerJobInstanceHandle instanceHandle) throws LensException { - //TBD place holder code - return false; + public boolean rerunInstance(LensSessionHandle sessionHandle, SchedulerJobInstanceHandle instanceHandle) + throws LensException { + SchedulerJobInstanceInfo instanceInfo = schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle); + if (schedulerDAO.getJobState(instanceInfo.getJobId()) != SchedulerJobState.SCHEDULED) { + throw new LensException("Job with handle " + instanceInfo.getJobId() + " is not scheduled"); + } + // Get the latest run. + List<SchedulerJobInstanceRun> runList = instanceInfo.getInstanceRunList(); + if (runList.size() == 0) { + throw new LensException("Job instance " + instanceHandle + " is not yet run"); + } + SchedulerJobInstanceRun latestRun = runList.get(runList.size() - 1); + // This call is for the test that it can be re run. + try { + latestRun.getInstanceState().nextTransition(SchedulerJobInstanceEvent.ON_RERUN); + getEventService().notifyEvent( + new SchedulerAlarmEvent(instanceInfo.getJobId(), new DateTime(instanceInfo.getScheduleTime()), + SchedulerAlarmEvent.EventType.SCHEDULE, instanceHandle)); + } catch (InvalidStateTransitionException e) { + throw new LensException("Invalid State Transition ", e); + } + return true; } /** * {@inheritDoc} */ @Override - public List<SchedulerJobInstanceInfo> getJobInstances(LensSessionHandle sessionHandle, - SchedulerJobHandle jobHandle, Long numResults) throws LensException { - // TBD place holder code - // By default return 100 results - make it configurable - return null; + public List<SchedulerJobInstanceInfo> getJobInstances(LensSessionHandle sessionHandle, SchedulerJobHandle jobHandle, + Long numResults) throws LensException { + return schedulerDAO.getJobInstances(jobHandle); } @Override - public boolean killInstance(LensSessionHandle sessionHandle, - SchedulerJobInstanceHandle 
instanceHandle) throws LensException { - // TBD place holder code - return true; + public boolean killInstance(LensSessionHandle sessionHandle, SchedulerJobInstanceHandle instanceHandle) + throws LensException { + /** + * Get the query handle from the latest run. + */ + SchedulerJobInstanceInfo instanceInfo = schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle); + List<SchedulerJobInstanceRun> runList = instanceInfo.getInstanceRunList(); + if (runList.size() == 0) { + throw new LensException("Job instance " + instanceHandle + " is not yet run"); + } + SchedulerJobInstanceRun latestRun = runList.get(runList.size() - 1); + QueryHandle handle = latestRun.getQueryHandle(); + if (handle.getHandleIdString().isEmpty()) { + return false; + } + // This will cause the QueryEnd event which will set the status of the instance to KILLED. + return queryService.cancelQuery(sessionHandle, handle); } - /** * {@inheritDoc} */ @Override public SchedulerJobInstanceInfo getInstanceDetails(LensSessionHandle sessionHandle, - SchedulerJobInstanceHandle instanceHandle) throws LensException { - // TBD place holder code - return null; + SchedulerJobInstanceHandle instanceHandle) throws LensException { + return schedulerDAO.getSchedulerJobInstanceInfo(instanceHandle); } + private int setStateOfJob(SchedulerJobHandle handle, SchedulerJobEvent event) throws LensException { + try { + SchedulerJobInfo info = schedulerDAO.getSchedulerJobInfo(handle); + SchedulerJobState currentState = info.getJobState(); + SchedulerJobState nextState = currentState.nextTransition(event); + info.setJobState(nextState); + info.setModifiedOn(System.currentTimeMillis()); + return schedulerDAO.updateJobStatus(info); + } catch (InvalidStateTransitionException e) { + throw new LensException("Invalid state ", e); + } + } } http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/notification/services/AlarmService.java 
---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/notification/services/AlarmService.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/notification/services/AlarmService.java deleted file mode 100644 index a4cdd83..0000000 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/notification/services/AlarmService.java +++ /dev/null @@ -1,220 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.lens.server.scheduler.notification.services; - -import org.apache.lens.api.scheduler.SchedulerJobHandle; -import org.apache.lens.api.scheduler.XFrequency; -import org.apache.lens.api.scheduler.XFrequencyEnum; -import org.apache.lens.server.LensServices; -import org.apache.lens.server.api.LensService; -import org.apache.lens.server.api.error.LensException; -import org.apache.lens.server.api.events.LensEventService; -import org.apache.lens.server.api.events.SchedulerAlarmEvent; -import org.apache.lens.server.api.health.HealthStatus; - -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hive.service.AbstractService; - -import org.joda.time.DateTime; -import org.quartz.*; -import org.quartz.impl.StdSchedulerFactory; - -import lombok.extern.slf4j.Slf4j; - -/** - * This service is used primarily by Scheduler to get alarm notifications for scheduled queries. - * - * As a schedule this service accepts start time, frequency, end time and timeZone. It also requires the - * {@link SchedulerJobHandle} which it sends as part of the - * {@link org.apache.lens.server.api.events.SchedulerAlarmEvent} to inform the scheduler about the job for which - * job the notification has been generated. - */ -@Slf4j -public class AlarmService extends AbstractService implements LensService { - - public static final String NAME = "alarm-service"; - - private Scheduler scheduler; - - /** - * True if the service started properly and is running fine, false otherwise. - */ - private boolean isHealthy = true; - - /** - * Contains the reason if service is not healthy. - */ - private String healthCause; - - /** - * Creates a new instance of AlarmService. - * - * @param name the name - */ - public AlarmService(String name) { - super(name); - } - - @Override - public HealthStatus getHealthStatus() { - return isHealthy - ? 
new HealthStatus(isHealthy, "Alarm service is healthy.") - : new HealthStatus(isHealthy, healthCause); - } - - public synchronized void init(HiveConf hiveConf) { - super.init(hiveConf); - try { - this.scheduler = StdSchedulerFactory.getDefaultScheduler(); - } catch (SchedulerException e) { - isHealthy = false; - healthCause = "Failed to initialize the Quartz Scheduler for AlarmService."; - log.error(healthCause, e); - } - } - - public synchronized void start() { - try { - scheduler.start(); - log.info("Alarm service started successfully!"); - } catch (SchedulerException e) { - isHealthy = false; - healthCause = "Failed to start the Quartz Scheduler for AlarmService."; - log.error(healthCause, e); - } - } - - @Override - public synchronized void stop() { - try { - scheduler.shutdown(); - log.info("Alarm Service stopped successfully."); - } catch (SchedulerException e) { - log.error("Failed to shut down the Quartz Scheduler for AlarmService.", e); - } - } - - /** - * This method can be used by any consumer who wants to receive notifications during a time range at a given - * frequency. - * - * This method is intended to be used by LensScheduler to subscribe for time based notifications to schedule queries. - * On receiving a job to be scheduled LensScheduler will subscribe to all triggers required for the job, including - * AlarmService for time based triggers. - * - * @param start start time for notifications - * @param end end time for notifications - * @param frequency Frequency to determine the frequency at which notification should be sent. 
- * @param jobHandle Must be a unique jobHanlde across all consumers - */ - public void schedule(DateTime start, DateTime end, XFrequency frequency, String jobHandle) throws LensException { - // accept the schedule and then keep on sending the notifications for that schedule - JobDataMap map = new JobDataMap(); - map.put("jobHandle", jobHandle); - - JobDetail job = JobBuilder.newJob(LensJob.class).withIdentity(jobHandle, "LensJobs") - .usingJobData(map).build(); - - Trigger trigger; - if (frequency.getEnum() != null) { //for enum expression: create a trigger using calendar interval - CalendarIntervalScheduleBuilder scheduleBuilder = CalendarIntervalScheduleBuilder.calendarIntervalSchedule() - .withInterval(getTimeInterval(frequency.getEnum()), getTimeUnit(frequency.getEnum())) - .withMisfireHandlingInstructionIgnoreMisfires(); - trigger = TriggerBuilder.newTrigger() - .withIdentity(jobHandle, "AlarmService") - .startAt(start.toDate()) - .endAt(end.toDate()) - .withSchedule(scheduleBuilder) - .build(); - } else { // for cron expression create a cron trigger - trigger = TriggerBuilder.newTrigger() - .withIdentity(jobHandle, "AlarmService") - .withSchedule(CronScheduleBuilder.cronSchedule(frequency.getCronExpression())) - .build(); - } - - // Tell quartz to run the job using our trigger - try { - scheduler.scheduleJob(job, trigger); - } catch (SchedulerException e) { - log.error("Error scheduling job with jobHandle: {}", jobHandle); - throw new LensException("Failed to schedule job with jobHandle: " + jobHandle, e); - } - } - - - private int getTimeInterval(XFrequencyEnum frequencyEnum) { - // since quarterly is not supported natively, we express it as 3 months - return frequencyEnum == XFrequencyEnum.QUARTERLY ? 
3 : 1; - } - - - // Maps the timeunit in entity specification to the one in Quartz DateBuilder - private DateBuilder.IntervalUnit getTimeUnit(XFrequencyEnum frequencyEnum) { - switch (frequencyEnum) { - - case DAILY: - return DateBuilder.IntervalUnit.DAY; - - case WEEKLY: - return DateBuilder.IntervalUnit.WEEK; - - case MONTHLY: - return DateBuilder.IntervalUnit.MONTH; - - case QUARTERLY: - return DateBuilder.IntervalUnit.MONTH; - - case YEARLY: - return DateBuilder.IntervalUnit.YEAR; - - default: - throw new IllegalArgumentException("Invalid frequency enum expression: " + frequencyEnum.name()); - } - } - - public boolean unSchedule(SchedulerJobHandle jobHandle) throws LensException { - // stop sending notifications for this job handle - try { - return scheduler.deleteJob(JobKey.jobKey(jobHandle.getHandleIdString(), "LensScheduler")); - } catch (SchedulerException e) { - log.error("Failed to remove alarm triggers for job with jobHandle: " + jobHandle, e); - throw new LensException("Failed to remove alarm triggers for job with jobHandle: " + jobHandle, e); - } - } - - public static class LensJob implements Job { - - @Override - public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { - JobDataMap data = jobExecutionContext.getMergedJobDataMap(); - DateTime nominalTime = new DateTime(jobExecutionContext.getScheduledFireTime()); - SchedulerJobHandle jobHandle = SchedulerJobHandle.fromString(data.getString("jobHandle")); - SchedulerAlarmEvent alarmEvent = new SchedulerAlarmEvent(jobHandle, nominalTime); - try { - LensEventService eventService = LensServices.get().getService(LensEventService.NAME); - eventService.notifyEvent(alarmEvent); - } catch (LensException e) { - log.error("Failed to notify SchedulerAlarmEvent for jobHandle: {} and nominalTime: {}", - jobHandle.getHandleIdString(), nominalTime.toString(), e); - throw new JobExecutionException("Failed to notify alarmEvent", e); - } - } - } -} 
http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobInstanceState.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobInstanceState.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobInstanceState.java deleted file mode 100644 index 95057e4..0000000 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobInstanceState.java +++ /dev/null @@ -1,193 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.lens.server.scheduler.state; - -import org.apache.lens.api.scheduler.SchedulerJobInstanceStatus; -import org.apache.lens.server.api.error.InvalidStateTransitionException; -import org.apache.lens.server.api.scheduler.StateMachine; - -import lombok.Getter; -import lombok.Setter; - -/** - * State machine for transitions on Scheduler Jobs. 
- */ -public class SchedulerJobInstanceState { - - public SchedulerJobInstanceState(SchedulerJobInstanceStatus status) { - this.currentStatus = status; - } - - public SchedulerJobInstanceState() { - this.currentStatus = INITIAL_STATUS; - } - - @Getter @Setter - private SchedulerJobInstanceStatus currentStatus; - - private static final SchedulerJobInstanceStatus INITIAL_STATUS = SchedulerJobInstanceStatus.WAITING; - - public SchedulerJobInstanceStatus nextTransition(EVENT event) throws InvalidStateTransitionException { - STATE currentState = STATE.valueOf(currentStatus.name()); - return SchedulerJobInstanceStatus.valueOf(currentState.nextTransition(event).name()); - } - - public enum STATE implements StateMachine<STATE, EVENT> { - // repeating same operation will return the same state to ensure idempotent behavior. - WAITING { - @Override - public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { - switch (event) { - case ON_CREATION: - return this; - case ON_CONDITIONS_MET: - return STATE.LAUNCHED; - case ON_TIME_OUT: - return STATE.TIMED_OUT; - case ON_RUN: - return STATE.RUNNING; - case ON_SUCCESS: - return STATE.SUCCEEDED; - case ON_FAILURE: - return STATE.FAILED; - case ON_KILL: - return STATE.KILLED; - default: - throw new InvalidStateTransitionException("Event: " + event.name() + " is not a valid event for state: " - + this.name()); - } - } - }, - - LAUNCHED { - @Override - public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { - switch (event) { - case ON_CONDITIONS_MET: - return this; - case ON_RUN: - return STATE.RUNNING; - case ON_SUCCESS: - return STATE.SUCCEEDED; - case ON_FAILURE: - return STATE.FAILED; - case ON_KILL: - return STATE.KILLED; - default: - throw new InvalidStateTransitionException("Event: " + event.name() + " is not a valid event for state: " - + this.name()); - } - } - }, - - RUNNING { - @Override - public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { - 
switch (event) { - case ON_RUN: - return this; - case ON_SUCCESS: - return STATE.SUCCEEDED; - case ON_FAILURE: - return STATE.FAILED; - case ON_KILL: - return STATE.KILLED; - default: - throw new InvalidStateTransitionException("Event: " + event.name() + " is not a valid event for state: " - + this.name()); - } - } - }, - - FAILED { - @Override - public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { - switch (event) { - case ON_FAILURE: - return this; - case ON_RERUN: - return STATE.LAUNCHED; - default: - throw new InvalidStateTransitionException("Event: " + event.name() + " is not a valid event for state: " - + this.name()); - } - } - }, - - SUCCEEDED { - @Override - public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { - switch (event) { - case ON_SUCCESS: - return this; - case ON_RERUN: - return STATE.LAUNCHED; - default: - throw new InvalidStateTransitionException("Event: " + event.name() + " is not a valid event for state: " - + this.name()); - } - } - }, - - - TIMED_OUT { - @Override - public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { - switch (event) { - case ON_TIME_OUT: - return this; - case ON_RERUN: - return STATE.WAITING; - default: - throw new InvalidStateTransitionException("Event: " + event.name() + " is not a valid event for state: " - + this.name()); - } - } - }, - - KILLED { - @Override - public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { - switch (event) { - case ON_KILL: - return this; - case ON_RERUN: - return STATE.LAUNCHED; - default: - throw new InvalidStateTransitionException("Event: " + event.name() + " is not a valid event for state: " - + this.name()); - } - } - } - } - - /** - * All events(actions) which can happen on an instance of <code>SchedulerJob</code>. - */ - public enum EVENT { - ON_CREATION, // an instance is first considered by the scheduler. 
- ON_TIME_OUT, - ON_CONDITIONS_MET, - ON_RUN, - ON_SUCCESS, - ON_FAILURE, - ON_RERUN, - ON_KILL - } -} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobState.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobState.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobState.java deleted file mode 100644 index d21cd05..0000000 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/state/SchedulerJobState.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.lens.server.scheduler.state; - -import org.apache.lens.api.scheduler.SchedulerJobStatus; -import org.apache.lens.server.api.error.InvalidStateTransitionException; -import org.apache.lens.server.api.scheduler.StateMachine; - -import lombok.Getter; -import lombok.Setter; - -/** - * This class represents current state of a SchedulerJob and provides helper methods - * for handling different events and lifecycle transition for a SchedulerJob. 
- */ -public class SchedulerJobState { - - public SchedulerJobState(SchedulerJobStatus status) { - this.currentStatus = status; - } - public SchedulerJobState() { - this.currentStatus = INITIAL_STATUS; - } - - @Getter @Setter - private SchedulerJobStatus currentStatus; - - private static final SchedulerJobStatus INITIAL_STATUS = SchedulerJobStatus.NEW; - - public SchedulerJobStatus nextTransition(EVENT event) throws InvalidStateTransitionException { - STATE currentState = STATE.valueOf(currentStatus.name()); - return SchedulerJobStatus.valueOf(currentState.nextTransition(event).name()); - } - - private enum STATE implements StateMachine<STATE, EVENT> { - // repeating same operation will return the same state to ensure idempotent behavior. - NEW { - @Override - public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { - switch (event) { - case ON_SUBMIT: - return this; - case ON_SCHEDULE: - return STATE.SCHEDULED; - case ON_EXPIRE: - return STATE.EXPIRED; - case ON_DELETE: - return STATE.DELETED; - default: - throw new InvalidStateTransitionException("Event: " + event.name() + " is not a valid event for state: " - + this.name()); - } - } - }, - - SCHEDULED { - @Override - public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { - switch (event) { - case ON_SCHEDULE: - return this; - case ON_SUSPEND: - return STATE.SUSPENDED; - case ON_EXPIRE: - return STATE.EXPIRED; - case ON_DELETE: - return STATE.DELETED; - default: - throw new InvalidStateTransitionException("Event: " + event.name() + " is not a valid event for state: " - + this.name()); - } - } - }, - - SUSPENDED { - @Override - public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { - switch (event) { - case ON_SUSPEND: - return this; - case ON_RESUME: - return STATE.SCHEDULED; - case ON_EXPIRE: - return STATE.EXPIRED; - case ON_DELETE: - return STATE.DELETED; - default: - throw new InvalidStateTransitionException("Event: " + 
event.name() + " is not a valid event for state: " - + this.name()); - } - } - }, - - EXPIRED { - @Override - public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { - switch (event) { - case ON_EXPIRE: - return this; - case ON_DELETE: - return STATE.DELETED; - default: - throw new InvalidStateTransitionException("Event: " + event.name() + " is not a valid event for state: " - + this.name()); - } - } - }, - - DELETED { - @Override - public STATE nextTransition(EVENT event) throws InvalidStateTransitionException { - switch (event) { - case ON_DELETE: - return this; - default: - throw new InvalidStateTransitionException("Event: " + event.name() + " is not a valid event for state: " - + this.name()); - } - } - } - } - - /** - * All events(actions) which can happen on a Scheduler Job. - */ - public enum EVENT { - ON_SUBMIT, - ON_SCHEDULE, - ON_SUSPEND, - ON_RESUME, - ON_EXPIRE, - ON_DELETE - } -} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/scheduler/util/UtilityMethods.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/util/UtilityMethods.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/util/UtilityMethods.java deleted file mode 100644 index 31783ad..0000000 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/util/UtilityMethods.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.lens.server.scheduler.util; - -import org.apache.lens.server.api.LensConfConstants; - -import org.apache.commons.dbcp.BasicDataSource; -import org.apache.hadoop.conf.Configuration; - -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class UtilityMethods { - private UtilityMethods() { - - } - - /** - * - * @param conf - * @return - */ - public static BasicDataSource getDataSourceFromConf(Configuration conf) { - BasicDataSource basicDataSource = new BasicDataSource(); - basicDataSource.setDriverClassName( - conf.get(LensConfConstants.SERVER_DB_DRIVER_NAME, LensConfConstants.DEFAULT_SERVER_DB_DRIVER_NAME)); - basicDataSource - .setUrl(conf.get(LensConfConstants.SERVER_DB_JDBC_URL, LensConfConstants.DEFAULT_SERVER_DB_JDBC_URL)); - basicDataSource - .setUsername(conf.get(LensConfConstants.SERVER_DB_JDBC_USER, LensConfConstants.DEFAULT_SERVER_DB_USER)); - basicDataSource - .setPassword(conf.get(LensConfConstants.SERVER_DB_JDBC_PASS, LensConfConstants.DEFAULT_SERVER_DB_PASS)); - basicDataSource.setDefaultAutoCommit(true); - return basicDataSource; - } -} http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java index db53924..3003ab7 100644 --- 
a/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/session/LensSessionImpl.java @@ -57,7 +57,7 @@ import lombok.extern.slf4j.Slf4j; * The Class LensSessionImpl. */ @Slf4j -public class LensSessionImpl extends HiveSessionImpl { +public class LensSessionImpl extends HiveSessionImpl implements AutoCloseable { /** The persist info. */ private LensSessionPersistInfo persistInfo = new LensSessionPersistInfo(); http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/java/org/apache/lens/server/util/UtilityMethods.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/util/UtilityMethods.java b/lens-server/src/main/java/org/apache/lens/server/util/UtilityMethods.java index 63a7874..42d9fe0 100644 --- a/lens-server/src/main/java/org/apache/lens/server/util/UtilityMethods.java +++ b/lens-server/src/main/java/org/apache/lens/server/util/UtilityMethods.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,9 +26,12 @@ import java.security.NoSuchAlgorithmException; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Map; +import java.util.UUID; import javax.sql.DataSource; +import org.apache.lens.api.scheduler.SchedulerJobHandle; +import org.apache.lens.api.scheduler.SchedulerJobInstanceHandle; import org.apache.lens.server.api.LensConfConstants; import org.apache.commons.dbcp.*; @@ -41,7 +44,6 @@ import org.apache.hadoop.io.WritableUtils; import lombok.extern.slf4j.Slf4j; - /** * The Class UtilityMethods. */ @@ -138,30 +140,29 @@ public final class UtilityMethods { */ public static BasicDataSource getDataSourceFromConf(Configuration conf) { BasicDataSource tmp = new BasicDataSource(); - tmp.setDriverClassName(conf.get(LensConfConstants.SERVER_DB_DRIVER_NAME, - LensConfConstants.DEFAULT_SERVER_DB_DRIVER_NAME)); + tmp.setDriverClassName( + conf.get(LensConfConstants.SERVER_DB_DRIVER_NAME, LensConfConstants.DEFAULT_SERVER_DB_DRIVER_NAME)); tmp.setUrl(conf.get(LensConfConstants.SERVER_DB_JDBC_URL, LensConfConstants.DEFAULT_SERVER_DB_JDBC_URL)); tmp.setUsername(conf.get(LensConfConstants.SERVER_DB_JDBC_USER, LensConfConstants.DEFAULT_SERVER_DB_USER)); tmp.setPassword(conf.get(LensConfConstants.SERVER_DB_JDBC_PASS, LensConfConstants.DEFAULT_SERVER_DB_PASS)); - tmp.setValidationQuery(conf.get(LensConfConstants.SERVER_DB_VALIDATION_QUERY, - LensConfConstants.DEFAULT_SERVER_DB_VALIDATION_QUERY)); + tmp.setValidationQuery( + conf.get(LensConfConstants.SERVER_DB_VALIDATION_QUERY, LensConfConstants.DEFAULT_SERVER_DB_VALIDATION_QUERY)); tmp.setDefaultAutoCommit(false); return tmp; } public static DataSource 
getPoolingDataSourceFromConf(Configuration conf) { final ConnectionFactory cf = new DriverManagerConnectionFactory( - conf.get(LensConfConstants.SERVER_DB_JDBC_URL, LensConfConstants.DEFAULT_SERVER_DB_JDBC_URL), - conf.get(LensConfConstants.SERVER_DB_JDBC_USER, LensConfConstants.DEFAULT_SERVER_DB_USER), - conf.get(LensConfConstants.SERVER_DB_JDBC_PASS, LensConfConstants.DEFAULT_SERVER_DB_PASS)); + conf.get(LensConfConstants.SERVER_DB_JDBC_URL, LensConfConstants.DEFAULT_SERVER_DB_JDBC_URL), + conf.get(LensConfConstants.SERVER_DB_JDBC_USER, LensConfConstants.DEFAULT_SERVER_DB_USER), + conf.get(LensConfConstants.SERVER_DB_JDBC_PASS, LensConfConstants.DEFAULT_SERVER_DB_PASS)); final GenericObjectPool connectionPool = new GenericObjectPool(); connectionPool.setTestOnBorrow(false); connectionPool.setTestOnReturn(false); connectionPool.setTestWhileIdle(true); - new PoolableConnectionFactory(cf, connectionPool, null - , conf.get(LensConfConstants.SERVER_DB_VALIDATION_QUERY, - LensConfConstants.DEFAULT_SERVER_DB_VALIDATION_QUERY), false, false) - .setDefaultAutoCommit(true); + new PoolableConnectionFactory(cf, connectionPool, null, + conf.get(LensConfConstants.SERVER_DB_VALIDATION_QUERY, LensConfConstants.DEFAULT_SERVER_DB_VALIDATION_QUERY), + false, false).setDefaultAutoCommit(true); return new PoolingDataSource(connectionPool); } @@ -204,13 +205,31 @@ public final class UtilityMethods { public static byte[] generateHashOfWritable(Writable writable) { try { MessageDigest md = MessageDigest.getInstance("MD5"); - byte [] lensConfBytes = WritableUtils.toByteArray(writable); + byte[] lensConfBytes = WritableUtils.toByteArray(writable); md.update(lensConfBytes); - byte [] digest = md.digest(); + byte[] digest = md.digest(); return digest; } catch (NoSuchAlgorithmException e) { log.warn("MD5: No such method error " + writable); return null; } } + + /** + * @param conf + * @return + */ + public static BasicDataSource getDataSourceFromConfForScheduler(Configuration conf) { + 
BasicDataSource basicDataSource = getDataSourceFromConf(conf); + basicDataSource.setDefaultAutoCommit(true); + return basicDataSource; + } + + public static SchedulerJobHandle generateSchedulerJobHandle() { + return new SchedulerJobHandle(UUID.randomUUID()); + } + + public static SchedulerJobInstanceHandle generateSchedulerJobInstanceHandle() { + return new SchedulerJobInstanceHandle(UUID.randomUUID()); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/1a96948e/lens-server/src/main/resources/lensserver-default.xml ---------------------------------------------------------------------- diff --git a/lens-server/src/main/resources/lensserver-default.xml b/lens-server/src/main/resources/lensserver-default.xml index a57cad6..e9525fd 100644 --- a/lens-server/src/main/resources/lensserver-default.xml +++ b/lens-server/src/main/resources/lensserver-default.xml @@ -864,9 +864,15 @@ </description> </property> <property> - <name>lens.server.status.update.exponential.wait.millis</name> - <value>30000</value> - <description>Number of millis that would grow exponentially for next update, incase of transient failures. - </description> -</property> + <name>lens.server.status.update.exponential.wait.millis</name> + <value>30000</value> + <description>Number of millis that would grow exponentially for next update, incase of transient failures. + </description> + </property> + <property> + <name>lens.query.current.time.millis</name> + <value>0</value> + <description>Query current time in millis. This is used to resolve 'now'. If value is set to zero, 'now' is + resolved to current value</description> + </property> </configuration>