Repository: incubator-gobblin Updated Branches: refs/heads/master 7f55214e2 -> 461999eda
[GOBBLIN-454] Add retention support to the MysqlDatasetStateStore Closes #2326 from htran1/mysql_state_store_retention Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/461999ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/461999ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/461999ed Branch: refs/heads/master Commit: 461999eda9e72b8a05e9d8248a3942f5f90be5a6 Parents: 7f55214 Author: Hung Tran <hut...@linkedin.com> Authored: Wed Apr 4 13:48:52 2018 -0700 Committer: Hung Tran <hut...@linkedin.com> Committed: Wed Apr 4 13:48:52 2018 -0700 ---------------------------------------------------------------------- gobblin-data-management/build.gradle | 1 + .../policy/CombineSelectionPolicy.java | 3 +- .../dataset/TimeBasedDatasetStoreDataset.java | 33 +- .../TimestampedDatasetStateStoreVersion.java | 22 +- .../CleanableMysqlDatasetStoreDatasetTest.java | 303 +++++++++++++++++++ .../gobblin/metastore/MysqlStateStore.java | 72 +++++ .../metastore/MysqlStateStoreEntryManager.java | 47 +++ .../metadata/DatasetStateStoreEntryManager.java | 2 + .../metadata/StateStoreEntryManager.java | 2 + .../gobblin/runtime/MysqlDatasetStateStore.java | 16 + .../MysqlDatasetStateStoreEntryManager.java | 50 +++ 11 files changed, 547 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-data-management/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-data-management/build.gradle b/gobblin-data-management/build.gradle index 3bb5d1e..add49ef 100644 --- a/gobblin-data-management/build.gradle +++ b/gobblin-data-management/build.gradle @@ -47,6 +47,7 @@ dependencies { compile externalDependency.junit testCompile project(":gobblin-test-utils") + testCompile project(path: 
":gobblin-metastore", configuration: "testFixtures") testCompile externalDependency.calciteCore testCompile externalDependency.calciteAvatica http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/CombineSelectionPolicy.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/CombineSelectionPolicy.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/CombineSelectionPolicy.java index c3c70f8..51caf55 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/CombineSelectionPolicy.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/policy/CombineSelectionPolicy.java @@ -41,6 +41,7 @@ import com.typesafe.config.ConfigFactory; import org.apache.gobblin.data.management.retention.policy.CombineRetentionPolicy; import org.apache.gobblin.data.management.version.DatasetVersion; +import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; @@ -95,7 +96,7 @@ public class CombineSelectionPolicy implements VersionSelectionPolicy<DatasetVer ImmutableList.Builder<VersionSelectionPolicy<DatasetVersion>> builder = ImmutableList.builder(); - for (String combineClassName : config.getStringList(VERSION_SELECTION_POLICIES_PREFIX)) { + for (String combineClassName : ConfigUtils.getStringList(config, VERSION_SELECTION_POLICIES_PREFIX)) { try { builder.add((VersionSelectionPolicy<DatasetVersion>) GobblinConstructorUtils .invokeFirstConstructor(Class.forName(combineClassName), ImmutableList.<Object>of(config), http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java 
---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java index 0fa7c18..1eecad6 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/retention/dataset/TimeBasedDatasetStoreDataset.java @@ -17,8 +17,13 @@ package org.apache.gobblin.data.management.retention.dataset; +import java.lang.reflect.InvocationTargetException; import java.util.List; import java.util.Properties; + +import com.google.common.collect.ImmutableList; +import com.typesafe.config.Config; + import org.apache.gobblin.data.management.policy.SelectBeforeTimeBasedPolicy; import org.apache.gobblin.data.management.policy.VersionSelectionPolicy; import org.apache.gobblin.data.management.version.TimestampedDatasetStateStoreVersion; @@ -27,6 +32,8 @@ import org.apache.gobblin.data.management.version.finder.TimestampedDatasetState import org.apache.gobblin.data.management.version.finder.VersionFinder; import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + import lombok.Data; @@ -35,6 +42,8 @@ import lombok.Data; */ @Data public class TimeBasedDatasetStoreDataset extends CleanableDatasetStoreDataset<TimestampedDatasetVersion> { + private static final String SELECTION_POLICY_CLASS_KEY = "selection.policy.class"; + private static final String DEFAULT_SELECTION_POLICY_CLASS = SelectBeforeTimeBasedPolicy.class.getName(); private final VersionFinder<TimestampedDatasetStateStoreVersion> versionFinder; private final 
VersionSelectionPolicy<TimestampedDatasetVersion> versionSelectionPolicy; @@ -42,7 +51,15 @@ public class TimeBasedDatasetStoreDataset extends CleanableDatasetStoreDataset<T public TimeBasedDatasetStoreDataset(Key key, List<DatasetStateStoreEntryManager> entries, Properties props) { super(key, entries); this.versionFinder = new TimestampedDatasetStateStoreVersionFinder(); - this.versionSelectionPolicy = new SelectBeforeTimeBasedPolicy(ConfigUtils.propertiesToConfig(props)); + Config propsAsConfig = ConfigUtils.propertiesToConfig(props); + + // strip the retention config namespace since the selection policy looks for configuration without the namespace + Config retentionConfig = ConfigUtils.getConfigOrEmpty(propsAsConfig, + ConfigurableCleanableDataset.RETENTION_CONFIGURATION_KEY); + Config retentionConfigWithFallback = retentionConfig.withFallback(propsAsConfig); + + this.versionSelectionPolicy = createSelectionPolicy(ConfigUtils.getString(retentionConfigWithFallback, + SELECTION_POLICY_CLASS_KEY, DEFAULT_SELECTION_POLICY_CLASS), retentionConfigWithFallback, props); } @Override @@ -54,4 +71,18 @@ public class TimeBasedDatasetStoreDataset extends CleanableDatasetStoreDataset<T public VersionSelectionPolicy<TimestampedDatasetVersion> getVersionSelectionPolicy() { return this.versionSelectionPolicy; } + + @SuppressWarnings("unchecked") + private VersionSelectionPolicy<TimestampedDatasetVersion> createSelectionPolicy(String className, + Config config, Properties jobProps) { + try { + return (VersionSelectionPolicy<TimestampedDatasetVersion>) + GobblinConstructorUtils.invokeFirstConstructor(Class.forName(className), + ImmutableList.<Object> of(config), ImmutableList.<Object> of(config, jobProps), + ImmutableList.<Object> of(jobProps)); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException + | ClassNotFoundException e) { + throw new IllegalArgumentException(e); + } + } } 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java index 45a153e..080af07 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/TimestampedDatasetStateStoreVersion.java @@ -42,11 +42,29 @@ public class TimestampedDatasetStateStoreVersion extends TimestampedDatasetVersi @Override public boolean equals(Object obj) { - return super.equals(obj); + if (this == obj) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + if (obj instanceof TimestampedDatasetStateStoreVersion) { + TimestampedDatasetStateStoreVersion other = (TimestampedDatasetStateStoreVersion)obj; + + if (this.entry.equals(other.getEntry())) { + return super.equals(obj); + } + } + + return false; } @Override public int hashCode() { - return this.version.hashCode(); + int result = this.version.hashCode(); + result = 31 * result + (entry != null ? 
entry.hashCode() : 0); + return result; } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableMysqlDatasetStoreDatasetTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableMysqlDatasetStoreDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableMysqlDatasetStoreDatasetTest.java new file mode 100644 index 0000000..7d29a17 --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/retention/CleanableMysqlDatasetStoreDatasetTest.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.retention; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.dbcp.BasicDataSource; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.retention.dataset.CleanableDataset; +import org.apache.gobblin.data.management.retention.dataset.finder.TimeBasedDatasetStoreDatasetFinder; +import org.apache.gobblin.metastore.DatasetStateStore; +import org.apache.gobblin.metastore.DatasetStoreDataset; +import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase; +import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.MysqlDatasetStateStore; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.util.ClassAliasResolver; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * Unit test for cleaning {@link MysqlStateStore} and {@link MysqlDatasetStateStore} + */ +@Test(singleThreaded = true) +public class CleanableMysqlDatasetStoreDatasetTest { + private static final String TEST_STATE_STORE = "TestStateStore"; + private static final String TEST_JOB_NAME1 = "TestJob1"; + private static final String TEST_JOB_NAME2 = "TestJob2"; + private static final String TEST_JOB_ID = "TestJobId"; + private static final String TEST_TASK_ID_PREFIX = "TestTask-"; + private static final String TEST_DATASET_URN1 = "TestDataset1"; + private static final String TEST_DATASET_URN2 = "TestDataset2"; + + private 
MysqlStateStore<JobState> dbJobStateStore; + private DatasetStateStore<JobState.DatasetState> dbDatasetStateStore; + private long startTime = System.currentTimeMillis(); + private Config config; + + private ITestMetastoreDatabase testMetastoreDatabase; + private static final String TEST_USER = "testUser"; + private static final String TEST_PASSWORD = "testPassword"; + + private static String getJobId(String jobIdBase, int index) { + return jobIdBase + index; + } + + @BeforeClass + public void setUp() throws Exception { + this.testMetastoreDatabase = TestMetastoreDatabaseFactory.get(); + String jdbcUrl = this.testMetastoreDatabase.getJdbcUrl(); + ConfigBuilder configBuilder = ConfigBuilder.create(); + BasicDataSource mySqlDs = new BasicDataSource(); + + mySqlDs.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER); + mySqlDs.setDefaultAutoCommit(false); + mySqlDs.setUrl(jdbcUrl); + mySqlDs.setUsername(TEST_USER); + mySqlDs.setPassword(TEST_PASSWORD); + + this.dbJobStateStore = new MysqlStateStore<>(mySqlDs, TEST_STATE_STORE, false, JobState.class); + + configBuilder.addPrimitive("selection.timeBased.lookbackTime", "10m"); + configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_TYPE_KEY, "mysql"); + configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_STATE_STORE); + configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl); + configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER); + configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD); + + ClassAliasResolver<DatasetStateStore.Factory> resolver = new ClassAliasResolver<>(DatasetStateStore.Factory.class); + + DatasetStateStore.Factory stateStoreFactory = resolver.resolveClass("mysql").newInstance(); + this.config = configBuilder.build(); + this.dbDatasetStateStore = stateStoreFactory.createStateStore(configBuilder.build()); + + // clear data that may have been left behind by a prior test run + 
this.dbJobStateStore.delete(TEST_JOB_NAME1); + this.dbDatasetStateStore.delete(TEST_JOB_NAME1); + this.dbJobStateStore.delete(TEST_JOB_NAME2); + this.dbDatasetStateStore.delete(TEST_JOB_NAME2); + } + + /** + * Test cleanup of the job state store + * @throws IOException + */ + @Test + public void testCleanJobStateStore() throws IOException { + JobState jobState = new JobState(TEST_JOB_NAME1, getJobId(TEST_JOB_ID, 1)); + jobState.setId(getJobId(TEST_JOB_ID, 1)); + jobState.setProp("foo", "bar"); + jobState.setState(JobState.RunningState.COMMITTED); + jobState.setStartTime(this.startTime); + jobState.setEndTime(this.startTime + 1000); + jobState.setDuration(1000); + + for (int i = 0; i < 3; i++) { + TaskState taskState = new TaskState(); + taskState.setJobId(getJobId(TEST_JOB_ID, 1)); + taskState.setTaskId(TEST_TASK_ID_PREFIX + i); + taskState.setId(TEST_TASK_ID_PREFIX + i); + taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED); + jobState.addTaskState(taskState); + } + + // set old time to test that this state is deleted + this.dbJobStateStore.setTestTimestamp(System.currentTimeMillis()/1000 - (60 * 20)); + + this.dbJobStateStore.put(TEST_JOB_NAME1, + MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX, + jobState); + + jobState.setJobName(TEST_JOB_NAME2); + jobState.setJobId(getJobId(TEST_JOB_ID, 2)); + jobState.setId(getJobId(TEST_JOB_ID, 2)); + + // set current time to test that the state is not deleted + this.dbJobStateStore.setTestTimestamp(0); + + this.dbJobStateStore.put(TEST_JOB_NAME2, + MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX, + jobState); + + TimeBasedDatasetStoreDatasetFinder datasetFinder = + new TimeBasedDatasetStoreDatasetFinder(FileSystem.get(new Configuration()), ConfigUtils.configToProperties(config)); + List<DatasetStoreDataset> datasets = datasetFinder.findDatasets(); + + 
Assert.assertTrue(this.dbJobStateStore.exists(TEST_JOB_NAME1, + MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + Assert.assertTrue(this.dbJobStateStore.exists(TEST_JOB_NAME2, + MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + for (DatasetStoreDataset dataset : datasets) { + ((CleanableDataset) dataset).clean(); + } + + Assert.assertFalse(this.dbJobStateStore.exists(TEST_JOB_NAME1, + MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + // state with recent timestamp is not deleted + Assert.assertTrue(this.dbJobStateStore.exists(TEST_JOB_NAME2, + MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + } + + /** + * Test cleanup of the dataset state store. This test uses the combined selection policy to test that the newest 2 + * entries are retained even when the timestamp is old. 
+ * + * @throws IOException + */ + @Test + public void testCleanDatasetStateStore() throws IOException { + JobState.DatasetState datasetState = new JobState.DatasetState(TEST_JOB_NAME1, getJobId(TEST_JOB_ID, 1)); + + datasetState.setDatasetUrn(TEST_DATASET_URN1); + datasetState.setState(JobState.RunningState.COMMITTED); + datasetState.setId(TEST_DATASET_URN1); + datasetState.setStartTime(this.startTime); + datasetState.setEndTime(this.startTime + 1000); + datasetState.setDuration(1000); + + for (int i = 0; i < 3; i++) { + TaskState taskState = new TaskState(); + taskState.setJobId(getJobId(TEST_JOB_ID, 1)); + taskState.setTaskId(TEST_TASK_ID_PREFIX + i); + taskState.setId(TEST_TASK_ID_PREFIX + i); + taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED); + datasetState.addTaskState(taskState); + } + + // set old time to test that this state is deleted + ((MysqlDatasetStateStore)this.dbDatasetStateStore).setTestTimestamp(this.startTime/1000 + 2 - (60 * 20)); + datasetState.setJobId(getJobId(TEST_JOB_ID, 1)); + this.dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN1, datasetState); + + ((MysqlDatasetStateStore)this.dbDatasetStateStore).setTestTimestamp(this.startTime/1000 + 4 - (60 * 20)); + datasetState.setJobId(getJobId(TEST_JOB_ID, 2)); + this.dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN1, datasetState); + + ((MysqlDatasetStateStore)this.dbDatasetStateStore).setTestTimestamp(this.startTime/1000 + 6 - (60 * 20)); + datasetState.setJobId(getJobId(TEST_JOB_ID, 3)); + this.dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN1, datasetState); + + datasetState.setJobId(getJobId(TEST_JOB_ID, 1)); + + // persist a second dataset state to test that retrieval of multiple dataset states works + datasetState.setDatasetUrn(TEST_DATASET_URN2); + datasetState.setId(TEST_DATASET_URN2); + datasetState.setDuration(2000); + + // set current time to test that the state is not deleted + 
((MysqlDatasetStateStore)this.dbDatasetStateStore).setTestTimestamp(0); + + this.dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState); + datasetState.setJobName(TEST_JOB_NAME2); + this.dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState); + + + Config cleanerConfig = config + .withValue("gobblin.retention.selection.policy.class", ConfigValueFactory.fromAnyRef("org.apache.gobblin.data.management.policy.CombineSelectionPolicy")) + .withValue("gobblin.retention.selection.combine.operation", ConfigValueFactory.fromAnyRef("intersect")) + .withValue("gobblin.retention.selection.combine.policy.classes", ConfigValueFactory.fromAnyRef("org.apache.gobblin.data.management.policy.SelectBeforeTimeBasedPolicy,org.apache.gobblin.data.management.policy.NewestKSelectionPolicy")) + .withValue("gobblin.retention.selection.timeBased.lookbackTime", ConfigValueFactory.fromAnyRef("10m")) + .withValue("gobblin.retention.selection.newestK.versionsNotSelected", ConfigValueFactory.fromAnyRef("2")); + + TimeBasedDatasetStoreDatasetFinder datasetFinder = + new TimeBasedDatasetStoreDatasetFinder(FileSystem.get(new Configuration()), ConfigUtils.configToProperties(cleanerConfig)); + List<DatasetStoreDataset> datasets = datasetFinder.findDatasets(); + + Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1, + TEST_DATASET_URN1 + "-" + MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1, + TEST_DATASET_URN1 + "-" + getJobId(TEST_JOB_ID, 1) + + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1, + TEST_DATASET_URN1 + "-" + getJobId(TEST_JOB_ID, 2) + + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1, + TEST_DATASET_URN1 + "-" + getJobId(TEST_JOB_ID, 3) + + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1, + TEST_DATASET_URN2 + "-" + MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1, + TEST_DATASET_URN2 + "-" + getJobId(TEST_JOB_ID, 1) + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME2, + TEST_DATASET_URN2 + "-" + MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME2, + TEST_DATASET_URN2 + "-" + getJobId(TEST_JOB_ID, 1) + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + for (DatasetStoreDataset dataset : datasets) { + ((CleanableDataset) dataset).clean(); + } + + // the most recent two entries (current and job id 3) should be retained + Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1, + TEST_DATASET_URN1 + "-" + MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1, + TEST_DATASET_URN1 + "-" + getJobId(TEST_JOB_ID, 3) + + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + Assert.assertFalse(this.dbDatasetStateStore.exists(TEST_JOB_NAME1, + TEST_DATASET_URN1 + "-" + getJobId(TEST_JOB_ID, 1) + + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + Assert.assertFalse(this.dbDatasetStateStore.exists(TEST_JOB_NAME1, + TEST_DATASET_URN1 + "-" + getJobId(TEST_JOB_ID, 2) + + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1, + TEST_DATASET_URN2 + "-" + MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + + 
MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME1, + TEST_DATASET_URN2 + "-" + getJobId(TEST_JOB_ID, 1) + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME2, + TEST_DATASET_URN2 + "-" + MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + + Assert.assertTrue(this.dbDatasetStateStore.exists(TEST_JOB_NAME2, + TEST_DATASET_URN2 + "-" + getJobId(TEST_JOB_ID, 1) + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java index 169a5e1..b7694f6 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStore.java @@ -31,6 +31,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -40,6 +41,7 @@ import java.util.zip.GZIPOutputStream; import org.apache.commons.dbcp.BasicDataSource; import org.apache.hadoop.io.Text; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -47,6 +49,9 @@ import com.typesafe.config.Config; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; +import 
org.apache.gobblin.metastore.metadata.StateStoreEntryManager; +import org.apache.gobblin.metastore.predicates.StateStorePredicate; +import org.apache.gobblin.metastore.predicates.StoreNamePredicate; import org.apache.gobblin.password.PasswordManager; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.io.StreamUtils; @@ -106,6 +111,9 @@ public class MysqlStateStore<T extends State> implements StateStore<T> { + " store_name = ? AND table_name = ?)" + " ON DUPLICATE KEY UPDATE state = s.state"; + private static final String SELECT_METADATA_TEMPLATE = + "SELECT store_name, table_name, modified_time from $TABLE$ where store_name like ?"; + // MySQL key length limit is 767 bytes private static final String CREATE_JOB_STATE_TABLE_TEMPLATE = "CREATE TABLE IF NOT EXISTS $TABLE$ (store_name varchar(100) CHARACTER SET latin1 COLLATE latin1_bin not null," @@ -122,6 +130,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> { private final String DELETE_JOB_STATE_SQL; private final String CLONE_JOB_STATE_SQL; private final String SELECT_STORE_NAMES_SQL; + private final String SELECT_METADATA_SQL; /** * Manages the persistence and retrieval of {@link State} in a MySQL database @@ -146,6 +155,7 @@ public class MysqlStateStore<T extends State> implements StateStore<T> { DELETE_JOB_STATE_SQL = DELETE_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName); CLONE_JOB_STATE_SQL = CLONE_JOB_STATE_TEMPLATE.replace("$TABLE$", stateStoreTableName); SELECT_STORE_NAMES_SQL = SELECT_STORE_NAMES_TEMPLATE.replace("$TABLE$", stateStoreTableName); + SELECT_METADATA_SQL = SELECT_METADATA_TEMPLATE.replace("$TABLE$", stateStoreTableName); // create table if it does not exist String createJobTable = CREATE_JOB_STATE_TABLE_TEMPLATE.replace("$TABLE$", stateStoreTableName); @@ -463,4 +473,66 @@ public class MysqlStateStore<T extends State> implements StateStore<T> { throw new IOException("failure deleting storeName " + storeName, e); } } + + /** + * 
Gets entry managers for all tables matching the predicate + * @param predicate Predicate used to filter tables. To allow state stores to push down predicates, use native extensions + * of {@link StateStorePredicate}. + * @throws IOException + */ + @Override + public List<? extends StateStoreEntryManager> getMetadataForTables(StateStorePredicate predicate) + throws IOException { + List<MysqlStateStoreEntryManager> entryManagers = Lists.newArrayList(); + + try (Connection connection = dataSource.getConnection(); + PreparedStatement queryStatement = connection.prepareStatement(SELECT_METADATA_SQL)) { + String storeName = predicate instanceof StoreNamePredicate ? ((StoreNamePredicate) predicate).getStoreName() : "%"; + queryStatement.setString(1, storeName); + + try (ResultSet rs = queryStatement.executeQuery()) { + while (rs.next()) { + String rsStoreName = rs.getString(1); + String rsTableName = rs.getString(2); + Timestamp timestamp = rs.getTimestamp(3); + + StateStoreEntryManager entryManager = + new MysqlStateStoreEntryManager(rsStoreName, rsTableName, timestamp.getTime(), this); + + if (predicate.apply(entryManager)) { + entryManagers.add(new MysqlStateStoreEntryManager(rsStoreName, rsTableName, timestamp.getTime(), this)); + } + } + } + } catch (SQLException e) { + throw new IOException("failure getting metadata for tables", e); + } + + return entryManagers; + } + + /** + * For setting timestamps in tests + * @param timestamp 0 to set to default, non-zero to set an epoch time + * @throws SQLException + */ + @VisibleForTesting + public void setTestTimestamp(long timestamp) throws IOException { + String statement = "SET TIMESTAMP ="; + + // 0 is used to reset to the default + if (timestamp > 0 ) { + statement += timestamp; + } else { + statement += " DEFAULT"; + } + + try (Connection connection = dataSource.getConnection(); + PreparedStatement queryStatement = + connection.prepareStatement(statement)) { + queryStatement.execute(); + } catch (SQLException e) { + 
throw new IOException("Could not set timestamp " + timestamp, e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreEntryManager.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreEntryManager.java new file mode 100644 index 0000000..5d735a5 --- /dev/null +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/MysqlStateStoreEntryManager.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.metastore; + +import java.io.IOException; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.metadata.StateStoreEntryManager; + + +/** + * A {@link StateStoreEntryManager} generated by {@link MysqlStateStore}. 
+ */ +public class MysqlStateStoreEntryManager<T extends State> extends StateStoreEntryManager<T> { + private final MysqlStateStore<T> stateStore; + + public MysqlStateStoreEntryManager(String storeName, String tableName, long modificationTime, + MysqlStateStore<T> stateStore) { + super(storeName, tableName, modificationTime, stateStore); + this.stateStore = stateStore; + } + + @Override + public T readState() throws IOException { + return this.stateStore.get(getStoreName(), getTableName(), ""); + } + + @Override + public void delete() throws IOException { + this.stateStore.delete(getStoreName(), getTableName()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java index 32bc851..40cff79 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/DatasetStateStoreEntryManager.java @@ -20,6 +20,7 @@ package org.apache.gobblin.metastore.metadata; import org.apache.gobblin.configuration.State; import org.apache.gobblin.metastore.DatasetStateStore; +import lombok.EqualsAndHashCode; import lombok.Getter; @@ -27,6 +28,7 @@ import lombok.Getter; * A {@link StateStoreEntryManager} in a {@link DatasetStateStore}. 
*/ @Getter +@EqualsAndHashCode(exclude={"datasetStateStore"}, callSuper = true) public abstract class DatasetStateStoreEntryManager<T extends State> extends StateStoreEntryManager<T> { /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java ---------------------------------------------------------------------- diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java index 33d404b..fe58196 100644 --- a/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java +++ b/gobblin-metastore/src/main/java/org/apache/gobblin/metastore/metadata/StateStoreEntryManager.java @@ -24,6 +24,7 @@ import org.apache.gobblin.metastore.StateStore; import lombok.Data; +import lombok.EqualsAndHashCode; /** @@ -32,6 +33,7 @@ import lombok.Data; * @param <T> type of {@link State} that can be read from this entry. 
*/ @Data +@EqualsAndHashCode(exclude={"stateStore"}) public abstract class StateStoreEntryManager<T extends State> { private final String storeName; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java index 6e3e57a..22a9718 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/MysqlDatasetStateStore.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,9 @@ import com.google.common.collect.Maps; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metastore.DatasetStateStore; import org.apache.gobblin.metastore.MysqlStateStore; +import org.apache.gobblin.metastore.MysqlStateStoreEntryManager; +import org.apache.gobblin.metastore.predicates.StateStorePredicate; +import org.apache.gobblin.runtime.metastore.mysql.MysqlDatasetStateStoreEntryManager; import javax.sql.DataSource; @@ -132,4 +136,16 @@ public class MysqlDatasetStateStore extends MysqlStateStore<JobState.DatasetStat return Strings.isNullOrEmpty(datasetUrn) ? 
CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX : datasetUrn + "-" + CURRENT_DATASET_STATE_FILE_SUFFIX + DATASET_STATE_STORE_TABLE_SUFFIX; } + + /** Returns metadata for the tables matching {@code predicate}. Each entry returned by the superclass is re-wrapped as a {@link MysqlDatasetStateStoreEntryManager} bound to this store, keeping the same store name, table name and timestamp. NOTE(review): the cast to raw {@code List<MysqlStateStoreEntryManager>} is unchecked and assumes the superclass only returns {@link MysqlStateStoreEntryManager} instances; confirm against MysqlStateStore#getMetadataForTables. @throws IOException propagated from the superclass lookup */ @Override + public List<MysqlDatasetStateStoreEntryManager> getMetadataForTables(StateStorePredicate predicate) + throws IOException { + // get the metadata from the parent class and convert + List<MysqlStateStoreEntryManager> entryManagers = + (List<MysqlStateStoreEntryManager>) super.getMetadataForTables(predicate); + + return entryManagers.stream().map(entry -> new MysqlDatasetStateStoreEntryManager(entry.getStoreName(), + entry.getTableName(), entry.getTimestamp(), this)).collect(Collectors.toList()); + + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/461999ed/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/mysql/MysqlDatasetStateStoreEntryManager.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/mysql/MysqlDatasetStateStoreEntryManager.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/mysql/MysqlDatasetStateStoreEntryManager.java new file mode 100644 index 0000000..366c713 --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metastore/mysql/MysqlDatasetStateStoreEntryManager.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.metastore.mysql; + +import java.io.IOException; + +import org.apache.gobblin.metastore.DatasetStateStore; +import org.apache.gobblin.metastore.metadata.DatasetStateStoreEntryManager; +import org.apache.gobblin.metastore.metadata.StateStoreEntryManager; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.MysqlDatasetStateStore; + + +/** + * A {@link StateStoreEntryManager} generated by {@link MysqlDatasetStateStore}. Reads use this entry's state id, which the superclass presumably derives from the table name via the {@link DatasetStateStore.TableNameParser} passed to it (TODO confirm in DatasetStateStoreEntryManager). + */ +public class MysqlDatasetStateStoreEntryManager extends DatasetStateStoreEntryManager<JobState.DatasetState> { + /** Concrete-typed reference to the owning store, used for reads and deletes. */ private final MysqlDatasetStateStore stateStore; + + /** Creates an entry for table {@code tableName} in store {@code storeName}; a {@link DatasetStateStore.TableNameParser} built from {@code tableName} is handed to the superclass together with {@code modificationTime} and {@code stateStore}. */ public MysqlDatasetStateStoreEntryManager(String storeName, String tableName, long modificationTime, + MysqlDatasetStateStore stateStore) { + super(storeName, tableName, modificationTime, new DatasetStateStore.TableNameParser(tableName), stateStore); + this.stateStore = stateStore; + } + + /** Reads the dataset state stored under this entry's store name, table name and state id. */ @Override + public JobState.DatasetState readState() throws IOException { + return this.stateStore.get(getStoreName(), getTableName(), this.getStateId()); + } + + /** Delegates to {@link MysqlDatasetStateStore#delete} with this entry's store name and table name. */ @Override + public void delete() throws IOException { + this.stateStore.delete(getStoreName(), getTableName()); + } +}