Repository: lens Updated Branches: refs/heads/master d7efa9e31 -> 600b9be4c
LENS-988: StateStore for LensScheduler Project: http://git-wip-us.apache.org/repos/asf/lens/repo Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/600b9be4 Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/600b9be4 Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/600b9be4 Branch: refs/heads/master Commit: 600b9be4c3d32894472b9762bdc96a8df9683625 Parents: d7efa9e Author: Lavkesh Lahngir <lavk...@linux.com> Authored: Mon Jun 27 18:23:51 2016 +0530 Committer: Rajat Khandelwal <rajatgupt...@gmail.com> Committed: Mon Jun 27 18:23:51 2016 +0530 ---------------------------------------------------------------------- .../lens/server/api/LensConfConstants.java | 6 + .../lens/server/scheduler/SchedulerDAO.java | 568 +++++++++++++++++++ .../server/scheduler/util/UtilityMethods.java | 52 ++ .../src/main/resources/lensserver-default.xml | 6 + src/site/apt/admin/config.apt | 90 +-- 5 files changed, 678 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/lens/blob/600b9be4/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java index 0a81f7b..bd9b1ab 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java @@ -1108,4 +1108,10 @@ public final class LensConfConstants { * Default value of "lens.query.cancel.on.timeout" */ public static final boolean DEFAULT_CANCEL_QUERY_ON_TIMEOUT = true; + + /** + * Scheduler store class + */ + public static final java.lang.String SCHEDULER_STORE_CLASS = SERVER_PFX + "scheduler.store.class"; + } http://git-wip-us.apache.org/repos/asf/lens/blob/600b9be4/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java new file mode 100644 index 0000000..e866eb3 --- /dev/null +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerDAO.java @@ -0,0 +1,568 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.scheduler; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.lens.api.LensSessionHandle; +import org.apache.lens.api.ToXMLString; +import org.apache.lens.api.scheduler.*; +import org.apache.lens.server.api.LensConfConstants; +import org.apache.lens.server.api.error.LensException; +import org.apache.lens.server.scheduler.util.UtilityMethods; + +import org.apache.commons.dbcp.BasicDataSource; +import org.apache.commons.dbutils.QueryRunner; +import org.apache.commons.dbutils.ResultSetHandler; +import org.apache.hadoop.conf.Configuration; + +import lombok.extern.slf4j.Slf4j; + +/** + * SchedulerDAO class defines scheduler store operations. + */ +@Slf4j +public class SchedulerDAO { + private Configuration conf; + private SchedulerDBStore store; + + public SchedulerDAO(Configuration conf) throws LensException { + this.conf = conf; + try { + Class dbStoreClass = Class + .forName(conf.get(LensConfConstants.SCHEDULER_STORE_CLASS, SchedulerHsqlDBStore.class.getName())); + this.store = (SchedulerDBStore) dbStoreClass.newInstance(); + this.store.init(UtilityMethods.getDataSourceFromConf(conf)); + this.store.createJobTable(); + this.store.createJobInstaceTable(); + } catch (SQLException e) { + log.error("Error creating job tables", e); + throw new LensException("Error creating job tables ", e); + } catch (ClassNotFoundException e) { + log.error("No class found ", e); + throw new LensException("No class found ", e); + } catch (InstantiationException | IllegalAccessException e) { + log.error("Illegal access exception", e); + throw new LensException("Illegal access exceptio ", e); + } + } + + /** + * Saves the SchedulerJobInfo object into the store. + * + * @param jobInfo object + * @return the number of records stored + */ + public int storeJob(SchedulerJobInfo jobInfo) { + try { + return store.insertIntoJobTable(jobInfo); + } catch (SQLException e) { + log.error("Error while storing the jobInfo for " + jobInfo.getId().getHandleIdString(), e); + return 0; + } + } + + /** + * Fetches the SchedulerJobInfo object corresponding to handle id. + * + * @param id: Job handle id. + * @return SchedulerJobInfo + */ + public SchedulerJobInfo getSchedulerJobInfo(SchedulerJobHandle id) { + try { + return store.getSchedulerJobInfo(id.getHandleIdString()); + } catch (SQLException e) { + log.error("Error while getting the job detail for " + id.getHandleIdString(), e); + return null; + } + } + + /** + * Gets the job definition from the store + * + * @param id : Job handle id. + * @return XJob definition + */ + public XJob getJob(SchedulerJobHandle id) { + try { + return store.getJob(id.getHandleIdString()); + } catch (SQLException e) { + log.error("Error while getting the job for " + id.getHandleIdString(), e); + return null; + } + } + + /** + * Gets the Job state + * + * @param id : Job handle id. + * @return SchedulerJobState of the job. + */ + public SchedulerJobState getJobState(SchedulerJobHandle id) { + try { + return store.getJobState(id.getHandleIdString()); + } catch (SQLException e) { + log.error("Error while getting the job state for " + id.getHandleIdString(), e); + return null; + } + } + + /** + * Updates the job definition from the new SchedulerJobInfo + * + * @param info: Updated info object. + * @return number of rows updated. + */ + public int updateJob(SchedulerJobInfo info) { + try { + return store.updateJob(info.getId().getHandleIdString(), info.getJob(), info.getModifiedOn()); + } catch (SQLException e) { + log.error("Error while updating job for " + info.getId().getHandleIdString(), e); + return 0; + } + } + + /** + * Updates the job state form the new SchedulerJobInfo + * + * @param info: Updated info objects + * @return number of rows updated. + */ + public int updateJobState(SchedulerJobInfo info) { + try { + return store.updateJobState(info.getId().getHandleIdString(), info.getState().name(), info.getModifiedOn()); + } catch (SQLException e) { + log.error("Error while updating job state for " + info.getId().getHandleIdString(), e); + return 0; + } + } + + public int storeJobInstance(SchedulerJobInstanceInfo instanceInfo) { + try { + return store.insertIntoJobInstanceTable(instanceInfo); + } catch (SQLException e) { + log.error("Error while storing job instance for " + instanceInfo.getId()); + return 0; + } + } + + /** + * Gets the SchedulerJobInstanceInfo corresponding instance handle id. + * + * @param id : Job instance id + * @return ShedulerJobInstanceInfo + */ + public SchedulerJobInstanceInfo getSchedulerJobInstanceInfo(SchedulerJobInstanceHandle id) { + try { + return store.getJobInstanceInfo(id.getHandleIdString()); + } catch (SQLException e) { + log.error("Error while getting the job instance info for " + id.getHandleIdString(), e); + return null; + } + } + + /** + * Updates the instance state + * + * @param info: Updated instance info + * @return number of rows updated. + */ + public int updateJobInstanceState(SchedulerJobInstanceInfo info) { + try { + return store.updateJobInstanceState(info.getId().getHandleIdString(), info.getState().name()); + } catch (SQLException e) { + log.error("Error while updating the job instance state for " + info.getId().getHandleIdString(), e); + return 0; + } + } + + /** + * Gets all the instance handle id for a job. + * + * @param id: Job handle id. + * @return List of instance handles. + */ + public List<SchedulerJobInstanceHandle> getJobInstances(SchedulerJobHandle id) { + // TODO: Add number of results to be fetched + try { + return store.getAllJobInstances(id.getHandleIdString()); + } catch (SQLException e) { + log.error("Error while getting instances of a job with id " + id.getHandleIdString(), e); + return null; + } + } + + /** + * Gets all jobs which match the filter requirements. + * + * @param username : User name of the job + * @param state : State of the job + * @param startTime : Created on should be greater than this start time. + * @param endTime : Created on should be less than the end time. + * @return List of Job handles + */ + public List<SchedulerJobHandle> getJobs(String username, SchedulerJobInstanceState state, Long startTime, + Long endTime) { + try { + return store.getJobs(username, state.name(), startTime, endTime); + } catch (SQLException e) { + log.error("Error while getting jobs ", e); + return null; + } + } + + public abstract static class SchedulerDBStore { + protected static final String JOB_TABLE = "job_table"; + protected static final String JOB_INSTANCE_TABLE = "job_instance_table"; + protected static final String COLUMN_ID = "id"; + protected static final String COLUMN_JOB = "job"; + protected static final String COLUMN_USER = "username"; + protected static final String COLUMN_STATE = "state"; + protected static final String COLUMN_CREATED_ON = "createdon"; + protected static final String COLUMN_MODIFIED_ON = "modifiedon"; + protected static final String COLUMN_JOB_ID = "jobid"; + protected static final String COLUMN_SESSION_HANDLE = "sessionhandle"; + protected static final String COLUMN_START_TIME = "starttime"; + protected static final String COLUMN_END_TIME = "endtime"; + protected static final String COLUMN_RESULT_PATH = "resultpath"; + protected static final String COLUMN_QUERY = "query"; + protected QueryRunner runner; + // Generic multiple row handler for the fetch query. + private ResultSetHandler<List<Object[]>> multipleRowsHandler = new ResultSetHandler<List<Object[]>>() { + @Override + public List<Object[]> handle(ResultSet resultSet) throws SQLException { + List<Object[]> output = new ArrayList<>(); + while (resultSet.next()) { + ResultSetMetaData meta = resultSet.getMetaData(); + int cols = meta.getColumnCount(); + Object[] result = new Object[cols]; + for (int i = 0; i < cols; i++) { + result[i] = resultSet.getObject(i + 1); + } + output.add(result); + } + return output; + } + }; + + /** + * Init the store. + * + * @param ds + */ + public void init(BasicDataSource ds) { + runner = new QueryRunner(ds); + } + + /** + * Creates the job table + * + * @throws SQLException + */ + public abstract void createJobTable() throws SQLException; + + /** + * Creates the job instance table + * + * @throws SQLException + */ + public abstract void createJobInstaceTable() throws SQLException; + + /** + * Inserts the Job info object into job table + * + * @param jobInfo + * @return number of rows inserted. + * @throws SQLException + */ + public int insertIntoJobTable(SchedulerJobInfo jobInfo) throws SQLException { + String insertSQL = "INSERT INTO " + JOB_TABLE + " VALUES(?,?,?,?,?,?)"; + return runner + .update(insertSQL, jobInfo.getId().toString(), ToXMLString.toString(jobInfo.getJob()), jobInfo.getUserName(), + jobInfo.getState().name(), jobInfo.getCreatedOn(), jobInfo.getModifiedOn()); + } + + /** + * Inserts the job instance info object into job instance table + * + * @param instanceInfo + * @return number of rows inserted. + * @throws SQLException + */ + public int insertIntoJobInstanceTable(SchedulerJobInstanceInfo instanceInfo) throws SQLException { + String insertSQL = "INSERT INTO " + JOB_INSTANCE_TABLE + " VALUES(?,?,?,?,?,?,?,?,?)"; + return runner + .update(insertSQL, instanceInfo.getId().getHandleIdString(), instanceInfo.getJobId().getHandleIdString(), + instanceInfo.getSessionHandle().toString(), instanceInfo.getStartTime(), instanceInfo.getEndTime(), + instanceInfo.getResultPath(), instanceInfo.getQuery(), instanceInfo.getState().name(), + instanceInfo.getCreatedOn()); + } + + /** + * Gets the Job info object + * + * @param idStr + * @return SchedulerJobInfo object corresponding to the job handle. + * @throws SQLException + */ + public SchedulerJobInfo getSchedulerJobInfo(String idStr) throws SQLException { + String fetchSQL = "SELECT * FROM " + JOB_TABLE + " WHERE " + COLUMN_ID + "=?"; + List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, idStr); + if (result.size() == 0) { + return null; + } else { + Object[] jobInfo = result.get(0); + SchedulerJobHandle id = SchedulerJobHandle.fromString((String) jobInfo[0]); + XJob xJob = ToXMLString.valueOf((String) jobInfo[1], XJob.class); + String userName = (String) jobInfo[2]; + String state = (String) jobInfo[3]; + long createdOn = (Long) jobInfo[4]; + long modifiedOn = (Long) jobInfo[5]; + return new SchedulerJobInfo(id, xJob, userName, SchedulerJobState.valueOf(state), createdOn, modifiedOn); + } + } + + /** + * Gets the job definition + * + * @param id + * @return Xjob corresponding to the job handle + * @throws SQLException + */ + public XJob getJob(String id) throws SQLException { + String fetchSQL = "SELECT " + COLUMN_JOB + " FROM " + JOB_TABLE + " WHERE " + COLUMN_ID + "=?"; + List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, id); + if (result.size() == 0) { + return null; + } else { + return ToXMLString.valueOf((String) result.get(0)[0], XJob.class); + } + } + + /** + * Gets the job state + * + * @param id + * @return SchedulerJobState + * @throws SQLException + */ + public SchedulerJobState getJobState(String id) throws SQLException { + String fetchSQL = "SELECT " + COLUMN_STATE + " FROM " + JOB_TABLE + " WHERE " + COLUMN_ID + "=?"; + List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, id); + if (result.size() == 0) { + return null; + } else { + return SchedulerJobState.valueOf((String) result.get(0)[0]); + } + } + + /** + * Gets all the jobs which match the filter requirements. + * + * @param username + * @param state + * @param starttime + * @param endtime + * @return the list of job handles. + * @throws SQLException + */ + public List<SchedulerJobHandle> getJobs(String username, String state, Long starttime, Long endtime) + throws SQLException { + String whereClause = ""; + if (username != null && !username.isEmpty()) { + whereClause += (whereClause.isEmpty()) ? " WHERE " : " AND " + COLUMN_USER + "=?"; + } + if (state != null && !state.isEmpty()) { + whereClause += (whereClause.isEmpty()) ? " WHERE " : " AND " + COLUMN_STATE + "=?"; + } + if (starttime != null && starttime > 0) { + whereClause += (whereClause.isEmpty()) ? " WHERE " : " AND " + COLUMN_CREATED_ON + ">=?"; + } + if (endtime != null && endtime > 0) { + whereClause += (whereClause.isEmpty()) ? " WHERE " : " AND " + COLUMN_CREATED_ON + "< ?"; + } + String fetchSQL = "SELECT " + COLUMN_ID + " FROM " + JOB_TABLE + whereClause; + List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, username, state, starttime, endtime); + List<SchedulerJobHandle> resOut = new ArrayList<>(); + for (int i = 0; i < result.size(); i++) { + Object[] row = result.get(i); + resOut.add(SchedulerJobHandle.fromString((String) row[0])); + } + return resOut; + } + + /** + * Update the XJob into the job table + * + * @param id + * @param job + * @param modifiedOn + * @return the number of rows updated + * @throws SQLException + */ + public int updateJob(String id, XJob job, long modifiedOn) throws SQLException { + String updateSQL = + "UPDATE " + JOB_TABLE + " SET " + COLUMN_JOB + "=?, " + COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID + + "=?"; + return runner.update(updateSQL, ToXMLString.toString(job), modifiedOn, id); + } + + /** + * Updates the job state into the job table + * + * @param id + * @param state + * @param modifiedOn + * @return number of rows updated. + * @throws SQLException + */ + public int updateJobState(String id, String state, long modifiedOn) throws SQLException { + String updateSQL = + "UPDATE " + JOB_TABLE + " SET " + COLUMN_STATE + "=?, " + COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID + + "=?"; + return runner.update(updateSQL, state, modifiedOn, id); + } + + /** + * Gets the Job instance info corresponding to handle id. + * + * @param idStr + * @return SchedulerJobInstanceInfo + * @throws SQLException + */ + public SchedulerJobInstanceInfo getJobInstanceInfo(String idStr) throws SQLException { + String fetchSQL = "SELECT * FROM " + JOB_INSTANCE_TABLE + " WHERE " + COLUMN_ID + "=?"; + List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, idStr); + if (result.size() == 0) { + return null; + } else { + Object[] instanceInfo = result.get(0); + SchedulerJobInstanceHandle id = SchedulerJobInstanceHandle.fromString((String) instanceInfo[0]); + SchedulerJobHandle jobId = SchedulerJobHandle.fromString((String) instanceInfo[1]); + LensSessionHandle sessionHandle = LensSessionHandle.valueOf((String) instanceInfo[2]); + long starttime = (Long) instanceInfo[3]; + long endtime = (Long) instanceInfo[4]; + String resultPath = (String) instanceInfo[5]; + String query = (String) instanceInfo[6]; + SchedulerJobInstanceState state = SchedulerJobInstanceState.valueOf((String) instanceInfo[7]); + long createdOn = (Long) instanceInfo[8]; + return new SchedulerJobInstanceInfo(id, jobId, sessionHandle, starttime, endtime, resultPath, query, state, + createdOn); + } + } + + /** + * Updates the state of a job instance. + * + * @param id + * @param state + * @return number of rows updated. + * @throws SQLException + */ + public int updateJobInstanceState(String id, String state) throws SQLException { + String updateSQL = "UPDATE " + JOB_INSTANCE_TABLE + " SET " + COLUMN_STATE + "=?" + " WHERE " + COLUMN_ID + "=?"; + return runner.update(updateSQL, state, id); + } + + /** + * Gets all the instance handle of a job + * + * @param jobId + * @return List of SchedulerJobInstanceHandle + * @throws SQLException + */ + public List<SchedulerJobInstanceHandle> getAllJobInstances(String jobId) throws SQLException { + String fetchSQL = "SELECT " + COLUMN_ID + " FROM " + JOB_INSTANCE_TABLE + " WHERE " + COLUMN_JOB_ID + "=?"; + List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, jobId); + List<SchedulerJobInstanceHandle> resOut = new ArrayList<>(); + for (int i = 0; i < result.size(); i++) { + Object[] row = result.get(i); + resOut.add(SchedulerJobInstanceHandle.fromString((String) row[0])); + } + return resOut; + } + + } + + /** + * MySQL based DB Store. + */ + public static class SchedulerMySQLDBStore extends SchedulerDBStore { + /** + * {@inheritDoc} + */ + @Override + public void createJobTable() throws SQLException { + String createSQL = + "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL," + COLUMN_JOB + + " TEXT," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATE + " VARCHAR(20)," + COLUMN_CREATED_ON + + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + " PRIMARY KEY ( " + COLUMN_ID + ")" + ")"; + runner.update(createSQL); + } + + /** + * {@inheritDoc} + */ + @Override + public void createJobInstaceTable() throws SQLException { + String createSQL = + "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " + + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " + + COLUMN_START_TIME + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " TEXT, " + + COLUMN_QUERY + " TEXT, " + COLUMN_STATE + " VARCHAR(20), " + COLUMN_CREATED_ON + " BIGINT, " + + " PRIMARY KEY ( " + COLUMN_ID + ")" + ")"; + runner.update(createSQL); + } + } + + /** + * HSQL Based DB store. This class is used in testing. + */ + public static class SchedulerHsqlDBStore extends SchedulerDBStore { + /** + * {@inheritDoc} + */ + @Override + public void createJobTable() throws SQLException { + String createSQL = + "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL," + COLUMN_JOB + + " VARCHAR(1024)," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATE + " VARCHAR(20)," + COLUMN_CREATED_ON + + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + " PRIMARY KEY ( " + COLUMN_ID + ")" + ")"; + runner.update(createSQL); + } + + /** + * {@inheritDoc} + */ + @Override + public void createJobInstaceTable() throws SQLException { + String createSQL = + "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " + + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " + + COLUMN_START_TIME + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " VARCHAR(1024)," + + COLUMN_QUERY + " VARCHAR(1024), " + COLUMN_STATE + " VARCHAR(20), " + COLUMN_CREATED_ON + " BIGINT, " + + " PRIMARY KEY ( " + COLUMN_ID + ")" + ")"; + runner.update(createSQL); + } + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/600b9be4/lens-server/src/main/java/org/apache/lens/server/scheduler/util/UtilityMethods.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/util/UtilityMethods.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/util/UtilityMethods.java new file mode 100644 index 0000000..31783ad --- /dev/null +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/util/UtilityMethods.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.scheduler.util; + +import org.apache.lens.server.api.LensConfConstants; + +import org.apache.commons.dbcp.BasicDataSource; +import org.apache.hadoop.conf.Configuration; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class UtilityMethods { + private UtilityMethods() { + + } + + /** + * + * @param conf + * @return + */ + public static BasicDataSource getDataSourceFromConf(Configuration conf) { + BasicDataSource basicDataSource = new BasicDataSource(); + basicDataSource.setDriverClassName( + conf.get(LensConfConstants.SERVER_DB_DRIVER_NAME, LensConfConstants.DEFAULT_SERVER_DB_DRIVER_NAME)); + basicDataSource + .setUrl(conf.get(LensConfConstants.SERVER_DB_JDBC_URL, LensConfConstants.DEFAULT_SERVER_DB_JDBC_URL)); + basicDataSource + .setUsername(conf.get(LensConfConstants.SERVER_DB_JDBC_USER, LensConfConstants.DEFAULT_SERVER_DB_USER)); + basicDataSource + .setPassword(conf.get(LensConfConstants.SERVER_DB_JDBC_PASS, LensConfConstants.DEFAULT_SERVER_DB_PASS)); + basicDataSource.setDefaultAutoCommit(true); + return basicDataSource; + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/600b9be4/lens-server/src/main/resources/lensserver-default.xml ---------------------------------------------------------------------- diff --git a/lens-server/src/main/resources/lensserver-default.xml b/lens-server/src/main/resources/lensserver-default.xml index 5492132..6dc322e 100644 --- a/lens-server/src/main/resources/lensserver-default.xml +++ b/lens-server/src/main/resources/lensserver-default.xml @@ -859,6 +859,12 @@ lens server will return the handle of the previous query </description> </property> + <property> + <name>lens.server.scheduler.store.class</name> + <value>org.apache.lens.server.scheduler.SchedulerDAO$SchedulerHsqlDBStore</value> + <description>A subclass of SchedulerDBStore class used for storing scheduler related information. + </description> + </property> <property> <name>lens.server.status.update.exponential.wait.millis</name> <value>30000</value> http://git-wip-us.apache.org/repos/asf/lens/blob/600b9be4/src/site/apt/admin/config.apt ---------------------------------------------------------------------- diff --git a/src/site/apt/admin/config.apt b/src/site/apt/admin/config.apt index 7ad1843..5b76069 100644 --- a/src/site/apt/admin/config.apt +++ b/src/site/apt/admin/config.apt @@ -175,92 +175,94 @@ Lens server configuration *--+--+---+--+ |73|lens.server.scheduler.service.impl|org.apache.lens.server.scheduler.SchedulerServiceImpl|Implementation class for query scheduler service| *--+--+---+--+ -|74|lens.server.scheduler.ws.resource.impl|org.apache.lens.server.scheduler.ScheduleResource|Implementation class for query scheduler resource| +|74|lens.server.scheduler.store.class|org.apache.lens.server.scheduler.SchedulerDAO$SchedulerHsqlDBStore|A subclass of SchedulerDBStore class used for storing scheduler related information.| *--+--+---+--+ -|75|lens.server.scheduling.queue.poll.interval.millisec|2000|The interval at which submission thread will poll scheduling queue to fetch the next query for submission. If value is less than equal to 0, then it would mean that thread will continuosly poll without sleeping. The interval has to be given in milliseconds.| +|75|lens.server.scheduler.ws.resource.impl|org.apache.lens.server.scheduler.ScheduleResource|Implementation class for query scheduler resource| *--+--+---+--+ -|76|lens.server.serverMode.ws.filter.impl|org.apache.lens.server.ServerModeFilter|Implementation class for ServerMode Filter| +|76|lens.server.scheduling.queue.poll.interval.millisec|2000|The interval at which submission thread will poll scheduling queue to fetch the next query for submission. If value is less than equal to 0, then it would mean that thread will continuosly poll without sleeping. The interval has to be given in milliseconds.| *--+--+---+--+ -|77|lens.server.service.provider.factory|org.apache.lens.server.ServiceProviderFactoryImpl|Service provider factory implementation class. This parameter is used to lookup the factory implementation class name that would provide an instance of ServiceProvider. Users should instantiate the class to obtain its instance. Example -- Class spfClass = conf.getClass("lens.server.service.provider.factory", null, ServiceProviderFactory.class); ServiceProviderFactory spf = spfClass.newInstance(); ServiceProvider serviceProvider = spf.getServiceProvider(); -- This is not supposed to be overridden by users.| +|77|lens.server.serverMode.ws.filter.impl|org.apache.lens.server.ServerModeFilter|Implementation class for ServerMode Filter| *--+--+---+--+ -|78|lens.server.servicenames|session,query,metastore,scheduler,quota|These services would be started in the specified order when lens-server starts up| +|78|lens.server.service.provider.factory|org.apache.lens.server.ServiceProviderFactoryImpl|Service provider factory implementation class. This parameter is used to lookup the factory implementation class name that would provide an instance of ServiceProvider. Users should instantiate the class to obtain its instance. Example -- Class spfClass = conf.getClass("lens.server.service.provider.factory", null, ServiceProviderFactory.class); ServiceProviderFactory spf = spfClass.newInstance(); ServiceProvider serviceProvider = spf.getServiceProvider(); -- This is not supposed to be overridden by users.| *--+--+---+--+ -|79|lens.server.session.expiry.service.interval.secs|3600|Interval at which lens session expiry service runs| +|79|lens.server.servicenames|session,query,metastore,scheduler,quota|These services would be started in the specified order when lens-server starts up| *--+--+---+--+ -|80|lens.server.session.service.impl|org.apache.lens.server.session.HiveSessionService|Implementation class for session service| +|80|lens.server.session.expiry.service.interval.secs|3600|Interval at which lens session expiry service runs| *--+--+---+--+ -|81|lens.server.session.timeout.seconds|86400|Lens session timeout in seconds.If there is no activity on the session for this period then the session will be closed.Default timeout is one day.| +|81|lens.server.session.service.impl|org.apache.lens.server.session.HiveSessionService|Implementation class for session service| *--+--+---+--+ -|82|lens.server.session.ws.resource.impl|org.apache.lens.server.session.SessionResource|Implementation class for Session Resource| +|82|lens.server.session.timeout.seconds|86400|Lens session timeout in seconds.If there is no activity on the session for this period then the session will be closed.Default timeout is one day.| *--+--+---+--+ -|83|lens.server.state.persist.out.stream.buffer.size|1048576|Output Stream Buffer Size used in writing lens server state to file system. Size is in bytes.| +|83|lens.server.session.ws.resource.impl|org.apache.lens.server.session.SessionResource|Implementation class for Session Resource| *--+--+---+--+ -|84|lens.server.state.persistence.enabled|true|If flag is enabled, state of all the services will be persisted periodically to a location specified by lens.server.persist.location and on server restart all the services will be started from last saved state.| +|84|lens.server.state.persist.out.stream.buffer.size|1048576|Output Stream Buffer Size used in writing lens server state to file system. Size is in bytes.| *--+--+---+--+ -|85|lens.server.state.persistence.interval.millis|300000|Lens server state persistence time interval in milliseconds| +|85|lens.server.state.persistence.enabled|true|If flag is enabled, state of all the services will be persisted periodically to a location specified by lens.server.persist.location and on server restart all the services will be started from last saved state.| *--+--+---+--+ -|86|lens.server.statistics.db|lensstats|Database to which statistics tables are created and partitions are added.| +|86|lens.server.state.persistence.interval.millis|300000|Lens server state persistence time interval in milliseconds| *--+--+---+--+ -|87|lens.server.statistics.log.rollover.interval|3600000|Default rate which log statistics store scans for rollups in milliseconds.| +|87|lens.server.statistics.db|lensstats|Database to which statistics tables are created and partitions are added.| *--+--+---+--+ -|88|lens.server.statistics.store.class|org.apache.lens.server.stats.store.log.LogStatisticsStore|Default implementation of class used to persist Lens Statistics.| +|88|lens.server.statistics.log.rollover.interval|3600000|Default rate which log statistics store scans for rollups in milliseconds.| *--+--+---+--+ -|89|lens.server.statistics.warehouse.dir|file:///tmp/lens/statistics/warehouse|Default top level location where stats are moved by the log statistics store.| +|89|lens.server.statistics.store.class|org.apache.lens.server.stats.store.log.LogStatisticsStore|Default implementation of class used to persist Lens Statistics.| *--+--+---+--+ -|90|lens.server.status.update.exponential.wait.millis|30000|Number of millis that would grow exponentially for next update, incase of transient failures.| +|90|lens.server.statistics.warehouse.dir|file:///tmp/lens/statistics/warehouse|Default top level location where stats are moved by the log statistics store.| *--+--+---+--+ -|91|lens.server.status.update.maximum.delay.secs|1800|The maximum delay in seconds for next status update to happen after any transient failure. This will be used a maximum delay sothat exponential wait times not to grow to bigger value.| +|91|lens.server.status.update.exponential.wait.millis|30000|Number of millis that would grow exponentially for next update, incase of transient failures.| *--+--+---+--+ -|92|lens.server.status.update.num.retries|10|The number of retries a status update will tried with exponentital back off, in case of transient issues, upon which query will be marked FAILED.| +|92|lens.server.status.update.maximum.delay.secs|1800|The maximum delay in seconds for next status update to happen after any transient failure. This will be used a maximum delay sothat exponential wait times not to grow to bigger value.| *--+--+---+--+ -|93|lens.server.total.query.cost.ceiling.per.user|-1.0|A query submitted by user will be launched only if total query cost of all current launched queries of user is less than or equal to total query cost ceiling defined by this property. This configuration value is only useful when TotalQueryCostCeilingConstraint is enabled by using org.apache.lens.server.query.constraint.TotalQueryCostCeilingConstraintFactory as one of the factories in lens.server.query.constraint.factories property. Default is -1.0 which means that there is no limit on the total query cost of launched queries submitted by a user.| +|93|lens.server.status.update.num.retries|10|The number of retries a status update will tried with exponentital back off, in case of transient issues, upon which query will be marked FAILED.| *--+--+---+--+ -|94|lens.server.ui.base.uri|http://0.0.0.0:19999/|The base url for the Lens UI Server| +|94|lens.server.total.query.cost.ceiling.per.user|-1.0|A query submitted by user will be launched only if total query cost of all current launched queries of user is less than or equal to total query cost ceiling defined by this property. This configuration value is only useful when TotalQueryCostCeilingConstraint is enabled by using org.apache.lens.server.query.constraint.TotalQueryCostCeilingConstraintFactory as one of the factories in lens.server.query.constraint.factories property. Default is -1.0 which means that there is no limit on the total query cost of launched queries submitted by a user.| *--+--+---+--+ -|95|lens.server.ui.enable|true|Bringing up the ui server is optional. By default it brings up UI server.| +|95|lens.server.ui.base.uri|http://0.0.0.0:19999/|The base url for the Lens UI Server| *--+--+---+--+ -|96|lens.server.ui.enable.caching|true|Set this to false to disable static file caching in the UI server| +|96|lens.server.ui.enable|true|Bringing up the ui server is optional. By default it brings up UI server.| *--+--+---+--+ -|97|lens.server.ui.static.dir|webapp/lens-server/static|The base directory to server UI static files from| +|97|lens.server.ui.enable.caching|true|Set this to false to disable static file caching in the UI server| *--+--+---+--+ -|98|lens.server.user.resolver.custom.class|full.package.name.Classname|Required for CUSTOM user resolver. In case the provided implementations are not sufficient for user config resolver, a custom classname can be provided. Class should extend org.apache.lens.server.user.UserConfigLoader| +|98|lens.server.ui.static.dir|webapp/lens-server/static|The base directory to server UI static files from| *--+--+---+--+ -|99|lens.server.user.resolver.db.keys|lens.session.cluster.user,mapred.job.queue.name|Required for DATABASE and LDAP_BACKED_DATABASE user resolvers. For database based user config loaders, the conf keys that will be loaded from database.| +|99|lens.server.user.resolver.custom.class|full.package.name.Classname|Required for CUSTOM user resolver. In case the provided implementations are not sufficient for user config resolver, a custom classname can be provided. Class should extend org.apache.lens.server.user.UserConfigLoader| *--+--+---+--+ -|100|lens.server.user.resolver.db.query|select clusteruser,queue from user_config_table where username=?|Required for DATABASE and LDAP_BACKED_DATABASE user resolvers. For database based user config loader, this query will be run with single argument = logged in user and the result columns will be assigned to lens.server.user.resolver.db.keys in order. For ldap backed database resolver, the argument to this query will be the intermediate values obtained from ldap.| +|100|lens.server.user.resolver.db.keys|lens.session.cluster.user,mapred.job.queue.name|Required for DATABASE and LDAP_BACKED_DATABASE user resolvers. For database based user config loaders, the conf keys that will be loaded from database.| *--+--+---+--+ -|101|lens.server.user.resolver.fixed.value| |Required for FIXED user resolver. when lens.server.user.resolver.type=FIXED, This will be the value cluster user will resolve to.| +|101|lens.server.user.resolver.db.query|select clusteruser,queue from user_config_table where username=?|Required for DATABASE and LDAP_BACKED_DATABASE user resolvers. For database based user config loader, this query will be run with single argument = logged in user and the result columns will be assigned to lens.server.user.resolver.db.keys in order. For ldap backed database resolver, the argument to this query will be the intermediate values obtained from ldap.| *--+--+---+--+ -|102|lens.server.user.resolver.ldap.bind.dn| |Required for LDAP_BACKED_DATABASE user resolvers. ldap dn for admin binding example: CN=company-it-admin,ou=service-account,ou=company-service-account,dc=dc1,dc=com...| +|102|lens.server.user.resolver.fixed.value| |Required for FIXED user resolver. when lens.server.user.resolver.type=FIXED, This will be the value cluster user will resolve to.| *--+--+---+--+ -|103|lens.server.user.resolver.ldap.bind.password| |Required for LDAP_BACKED_DATABASE user resolvers. ldap password for admin binding above| +|103|lens.server.user.resolver.ldap.bind.dn| |Required for LDAP_BACKED_DATABASE user resolvers. ldap dn for admin binding example: CN=company-it-admin,ou=service-account,ou=company-service-account,dc=dc1,dc=com...| *--+--+---+--+ -|104|lens.server.user.resolver.ldap.fields|department|Required for LDAP_BACKED_DATABASE user resolvers. list of fields to be obtained from ldap. These will be cached by the intermediate db.| +|104|lens.server.user.resolver.ldap.bind.password| |Required for LDAP_BACKED_DATABASE user resolvers. ldap password for admin binding above| *--+--+---+--+ -|105|lens.server.user.resolver.ldap.intermediate.db.delete.sql|delete from user_department where username=?|Required for LDAP_BACKED_DATABASE user resolvers. query to delete intermediate values from database backing ldap as cache. one argument: logged in user.| +|105|lens.server.user.resolver.ldap.fields|department|Required for LDAP_BACKED_DATABASE user resolvers. list of fields to be obtained from ldap. These will be cached by the intermediate db.| *--+--+---+--+ -|106|lens.server.user.resolver.ldap.intermediate.db.insert.sql|insert into user_department (username, department, expiry) values (?, ?, ?)|Required for LDAP_BACKED_DATABASE user resolvers. query to insert intermediate values from database backing ldap as cache. arguments: first logged in user, then all intermediate values, then current time + expiration time| +|106|lens.server.user.resolver.ldap.intermediate.db.delete.sql|delete from user_department where username=?|Required for LDAP_BACKED_DATABASE user resolvers. query to delete intermediate values from database backing ldap as cache. one argument: logged in user.| *--+--+---+--+ -|107|lens.server.user.resolver.ldap.intermediate.db.query|select department from user_department where username=? and expiry>?|Required for LDAP_BACKED_DATABASE user resolvers. query to obtain intermediate values from database backing ldap as cache. two arguments: logged in user and current time.| +|107|lens.server.user.resolver.ldap.intermediate.db.insert.sql|insert into user_department (username, department, expiry) values (?, ?, ?)|Required for LDAP_BACKED_DATABASE user resolvers. query to insert intermediate values from database backing ldap as cache. arguments: first logged in user, then all intermediate values, then current time + expiration time| *--+--+---+--+ -|108|lens.server.user.resolver.ldap.search.base| |Required for LDAP_BACKED_DATABASE user resolvers. for searching intermediate values for a user, the search keys. example: cn=users,dc=dc1,dc=dc2...| +|108|lens.server.user.resolver.ldap.intermediate.db.query|select department from user_department where username=? and expiry>?|Required for LDAP_BACKED_DATABASE user resolvers. query to obtain intermediate values from database backing ldap as cache. two arguments: logged in user and current time.| *--+--+---+--+ -|109|lens.server.user.resolver.ldap.search.filter|(&(objectClass=user)(sAMAccountName=%s))|Required for LDAP_BACKED_DATABASE user resolvers. filter pattern for ldap search| +|109|lens.server.user.resolver.ldap.search.base| |Required for LDAP_BACKED_DATABASE user resolvers. for searching intermediate values for a user, the search keys. example: cn=users,dc=dc1,dc=dc2...| *--+--+---+--+ -|110|lens.server.user.resolver.ldap.url| |Required for LDAP_BACKED_DATABASE user resolvers. ldap url to connect to.| +|110|lens.server.user.resolver.ldap.search.filter|(&(objectClass=user)(sAMAccountName=%s))|Required for LDAP_BACKED_DATABASE user resolvers. filter pattern for ldap search| *--+--+---+--+ -|111|lens.server.user.resolver.propertybased.filename|/path/to/propertyfile|Required for PROPERTYBASED user resolver. when lens.server.user.resolver.type is PROPERTYBASED, then this file will be read and parsed to determine cluster user. Each line should contain username followed by DOT followed by property full name followed by equal-to sign and followed by value. example schema of the file is: user1.lens.server.cluster.user=clusteruser1 user1.mapred.job.queue.name=queue1 *.lens.server.cluster.user=defaultclusteruser *.mapred.job.queue.name=default| +|111|lens.server.user.resolver.ldap.url| |Required for LDAP_BACKED_DATABASE user resolvers. ldap url to connect to.| *--+--+---+--+ -|112|lens.server.user.resolver.type|FIXED|Type of user config resolver. allowed values are FIXED, PROPERTYBASED, DATABASE, LDAP_BACKED_DATABASE, CUSTOM.| +|112|lens.server.user.resolver.propertybased.filename|/path/to/propertyfile|Required for PROPERTYBASED user resolver. when lens.server.user.resolver.type is PROPERTYBASED, then this file will be read and parsed to determine cluster user. Each line should contain username followed by DOT followed by property full name followed by equal-to sign and followed by value. example schema of the file is: user1.lens.server.cluster.user=clusteruser1 user1.mapred.job.queue.name=queue1 *.lens.server.cluster.user=defaultclusteruser *.mapred.job.queue.name=default| *--+--+---+--+ -|113|lens.server.waiting.queries.selection.policy.factories|org.apache.lens.server.query.collect.UserSpecificWaitingQueriesSelectionPolicyFactory|Factories used to instantiate waiting queries selection policies. Every factory should be an implementation of org.apache.lens.server.api.common.ConfigBasedObjectCreationFactory and create an implementation of org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy.| +|113|lens.server.user.resolver.type|FIXED|Type of user config resolver. allowed values are FIXED, PROPERTYBASED, DATABASE, LDAP_BACKED_DATABASE, CUSTOM.| *--+--+---+--+ -|114|lens.server.ws.featurenames|multipart,moxyjson,moxyjsonconfigresovler|These JAX-RS Feature(s) would be started in the specified order when lens-server starts up| +|114|lens.server.waiting.queries.selection.policy.factories|org.apache.lens.server.query.collect.UserSpecificWaitingQueriesSelectionPolicyFactory|Factories used to instantiate waiting queries selection policies. Every factory should be an implementation of org.apache.lens.server.api.common.ConfigBasedObjectCreationFactory and create an implementation of org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy.| *--+--+---+--+ -|115|lens.server.ws.filternames|authentication,consistentState,serverMode|These JAX-RS filters would be started in the specified order when lens-server starts up| +|115|lens.server.ws.featurenames|multipart,moxyjson,moxyjsonconfigresovler|These JAX-RS Feature(s) would be started in the specified order when lens-server starts up| *--+--+---+--+ -|116|lens.server.ws.listenernames|appevent|These listeners would be called in the specified order when lens-server starts up| +|116|lens.server.ws.filternames|authentication,consistentState,serverMode|These JAX-RS filters would be started in the specified order when lens-server starts up| *--+--+---+--+ -|117|lens.server.ws.resourcenames|session,metastore,query,quota,scheduler,index,log|These JAX-RS resources would be started in the specified order when lens-server starts up| +|117|lens.server.ws.listenernames|appevent|These listeners would be called in the specified order when lens-server starts up| +*--+--+---+--+ +|118|lens.server.ws.resourcenames|session,metastore,query,quota,scheduler,index,log|These JAX-RS resources would be started in the specified order when lens-server starts up| *--+--+---+--+ The configuration parameters and their default values