Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 7f55214e2 -> 461999eda


[GOBBLIN-454] Add retention support to the MysqlDatasetStateStore

Closes #2326 from
htran1/mysql_state_store_retention


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/461999ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/461999ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/461999ed

Branch: refs/heads/master
Commit: 461999eda9e72b8a05e9d8248a3942f5f90be5a6
Parents: 7f55214
Author: Hung Tran <hut...@linkedin.com>
Authored: Wed Apr 4 13:48:52 2018 -0700
Committer: Hung Tran <hut...@linkedin.com>
Committed: Wed Apr 4 13:48:52 2018 -0700

----------------------------------------------------------------------
 gobblin-data-management/build.gradle            |   1 +
 .../policy/CombineSelectionPolicy.java          |   3 +-
 .../dataset/TimeBasedDatasetStoreDataset.java   |  33 +-
 .../TimestampedDatasetStateStoreVersion.java    |  22 +-
 .../CleanableMysqlDatasetStoreDatasetTest.java  | 303 +++++++++++++++++++
 .../gobblin/metastore/MysqlStateStore.java      |  72 +++++
 .../metastore/MysqlStateStoreEntryManager.java  |  47 +++
 .../metadata/DatasetStateStoreEntryManager.java |   2 +
 .../metadata/StateStoreEntryManager.java        |   2 +
 .../gobblin/runtime/MysqlDatasetStateStore.java |  16 +
 .../MysqlDatasetStateStoreEntryManager.java     |  50 +++
 11 files changed, 547 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-data-management/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-data-management/build.gradle 
b/gobblin-data-management/build.gradle
index 3bb5d1e..add49ef 100644
--- a/gobblin-data-management/build.gradle
+++ b/gobblin-data-management/build.gradle
@@ -47,6 +47,7 @@ dependencies {
   compile externalDependency.junit
 
   testCompile project(":gobblin-test-utils")
+  testCompile project(path: ":gobblin-metastore", configuration: 
"testFixtures")
 
   testCompile externalDependency.calciteCore
   testCompile externalDependency.calciteAvatica

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/CombineSelectionPolicy.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/CombineSelectionPolicy.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/CombineSelectionPolicy.java
index c3c70f8..51caf55 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/CombineSelectionPolicy.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/CombineSelectionPolicy.java
@@ -41,6 +41,7 @@ import com.typesafe.config.ConfigFactory;
 
 import 
org.apache.gobblin.data.management.retention.policy.CombineRetentionPolicy;
 import org.apache.gobblin.data.management.version.DatasetVersion;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
@@ -95,7 +96,7 @@ public class CombineSelectionPolicy implements 
VersionSelectionPolicy<DatasetVer
 
     ImmutableList.Builder<VersionSelectionPolicy<DatasetVersion>> builder = 
ImmutableList.builder();
 
-    for (String combineClassName : 
config.getStringList(VERSION_SELECTION_POLICIES_PREFIX)) {
+    for (String combineClassName : ConfigUtils.getStringList(config, 
VERSION_SELECTION_POLICIES_PREFIX)) {
       try {
         builder.add((VersionSelectionPolicy<DatasetVersion>) 
GobblinConstructorUtils
             .invokeFirstConstructor(Class.forName(combineClassName), 
ImmutableList.<Object>of(config),

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java
index 0fa7c18..1eecad6 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java
@@ -17,8 +17,13 @@
 
 package org.apache.gobblin.data.management.retention.dataset;
 
+import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Properties;
+
+import com.google.common.collect.ImmutableList;
+import com.typesafe.config.Config;
+
 import org.apache.gobblin.data.management.policy.SelectBeforeTimeBasedPolicy;
 import org.apache.gobblin.data.management.policy.VersionSelectionPolicy;
 import 
org.apache.gobblin.data.management.version.TimestampedDatasetStateStoreVersion;
@@ -27,6 +32,8 @@ import 
org.apache.gobblin.data.management.version.finder.TimestampedDatasetState
 import org.apache.gobblin.data.management.version.finder.VersionFinder;
 import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
 import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
 import lombok.Data;
 
 
@@ -35,6 +42,8 @@ import lombok.Data;
  */
 @Data
 public class TimeBasedDatasetStoreDataset extends 
CleanableDatasetStoreDataset<TimestampedDatasetVersion> {
+  private static final String SELECTION_POLICY_CLASS_KEY = 
"selection.policy.class";
+  private static final String DEFAULT_SELECTION_POLICY_CLASS = 
SelectBeforeTimeBasedPolicy.class.getName();
 
   private final VersionFinder<TimestampedDatasetStateStoreVersion> 
versionFinder;
   private final VersionSelectionPolicy<TimestampedDatasetVersion> 
versionSelectionPolicy;
@@ -42,7 +51,15 @@ public class TimeBasedDatasetStoreDataset extends 
CleanableDatasetStoreDataset<T
   public TimeBasedDatasetStoreDataset(Key key, 
List<DatasetStateStoreEntryManager> entries, Properties props) {
     super(key, entries);
     this.versionFinder = new TimestampedDatasetStateStoreVersionFinder();
-    this.versionSelectionPolicy = new 
SelectBeforeTimeBasedPolicy(ConfigUtils.propertiesToConfig(props));
+    Config propsAsConfig = ConfigUtils.propertiesToConfig(props);
+
+    // strip the retention config namespace since the selection policy looks 
for configuration without the namespace
+    Config retentionConfig = ConfigUtils.getConfigOrEmpty(propsAsConfig,
+        ConfigurableCleanableDataset.RETENTION_CONFIGURATION_KEY);
+    Config retentionConfigWithFallback = 
retentionConfig.withFallback(propsAsConfig);
+
+    this.versionSelectionPolicy = 
createSelectionPolicy(ConfigUtils.getString(retentionConfigWithFallback,
+        SELECTION_POLICY_CLASS_KEY, DEFAULT_SELECTION_POLICY_CLASS), 
retentionConfigWithFallback, props);
   }
 
   @Override
@@ -54,4 +71,18 @@ public class TimeBasedDatasetStoreDataset extends 
CleanableDatasetStoreDataset<T
   public VersionSelectionPolicy<TimestampedDatasetVersion> 
getVersionSelectionPolicy() {
     return this.versionSelectionPolicy;
   }
+
+  @SuppressWarnings("unchecked")
+  private VersionSelectionPolicy<TimestampedDatasetVersion> 
createSelectionPolicy(String className,
+      Config config, Properties jobProps) {
+    try {
+      return (VersionSelectionPolicy<TimestampedDatasetVersion>)
+          
GobblinConstructorUtils.invokeFirstConstructor(Class.forName(className),
+          ImmutableList.<Object> of(config), ImmutableList.<Object> of(config, 
jobProps),
+          ImmutableList.<Object> of(jobProps));
+    } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException
+        | ClassNotFoundException e) {
+      throw new IllegalArgumentException(e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java
index 45a153e..080af07 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java
@@ -42,11 +42,29 @@ public class TimestampedDatasetStateStoreVersion extends 
TimestampedDatasetVersi
 
   @Override
   public boolean equals(Object obj) {
-    return super.equals(obj);
+    if (this == obj) {
+      return true;
+    }
+
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+
+    // getClass() equality above guarantees this cast is safe
+    TimestampedDatasetStateStoreVersion other = (TimestampedDatasetStateStoreVersion) obj;
+
+    // entry may be null (hashCode() allows for it), so compare null-safely
+    if (this.entry == null ? other.getEntry() != null : !this.entry.equals(other.getEntry())) {
+      return false;
+    }
+
+    return super.equals(obj);
   }
 
   @Override
   public int hashCode() {
-    return this.version.hashCode();
+    int result = this.version.hashCode();
+    result = 31 * result + (entry != null ? entry.hashCode() : 0);
+    return result;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableMysqlDatasetStoreDatasetTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableMysqlDatasetStoreDatasetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableMysqlDatasetStoreDatasetTest.java
new file mode 100644
index 0000000..7d29a17
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableMysqlDatasetStoreDatasetTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.retention;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.dbcp.BasicDataSource;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.retention.dataset.CleanableDataset;
+import 
org.apache.gobblin.data.management.retention.dataset.finder.TimeBasedDatasetStoreDatasetFinder;
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.metastore.DatasetStoreDataset;
+import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.MysqlDatasetStateStore;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Unit test for cleaning {@link MysqlStateStore} and {@link 
MysqlDatasetStateStore}
+ */
+@Test(singleThreaded = true)
+public class CleanableMysqlDatasetStoreDatasetTest {
+  private static final String TEST_STATE_STORE = "TestStateStore";
+  private static final String TEST_JOB_NAME1 = "TestJob1";
+  private static final String TEST_JOB_NAME2 = "TestJob2";
+  private static final String TEST_JOB_ID = "TestJobId";
+  private static final String TEST_TASK_ID_PREFIX = "TestTask-";
+  private static final String TEST_DATASET_URN1 = "TestDataset1";
+  private static final String TEST_DATASET_URN2 = "TestDataset2";
+
+  private MysqlStateStore<JobState> dbJobStateStore;
+  private DatasetStateStore<JobState.DatasetState> dbDatasetStateStore;
+  private long startTime = System.currentTimeMillis();
+  private Config config;
+
+  private ITestMetastoreDatabase testMetastoreDatabase;
+  private static final String TEST_USER = "testUser";
+  private static final String TEST_PASSWORD = "testPassword";
+
+  private static String getJobId(String jobIdBase, int index) {
+    return jobIdBase + index;
+  }
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
+    String jdbcUrl = this.testMetastoreDatabase.getJdbcUrl();
+    ConfigBuilder configBuilder = ConfigBuilder.create();
+    BasicDataSource mySqlDs = new BasicDataSource();
+
+    
mySqlDs.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER);
+    mySqlDs.setDefaultAutoCommit(false);
+    mySqlDs.setUrl(jdbcUrl);
+    mySqlDs.setUsername(TEST_USER);
+    mySqlDs.setPassword(TEST_PASSWORD);
+
+    this.dbJobStateStore = new MysqlStateStore<>(mySqlDs, TEST_STATE_STORE, 
false, JobState.class);
+
+    configBuilder.addPrimitive("selection.timeBased.lookbackTime", "10m");
+    configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_TYPE_KEY, 
"mysql");
+    configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, 
TEST_STATE_STORE);
+    configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, 
jdbcUrl);
+    configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, 
TEST_USER);
+    configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, 
TEST_PASSWORD);
+
+    ClassAliasResolver<DatasetStateStore.Factory> resolver = new 
ClassAliasResolver<>(DatasetStateStore.Factory.class);
+
+    DatasetStateStore.Factory stateStoreFactory = 
resolver.resolveClass("mysql").newInstance();
+    this.config = configBuilder.build();
+    this.dbDatasetStateStore = 
stateStoreFactory.createStateStore(configBuilder.build());
+
+    // clear data that may have been left behind by a prior test run
+    this.dbJobStateStore.delete(TEST_JOB_NAME1);
+    this.dbDatasetStateStore.delete(TEST_JOB_NAME1);
+    this.dbJobStateStore.delete(TEST_JOB_NAME2);
+    this.dbDatasetStateStore.delete(TEST_JOB_NAME2);
+  }
+
+  /**
+   * Test cleanup of the job state store
+   * @throws IOException
+   */
+  @Test
+  public void testCleanJobStateStore() throws IOException {
+    JobState jobState = new JobState(TEST_JOB_NAME1, getJobId(TEST_JOB_ID, 1));
+    jobState.setId(getJobId(TEST_JOB_ID, 1));
+    jobState.setProp("foo", "bar");
+    jobState.setState(JobState.RunningState.COMMITTED);
+    jobState.setStartTime(this.startTime);
+    jobState.setEndTime(this.startTime + 1000);
+    jobState.setDuration(1000);
+
+    for (int i = 0; i < 3; i++) {
+      TaskState taskState = new TaskState();
+      taskState.setJobId(getJobId(TEST_JOB_ID, 1));
+      taskState.setTaskId(TEST_TASK_ID_PREFIX + i);
+      taskState.setId(TEST_TASK_ID_PREFIX + i);
+      taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+      jobState.addTaskState(taskState);
+    }
+
+    // set old time to test that this state is deleted
+    this.dbJobStateStore.setTestTimestamp(System.currentTimeMillis()/1000 - 
(60 * 20));
+
+    this.dbJobStateStore.put(TEST_JOB_NAME1,
+        MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+        jobState);
+
+    jobState.setJobName(TEST_JOB_NAME2);
+    jobState.setJobId(getJobId(TEST_JOB_ID, 2));
+    jobState.setId(getJobId(TEST_JOB_ID, 2));
+
+    // set current time to test that the state is not deleted
+    this.dbJobStateStore.setTestTimestamp(0);
+
+    this.dbJobStateStore.put(TEST_JOB_NAME2,
+        MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
+        jobState);
+
+    TimeBasedDatasetStoreDatasetFinder datasetFinder =
+        new TimeBasedDatasetStoreDatasetFinder(FileSystem.get(new 
Configuration()), ConfigUtils.configToProperties(config));
+    List<DatasetStoreDataset> datasets = datasetFinder.findDatasets();
+
+    Assert.assertTrue(this.dbJobStateStore.exists(TEST_JOB_NAME1,
+        MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+    Assert.assertTrue(this.dbJobStateStore.exists(TEST_JOB_NAME2,
+        MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    for (DatasetStoreDataset dataset : datasets) {
+      ((CleanableDataset) dataset).clean();
+    }
+
+    Assert.assertFalse(this.dbJobStateStore.exists(TEST_JOB_NAME1,
+        MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    // state with recent timestamp is not deleted
+    Assert.assertTrue(this.dbJobStateStore.exists(TEST_JOB_NAME2,
+        MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+  }
+
+  /**
+   * Test cleanup of the dataset state store. This test uses the combined 
selection policy to test that the newest 2
+   * entries are retained even when the timestamp is old.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testCleanDatasetStateStore() throws IOException {
+    JobState.DatasetState datasetState = new 
JobState.DatasetState(TEST_JOB_NAME1, getJobId(TEST_JOB_ID, 1));
+
+    datasetState.setDatasetUrn(TEST_DATASET_URN1);
+    datasetState.setState(JobState.RunningState.COMMITTED);
+    datasetState.setId(TEST_DATASET_URN1);
+    datasetState.setStartTime(this.startTime);
+    datasetState.setEndTime(this.startTime + 1000);
+    datasetState.setDuration(1000);
+
+    for (int i = 0; i < 3; i++) {
+      TaskState taskState = new TaskState();
+      taskState.setJobId(getJobId(TEST_JOB_ID, 1));
+      taskState.setTaskId(TEST_TASK_ID_PREFIX + i);
+      taskState.setId(TEST_TASK_ID_PREFIX + i);
+      taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+      datasetState.addTaskState(taskState);
+    }
+
+    // set old time to test that this state is deleted
+    
((MysqlDatasetStateStore)this.dbDatasetStateStore).setTestTimestamp(this.startTime/1000
 + 2 - (60 * 20));
+    datasetState.setJobId(getJobId(TEST_JOB_ID, 1));
+    this.dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN1, 
datasetState);
+
+    
((MysqlDatasetStateStore)this.dbDatasetStateStore).setTestTimestamp(this.startTime/1000
 + 4 - (60 * 20));
+    datasetState.setJobId(getJobId(TEST_JOB_ID, 2));
+    this.dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN1, 
datasetState);
+
+    
((MysqlDatasetStateStore)this.dbDatasetStateStore).setTestTimestamp(this.startTime/1000
 + 6 - (60 * 20));
+    datasetState.setJobId(getJobId(TEST_JOB_ID, 3));
+    this.dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN1, 
datasetState);
+
+    datasetState.setJobId(getJobId(TEST_JOB_ID, 1));
+
+    // persist a second dataset state to test that retrieval of multiple 
dataset states works
+    datasetState.setDatasetUrn(TEST_DATASET_URN2);
+    datasetState.setId(TEST_DATASET_URN2);
+    datasetState.setDuration(2000);
+
+    // set current time to test that the state is not deleted
+    ((MysqlDatasetStateStore)this.dbDatasetStateStore).setTestTimestamp(0);
+
+    this.dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, 
datasetState);
+    datasetState.setJobName(TEST_JOB_NAME2);
+    this.dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, 
datasetState);
+
+
+    Config cleanerConfig = config
+        .withValue("gobblin.retention.selection.policy.class", 
ConfigValueFactory.fromAnyRef("org.apache.gobblin.data.management.policy.CombineSelectionPolicy"))
+        .withValue("gobblin.retention.selection.combine.operation", 
ConfigValueFactory.fromAnyRef("intersect"))
+        .withValue("gobblin.retention.selection.combine.policy.classes", 
ConfigValueFactory.fromAnyRef("org.apache.gobblin.data.management.policy.SelectBeforeTimeBasedPolicy,org.apache.gobblin.data.management.policy.NewestKSelectionPolicy"))
+        .withValue("gobblin.retention.selection.timeBased.lookbackTime", 
ConfigValueFactory.fromAnyRef("10m"))
+        .withValue("gobblin.retention.selection.newestK.versionsNotSelected", 
ConfigValueFactory.fromAnyRef("2"));
+
+    TimeBasedDatasetStoreDatasetFinder datasetFinder =
+        new TimeBasedDatasetStoreDatasetFinder(FileSystem.get(new 
Configuration()), ConfigUtils.configToProperties(cleanerConfig));
+    List<DatasetStoreDataset> datasets = datasetFinder.findDatasets();
+
+    Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1,
+        TEST_DATASET_URN1 + "-" + 
MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX
+            + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1,
+        TEST_DATASET_URN1 + "-" +  getJobId(TEST_JOB_ID, 1)
+    + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1,
+        TEST_DATASET_URN1 + "-" +  getJobId(TEST_JOB_ID, 2)
+            + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1,
+        TEST_DATASET_URN1 + "-" +  getJobId(TEST_JOB_ID, 3)
+            + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1,
+        TEST_DATASET_URN2 + "-" + 
MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX
+            + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1,
+        TEST_DATASET_URN2 + "-" + getJobId(TEST_JOB_ID, 1) + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME2,
+        TEST_DATASET_URN2 + "-" + 
MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX
+            + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME2,
+        TEST_DATASET_URN2 + "-" + getJobId(TEST_JOB_ID, 1) + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    for (DatasetStoreDataset dataset : datasets) {
+      ((CleanableDataset) dataset).clean();
+    }
+
+    // the most recent two entries (current and job id 3) should be retained
+    Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1,
+        TEST_DATASET_URN1 + "-" + 
MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX
+            + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1,
+        TEST_DATASET_URN1 + "-" +  getJobId(TEST_JOB_ID, 3)
+            + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    Assert.assertFalse(this.dbDatasetStateStore.exists(TEST_JOB_NAME1,
+        TEST_DATASET_URN1 + "-" +  getJobId(TEST_JOB_ID, 1)
+            + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    Assert.assertFalse(this.dbDatasetStateStore.exists(TEST_JOB_NAME1,
+        TEST_DATASET_URN1 + "-" +  getJobId(TEST_JOB_ID, 2)
+            + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1,
+        TEST_DATASET_URN2 + "-" + 
MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX
+            + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1,
+        TEST_DATASET_URN2 + "-" + getJobId(TEST_JOB_ID, 1) + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME2,
+        TEST_DATASET_URN2 + "-" + 
MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX
+            + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+
+    Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME2,
+        TEST_DATASET_URN2 + "-" + getJobId(TEST_JOB_ID, 1) + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
index 169a5e1..b7694f6 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java
@@ -31,6 +31,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Timestamp;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -40,6 +41,7 @@ import java.util.zip.GZIPOutputStream;
 import org.apache.commons.dbcp.BasicDataSource;
 import org.apache.hadoop.io.Text;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -47,6 +49,9 @@ import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
+import org.apache.gobblin.metastore.predicates.StoreNamePredicate;
 import org.apache.gobblin.password.PasswordManager;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.io.StreamUtils;
@@ -106,6 +111,9 @@ public class MysqlStateStore<T extends State> implements 
StateStore<T> {
           + " store_name = ? AND table_name = ?)"
           + " ON DUPLICATE KEY UPDATE state = s.state";
 
+  private static final String SELECT_METADATA_TEMPLATE =
+      "SELECT store_name, table_name, modified_time from $TABLE$ where 
store_name like ?";
+
   // MySQL key length limit is 767 bytes
   private static final String CREATE_JOB_STATE_TABLE_TEMPLATE =
       "CREATE TABLE IF NOT EXISTS $TABLE$ (store_name varchar(100) CHARACTER 
SET latin1 COLLATE latin1_bin not null,"
@@ -122,6 +130,7 @@ public class MysqlStateStore<T extends State> implements 
StateStore<T> {
   private final String DELETE_JOB_STATE_SQL;
   private final String CLONE_JOB_STATE_SQL;
   private final String SELECT_STORE_NAMES_SQL;
+  private final String SELECT_METADATA_SQL;
 
   /**
    * Manages the persistence and retrieval of {@link State} in a MySQL database
@@ -146,6 +155,7 @@ public class MysqlStateStore<T extends State> implements 
StateStore<T> {
     DELETE_JOB_STATE_SQL = DELETE_JOB_STATE_TEMPLATE.replace("$TABLE$", 
stateStoreTableName);
     CLONE_JOB_STATE_SQL = CLONE_JOB_STATE_TEMPLATE.replace("$TABLE$", 
stateStoreTableName);
     SELECT_STORE_NAMES_SQL = SELECT_STORE_NAMES_TEMPLATE.replace("$TABLE$", 
stateStoreTableName);
+    SELECT_METADATA_SQL = SELECT_METADATA_TEMPLATE.replace("$TABLE$", 
stateStoreTableName);
 
     // create table if it does not exist
     String createJobTable = CREATE_JOB_STATE_TABLE_TEMPLATE.replace("$TABLE$", 
stateStoreTableName);
@@ -463,4 +473,66 @@ public class MysqlStateStore<T extends State> implements 
StateStore<T> {
       throw new IOException("failure deleting storeName " + storeName, e);
     }
   }
+
+  /**
+   * Gets entry managers for all tables matching the predicate
+   * @param predicate Predicate used to filter tables. To allow state stores to push down predicates, use native extensions
+   *                  of {@link StateStorePredicate}.
+   * @throws IOException if the metadata query fails
+   */
+  @Override
+  public List<? extends StateStoreEntryManager> getMetadataForTables(StateStorePredicate predicate)
+      throws IOException {
+    List<MysqlStateStoreEntryManager> entryManagers = Lists.newArrayList();
+    // push the store name down to the LIKE clause when available; "%" matches every store
+    try (Connection connection = dataSource.getConnection();
+        PreparedStatement queryStatement = connection.prepareStatement(SELECT_METADATA_SQL)) {
+      String storeName = predicate instanceof StoreNamePredicate ? ((StoreNamePredicate) predicate).getStoreName() : "%";
+      queryStatement.setString(1, storeName);
+
+      try (ResultSet rs = queryStatement.executeQuery()) {
+        while (rs.next()) {
+          String rsStoreName = rs.getString(1);
+          String rsTableName = rs.getString(2);
+          Timestamp timestamp = rs.getTimestamp(3);
+          // build the entry manager once; it serves both the predicate check and the result list
+          MysqlStateStoreEntryManager entryManager =
+              new MysqlStateStoreEntryManager(rsStoreName, rsTableName, timestamp.getTime(), this);
+
+          if (predicate.apply(entryManager)) {
+            entryManagers.add(entryManager);
+          }
+        }
+      }
+    } catch (SQLException e) {
+      throw new IOException("failure getting metadata for tables", e);
+    }
+
+    return entryManagers;
+  }
+
+  /**
+   * For setting timestamps in tests by issuing a "SET TIMESTAMP" statement
+   * @param timestamp 0 (or any non-positive value) to reset to the default, positive to set an epoch time
+   * @throws IOException if the statement fails with a {@link SQLException}
+   */
+  @VisibleForTesting
+  public void setTestTimestamp(long timestamp) throws IOException {
+    String statement = "SET TIMESTAMP =";
+
+    // 0 (or any non-positive value) resets to the default
+    if (timestamp > 0 ) {
+      statement += timestamp;
+    } else {
+      statement += " DEFAULT";
+    }
+
+    try (Connection connection = dataSource.getConnection();
+        PreparedStatement queryStatement =
+            connection.prepareStatement(statement)) {
+      queryStatement.execute();
+    } catch (SQLException e) {
+      throw new IOException("Could not set timestamp " + timestamp, e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreEntryManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreEntryManager.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreEntryManager.java
new file mode 100644
index 0000000..5d735a5
--- /dev/null
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreEntryManager.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.metastore;
+
+import java.io.IOException;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
+
+
+/**
+ * A {@link StateStoreEntryManager} generated by {@link MysqlStateStore}.
+ */
+public class MysqlStateStoreEntryManager<T extends State> extends 
StateStoreEntryManager<T> {
+  private final MysqlStateStore<T> stateStore;
+
+  public MysqlStateStoreEntryManager(String storeName, String tableName, long 
modificationTime,
+      MysqlStateStore<T> stateStore) {
+    super(storeName, tableName, modificationTime, stateStore);
+    this.stateStore = stateStore;
+  }
+
+  @Override
+  public T readState() throws IOException {
+    return this.stateStore.get(getStoreName(), getTableName(), "");
+  }
+
+  @Override
+  public void delete() throws IOException {
+    this.stateStore.delete(getStoreName(), getTableName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
index 32bc851..40cff79 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.metastore.metadata;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.DatasetStateStore;
 
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 
 
@@ -27,6 +28,7 @@ import lombok.Getter;
  * A {@link StateStoreEntryManager} in a {@link DatasetStateStore}.
  */
 @Getter
+@EqualsAndHashCode(exclude={"datasetStateStore"}, callSuper = true)
 public abstract class DatasetStateStoreEntryManager<T extends State> extends 
StateStoreEntryManager<T> {
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
index 33d404b..fe58196 100644
--- 
a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
+++ 
b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java
@@ -24,6 +24,7 @@ import org.apache.gobblin.metastore.StateStore;
 
 
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 
 
 /**
@@ -32,6 +33,7 @@ import lombok.Data;
  * @param <T> type of {@link State} that can be read from this entry.
  */
 @Data
+@EqualsAndHashCode(exclude={"stateStore"})
 public abstract class StateStoreEntryManager<T extends State> {
 
   private final String storeName;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
index 6e3e57a..22a9718 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +33,9 @@ import com.google.common.collect.Maps;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.DatasetStateStore;
 import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.metastore.MysqlStateStoreEntryManager;
+import org.apache.gobblin.metastore.predicates.StateStorePredicate;
+import 
org.apache.gobblin.runtime.metastore.mysql.MysqlDatasetStateStoreEntryManager;
 
 import javax.sql.DataSource;
 
@@ -132,4 +136,16 @@ public class MysqlDatasetStateStore extends 
MysqlStateStore<JobState.DatasetStat
     return Strings.isNullOrEmpty(datasetUrn) ? 
CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX
         : datasetUrn + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + 
DATASET_STATE_STORE_TABLE_SUFFIX;
   }
+
+  @Override
+  public List<MysqlDatasetStateStoreEntryManager> 
getMetadataForTables(StateStorePredicate predicate)
+      throws IOException {
+    // get the metadata from the parent class and convert
+    List<MysqlStateStoreEntryManager> entryManagers =
+        (List<MysqlStateStoreEntryManager>) 
super.getMetadataForTables(predicate);
+
+    return entryManagers.stream().map(entry -> new 
MysqlDatasetStateStoreEntryManager(entry.getStoreName(),
+        entry.getTableName(), entry.getTimestamp(), 
this)).collect(Collectors.toList());
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/mysql/MysqlDatasetStateStoreEntryManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/mysql/MysqlDatasetStateStoreEntryManager.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/mysql/MysqlDatasetStateStoreEntryManager.java
new file mode 100644
index 0000000..366c713
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/mysql/MysqlDatasetStateStoreEntryManager.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.metastore.mysql;
+
+import java.io.IOException;
+
+import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager;
+import org.apache.gobblin.metastore.metadata.StateStoreEntryManager;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.MysqlDatasetStateStore;
+
+
+/**
+ * A {@link StateStoreEntryManager} generated by {@link 
MysqlDatasetStateStore}.
+ */
+public class MysqlDatasetStateStoreEntryManager extends 
DatasetStateStoreEntryManager<JobState.DatasetState> {
+  private final MysqlDatasetStateStore stateStore;
+
+  public MysqlDatasetStateStoreEntryManager(String storeName, String 
tableName, long modificationTime,
+      MysqlDatasetStateStore stateStore) {
+    super(storeName, tableName, modificationTime, new 
DatasetStateStore.TableNameParser(tableName), stateStore);
+    this.stateStore = stateStore;
+  }
+
+  @Override
+  public JobState.DatasetState readState() throws IOException {
+    return this.stateStore.get(getStoreName(), getTableName(), 
this.getStateId());
+  }
+
+  @Override
+  public void delete() throws IOException {
+    this.stateStore.delete(getStoreName(), getTableName());
+  }
+}

Reply via email to