Repository: lens
Updated Branches:
  refs/heads/master 13dc803c7 -> b54c4b999


LENS-292 : Query result retention policy


Project: http://git-wip-us.apache.org/repos/asf/lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/lens/commit/b54c4b99
Tree: http://git-wip-us.apache.org/repos/asf/lens/tree/b54c4b99
Diff: http://git-wip-us.apache.org/repos/asf/lens/diff/b54c4b99

Branch: refs/heads/master
Commit: b54c4b9993bdfc5c79e6c9ab727e908d294a61dc
Parents: 13dc803
Author: Deepak Barr <deepak.b...@gmail.com>
Authored: Wed Oct 21 15:51:59 2015 +0530
Committer: Rajat Khandelwal <rajatgupt...@gmail.com>
Committed: Wed Oct 21 15:51:59 2015 +0530

----------------------------------------------------------------------
 .../org/apache/lens/cube/parse/DateUtil.java    |   4 +-
 .../lens/server/api/LensConfConstants.java      |  40 +++++
 .../server/query/QueryExecutionServiceImpl.java |  22 +++
 .../lens/server/query/QueryResultPurger.java    | 177 +++++++++++++++++++
 .../src/main/resources/lensserver-default.xml   |  25 +++
 .../server/query/TestQueryResultPurger.java     | 100 +++++++++++
 src/site/apt/admin/config.apt                   |   8 +
 7 files changed, 374 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/lens/blob/b54c4b99/lens-cube/src/main/java/org/apache/lens/cube/parse/DateUtil.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/DateUtil.java 
b/lens-cube/src/main/java/org/apache/lens/cube/parse/DateUtil.java
index cbcc76c..67932da 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/DateUtil.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/DateUtil.java
@@ -432,7 +432,7 @@ public final class DateUtil {
   }
 
   @EqualsAndHashCode
-  static class TimeDiff {
+  public static class TimeDiff {
     int quantity;
     UpdatePeriod updatePeriod;
 
@@ -441,7 +441,7 @@ public final class DateUtil {
       this.updatePeriod = updatePeriod;
     }
 
-    static TimeDiff parseFrom(String diffStr) throws LensException {
+    public static TimeDiff parseFrom(String diffStr) throws LensException {
       // Get the relative diff part to get eventual date based on now.
       Matcher qtyMatcher = P_QUANTITY.matcher(diffStr);
       int qty = 1;

http://git-wip-us.apache.org/repos/asf/lens/blob/b54c4b99/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
----------------------------------------------------------------------
diff --git 
a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
 
b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
index f202603..7ee0749 100644
--- 
a/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
+++ 
b/lens-server-api/src/main/java/org/apache/lens/server/api/LensConfConstants.java
@@ -942,4 +942,44 @@ public final class LensConfConstants {
    */
   public static final int DEFAULT_FETCH_COUNT_SAVED_QUERY_LIST = 20;
 
+  /**
+   * Configuration key that controls whether the query result purger is started.
+   */
+  public static final String RESULTSET_PURGE_ENABLED = SERVER_PFX + "resultset.purge.enabled";
+
+  /**
+   * Default for {@link #RESULTSET_PURGE_ENABLED}: the query result purger is disabled.
+   */
+  public static final boolean DEFAULT_RESULTSET_PURGE_ENABLED = false;
+
+  /**
+   * Configuration key for the interval, in seconds, between query result purger runs.
+   */
+  public static final String RESULTSET_PURGE_INTERVAL_IN_SECONDS = SERVER_PFX + "resultsetpurger.sleep.interval.secs";
+
+  /**
+   * Default for {@link #RESULTSET_PURGE_INTERVAL_IN_SECONDS}: 3600 seconds (one hour).
+   */
+  public static final int DEFAULT_RESULTSET_PURGE_INTERVAL_IN_SECONDS = 3600;
+
+  /**
+   * Configuration key for the retention period of files in the lens query resultset directory. The value is a
+   * duration string parsed by {@code DateUtil.TimeDiff.parseFrom}, for example {@code "1 day"}.
+   */
+  public static final String QUERY_RESULTSET_RETENTION = SERVER_PFX + "query.resultset.retention";
+
+  /**
+   * Default for {@link #QUERY_RESULTSET_RETENTION}.
+   */
+  public static final String DEFAULT_QUERY_RESULTSET_RETENTION = "1 day";
+
+  /**
+   * Configuration key for the retention period of entries in the hdfs output directory. The value is a duration
+   * string in the same format as {@link #QUERY_RESULTSET_RETENTION}, for example {@code "1 day"}.
+   */
+  public static final String HDFS_OUTPUT_RETENTION = SERVER_PFX + "hdfs.output.retention";
+
+  /**
+   * Default for {@link #HDFS_OUTPUT_RETENTION}.
+   */
+  public static final String DEFAULT_HDFS_OUTPUT_RETENTION = "1 day";
+
 }

http://git-wip-us.apache.org/repos/asf/lens/blob/b54c4b99/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
index 7b15a3c..1a49250 100644
--- 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
+++ 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java
@@ -200,6 +200,11 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
   private final Thread prepareQueryPurger = new Thread(new 
PreparedQueryPurger(), "PrepareQueryPurger");
 
   /**
+   * The query result purger
+   */
+  private QueryResultPurger queryResultPurger;
+
+  /**
    * The query acceptors.
    */
   private List<QueryAcceptor> queryAcceptors = new ArrayList<QueryAcceptor>();
@@ -1087,6 +1092,11 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     }
 
     estimatePool.shutdownNow();
+
+    if (null != queryResultPurger) {
+      queryResultPurger.stop();
+    }
+
     log.info("Query execution service stopped");
   }
 
@@ -1123,6 +1133,13 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
     prepareQueryPurger.start();
 
     startEstimatePool();
+
+    if (conf.getBoolean(RESULTSET_PURGE_ENABLED, 
DEFAULT_RESULTSET_PURGE_ENABLED)) {
+      queryResultPurger = new QueryResultPurger();
+      queryResultPurger.init(conf);
+    } else {
+      log.info("Query result purger is not enabled");
+    }
   }
 
   private void startEstimatePool() {
@@ -2441,6 +2458,11 @@ public class QueryExecutionServiceImpl extends 
BaseLensService implements QueryE
       details.append("QuerySubmitter paused for test.");
     }
 
+    if (null != this.queryResultPurger && !this.queryResultPurger.isHealthy()) 
{
+      isHealthy = false;
+      details.append("QueryResultPurger is dead.");
+    }
+
     if (!isHealthy) {
       log.error(details.toString());
     }

http://git-wip-us.apache.org/repos/asf/lens/blob/b54c4b99/lens-server/src/main/java/org/apache/lens/server/query/QueryResultPurger.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/main/java/org/apache/lens/server/query/QueryResultPurger.java 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryResultPurger.java
new file mode 100644
index 0000000..54c6574
--- /dev/null
+++ 
b/lens-server/src/main/java/org/apache/lens/server/query/QueryResultPurger.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.lens.server.query;
+
+import static org.apache.lens.server.api.LensConfConstants.*;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lens.cube.parse.DateUtil;
+import org.apache.lens.server.LensServices;
+import org.apache.lens.server.api.error.LensException;
+import org.apache.lens.server.api.metrics.MetricsService;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * The Class QueryResultPurger - Purges old files in query resultset directory 
and hdfs output directory.
+ */
+@Slf4j
+public class QueryResultPurger implements Runnable {
+
+  /**
+   * The resultset retention
+   */
+  private DateUtil.TimeDiff resultsetRetention;
+
+  /**
+   * The hdfs output retention
+   */
+  private DateUtil.TimeDiff hdfsOutputRetention;
+
+  /**
+   * The query result purger executor
+   */
+  private ScheduledExecutorService queryResultPurgerExecutor;
+
+  private Path resultsetPath;
+
+  private Path hdfsOutputPath;
+
+  private Configuration conf;
+
+  /**
+   * The Constant QUERY_RESULT_PURGER_COUNTER.
+   */
+  public static final String QUERY_RESULT_PURGER_ERROR_COUNTER = 
"query-result-purger-errors";
+
+  /**
+   * The metrics service.
+   */
+  private MetricsService metricsService;
+
+  public void init(Configuration configuration) {
+    this.conf = configuration;
+    this.resultsetPath = new Path(conf.get(RESULT_SET_PARENT_DIR, 
RESULT_SET_PARENT_DIR_DEFAULT));
+    this.hdfsOutputPath = new Path(resultsetPath.toString(),
+      conf.get(QUERY_HDFS_OUTPUT_PATH, DEFAULT_HDFS_OUTPUT_PATH));
+    int purgeDelay = conf.getInt(RESULTSET_PURGE_INTERVAL_IN_SECONDS, 
DEFAULT_RESULTSET_PURGE_INTERVAL_IN_SECONDS);
+
+    try {
+      String resultSetDiffStr = conf.get(QUERY_RESULTSET_RETENTION, 
DEFAULT_QUERY_RESULTSET_RETENTION);
+      String hdfsOutputDiffStr = conf.get(QUERY_RESULTSET_RETENTION, 
DEFAULT_QUERY_RESULTSET_RETENTION);
+      this.resultsetRetention = DateUtil.TimeDiff.parseFrom(resultSetDiffStr);
+      this.hdfsOutputRetention = 
DateUtil.TimeDiff.parseFrom(hdfsOutputDiffStr);
+      queryResultPurgerExecutor = 
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+          return new Thread(r, "QueryResultPurger");
+        }
+      });
+      queryResultPurgerExecutor.scheduleWithFixedDelay(this, purgeDelay,
+        purgeDelay, TimeUnit.SECONDS);
+      log.info(
+        "Initialized query result purger with lens resultset retention of {} 
and hdfs output retention of {}, "
+          + "scheduled to run every {} seconds",
+        resultSetDiffStr, hdfsOutputDiffStr, purgeDelay);
+    } catch (LensException e) {
+      log.error("Error occurred while initializing query result purger", e);
+    }
+  }
+
+  public void purgePaths(Path path, DateUtil.TimeDiff retention, boolean 
purgeDirectory) throws IOException {
+    int counter = 0;
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus[] fileList = fs.listStatus(path);
+    for (FileStatus f : fileList) {
+      if ((f.isFile() || (f.isDirectory() && purgeDirectory)) && 
canBePurged(f, retention)) {
+        try {
+          if (fs.delete(f.getPath(), true)) {
+            counter++;
+          } else {
+            getMetrics().incrCounter(this.getClass(), 
QUERY_RESULT_PURGER_ERROR_COUNTER);
+          }
+        } catch (IOException e) {
+          getMetrics().incrCounter(this.getClass(), 
QUERY_RESULT_PURGER_ERROR_COUNTER);
+        }
+      }
+    }
+    log.info("Purged {} files/directories in {}", counter, path.toString());
+  }
+
+  @Override
+  public void run() {
+    try {
+      purgePaths(resultsetPath, resultsetRetention, false);
+      purgePaths(hdfsOutputPath, hdfsOutputRetention, true);
+    } catch (Exception e) {
+      log.error("Error occurred in Query result purger", e);
+      getMetrics().incrCounter(this.getClass(), 
QUERY_RESULT_PURGER_ERROR_COUNTER);
+    }
+  }
+
+  private boolean canBePurged(FileStatus f, DateUtil.TimeDiff retention) {
+    return f.getModificationTime() < 
retention.negativeOffsetFrom(Calendar.getInstance().getTime()).getTime();
+  }
+
+  /**
+   * Stops query result purger
+   */
+  public void stop() {
+    if (null != queryResultPurgerExecutor) {
+      queryResultPurgerExecutor.shutdownNow();
+      log.info("Stopped query result purger.");
+    }
+  }
+
+  /**
+   * Checks the status of executor service
+   *
+   * @return
+   */
+  public boolean isHealthy() {
+    if (null == queryResultPurgerExecutor || 
queryResultPurgerExecutor.isShutdown()
+      || queryResultPurgerExecutor.isTerminated()) {
+      return false;
+    }
+    return true;
+  }
+
+  private MetricsService getMetrics() {
+    if (metricsService == null) {
+      metricsService = LensServices.get().getService(MetricsService.NAME);
+      if (metricsService == null) {
+        throw new NullPointerException("Could not get metrics service");
+      }
+    }
+    return metricsService;
+  }
+}

http://git-wip-us.apache.org/repos/asf/lens/blob/b54c4b99/lens-server/src/main/resources/lensserver-default.xml
----------------------------------------------------------------------
diff --git a/lens-server/src/main/resources/lensserver-default.xml 
b/lens-server/src/main/resources/lensserver-default.xml
index 1e6bd10..5f268cb 100644
--- a/lens-server/src/main/resources/lensserver-default.xml
+++ b/lens-server/src/main/resources/lensserver-default.xml
@@ -277,6 +277,31 @@
     <description>Database to which statistics tables are created and 
partitions are added.</description>
   </property>
 
+  <!-- Query Result Purger Configurations -->
+  <property>
+    <name>lens.server.resultset.purge.enabled</name>
+    <value>false</value>
+    <description>Whether to periodically purge old query results</description>
+  </property>
+
+  <property>
+    <name>lens.server.resultsetpurger.sleep.interval.secs</name>
+    <value>3600</value>
+    <description>Interval in seconds between query result purger runs. Default 1 hour.</description>
+  </property>
+
+  <property>
+    <name>lens.server.query.resultset.retention</name>
+    <value>1 day</value>
+    <description>Retention period for files in the lens query resultset directory, e.g. "1 day". Default 1 day.</description>
+  </property>
+
+  <property>
+    <name>lens.server.hdfs.output.retention</name>
+    <value>1 day</value>
+    <description>Retention period for entries in the hdfs output directory, e.g. "1 day". Default 1 day.</description>
+  </property>
+
   <!-- Finished Query Purging Configurations -->
   <property>
     <name>lens.server.querypurger.sleep.interval</name>

http://git-wip-us.apache.org/repos/asf/lens/blob/b54c4b99/lens-server/src/test/java/org/apache/lens/server/query/TestQueryResultPurger.java
----------------------------------------------------------------------
diff --git 
a/lens-server/src/test/java/org/apache/lens/server/query/TestQueryResultPurger.java
 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryResultPurger.java
new file mode 100644
index 0000000..9aeb645
--- /dev/null
+++ 
b/lens-server/src/test/java/org/apache/lens/server/query/TestQueryResultPurger.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.lens.server.query;
+
+import static org.testng.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.lens.server.api.LensConfConstants;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+/**
+ * Tests that {@link QueryResultPurger} deletes resultset files and hdfs output directories that are older than
+ * the configured retention.
+ */
+@Test(groups = "unit-test")
+public class TestQueryResultPurger {
+
+  private Configuration conf;
+  private static final long MILLISECONDS_IN_DAY = 24 * 60 * 60 * 1000;
+
+  /**
+   * Upper bound on how long to wait for a scheduled purger run to take effect.
+   */
+  private static final long PURGE_WAIT_TIMEOUT_MILLIS = 10000;
+
+  /**
+   * Configures 1 day retentions with a 1 second purge interval, and creates test files older than the retention.
+   *
+   * @throws IOException if the test files cannot be created
+   */
+  @BeforeTest
+  public void setUp() throws IOException {
+    String resultsetPath = "target/" + getClass().getSimpleName();
+    conf = new Configuration();
+    conf.set(LensConfConstants.RESULT_SET_PARENT_DIR, resultsetPath);
+    conf.set(LensConfConstants.QUERY_HDFS_OUTPUT_PATH, "hdfsout");
+    conf.set(LensConfConstants.QUERY_RESULTSET_RETENTION, "1 day");
+    conf.set(LensConfConstants.HDFS_OUTPUT_RETENTION, "1 day");
+    conf.set(LensConfConstants.RESULTSET_PURGE_INTERVAL_IN_SECONDS, "1");
+    createTestFiles();
+  }
+
+  /**
+   * Removes the resultset directory created by {@link #setUp()}.
+   *
+   * @throws Exception if the directory cannot be deleted
+   */
+  @AfterTest
+  public void cleanup() throws Exception {
+    Path dir = new Path(conf.get(LensConfConstants.RESULT_SET_PARENT_DIR));
+    FileSystem fs = dir.getFileSystem(conf);
+    fs.delete(dir, true);
+  }
+
+  /**
+   * Test query result purger: the stale resultset file and the stale hdfs output directory are purged, while the
+   * hdfs output directory itself stays in the resultset directory.
+   *
+   * @throws InterruptedException the interrupted exception
+   * @throws IOException          Signals that an I/O exception has occurred.
+   */
+  @Test
+  public void testQueryResultPurger() throws InterruptedException, IOException {
+    String resultsetDir = conf.get(LensConfConstants.RESULT_SET_PARENT_DIR);
+    String hdfsOutputDir = resultsetDir + "/" + conf.get(LensConfConstants.QUERY_HDFS_OUTPUT_PATH);
+    verify(resultsetDir, 2);
+    verify(hdfsOutputDir, 1);
+    QueryResultPurger queryResultPurger = new QueryResultPurger();
+    queryResultPurger.init(conf);
+    try {
+      // The purger runs asynchronously every second; poll with a timeout instead of relying on a fixed sleep.
+      awaitCount(resultsetDir, 1);
+      awaitCount(hdfsOutputDir, 0);
+    } finally {
+      // Stop the executor even when an assertion fails, so its thread does not outlive the test.
+      queryResultPurger.stop();
+    }
+  }
+
+  /**
+   * Asserts that directory {@code path} currently holds exactly {@code count} entries.
+   */
+  private void verify(String path, int count) {
+    assertEquals(listChildren(path).length, count, "Unexpected number of entries in " + path);
+  }
+
+  /**
+   * Polls until {@code path} holds {@code count} entries or the timeout elapses, then asserts the count.
+   */
+  private void awaitCount(String path, int count) throws InterruptedException {
+    long deadline = System.currentTimeMillis() + PURGE_WAIT_TIMEOUT_MILLIS;
+    while (listChildren(path).length != count && System.currentTimeMillis() < deadline) {
+      Thread.sleep(100);
+    }
+    verify(path, count);
+  }
+
+  /**
+   * Lists the entry names of directory {@code path}, failing clearly instead of with an NPE if it cannot be read.
+   */
+  private static String[] listChildren(String path) {
+    String[] children = new File(path).list();
+    if (children == null) {
+      throw new AssertionError(path + " is not a readable directory");
+    }
+    return children;
+  }
+
+  /**
+   * Creates one resultset file and one hdfs output directory, both last modified just over a day ago, so that
+   * they are older than the 1 day retention configured in {@link #setUp()}.
+   */
+  private void createTestFiles() throws IOException {
+    long delta = 60 * 1000; //60 seconds
+    long lastModified = System.currentTimeMillis() - (MILLISECONDS_IN_DAY + delta);
+    String resultsetDir = conf.get(LensConfConstants.RESULT_SET_PARENT_DIR);
+    File hdfsDir = new File(resultsetDir + "/" + conf.get(LensConfConstants.QUERY_HDFS_OUTPUT_PATH) + "/test-dir");
+    if (!hdfsDir.mkdirs() && !hdfsDir.isDirectory()) {
+      throw new IOException("Could not create " + hdfsDir);
+    }
+    if (!hdfsDir.setLastModified(lastModified)) {
+      throw new IOException("Could not set modification time of " + hdfsDir);
+    }
+    File resultFile = new File(resultsetDir + "/test-result.txt");
+    if (!resultFile.createNewFile() && !resultFile.isFile()) {
+      throw new IOException("Could not create " + resultFile);
+    }
+    if (!resultFile.setLastModified(lastModified)) {
+      throw new IOException("Could not set modification time of " + resultFile);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/lens/blob/b54c4b99/src/site/apt/admin/config.apt
----------------------------------------------------------------------
diff --git a/src/site/apt/admin/config.apt b/src/site/apt/admin/config.apt
index 3a0678f..76cbff0 100644
--- a/src/site/apt/admin/config.apt
+++ b/src/site/apt/admin/config.apt
@@ -239,4 +239,12 @@ Lens server configuration
 *--+--+---+--+
 
|105|lens.server.ws.resourcenames|session,metastore,query,quota,scheduler,index,log|These
 JAX-RS resources would be started in the specified order when lens-server 
starts up|
 *--+--+---+--+
+|106|lens.server.resultset.purge.enabled|false|Whether the query results will 
be purged|
+*--+--+---+--+
+|107|lens.server.resultsetpurger.sleep.interval.secs|3600|Interval in seconds at which the query result purger runs|
+*--+--+---+--+
+|108|lens.server.query.resultset.retention|1 day|Lens query resultset retention period|
+*--+--+---+--+
+|109|lens.server.hdfs.output.retention|1 day|HDFS output retention period|
+*--+--+---+--+
 The configuration parameters and their default values

Reply via email to