This is an automated email from the ASF dual-hosted git repository. ycai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-diff.git
The following commit(s) were added to refs/heads/trunk by this push: new 75b70c3 Cassandra diff should accept a provided job_id for retrying diffs 75b70c3 is described below commit 75b70c35eea537d038c9f72632eec3d141b05379 Author: Jyothsna Konisa <jkon...@apple.com> AuthorDate: Wed Jul 14 15:51:54 2021 -0700 Cassandra diff should accept a provided job_id for retrying diffs Patch by Jyothsna Konisa; reviewed by Dinesh Joshi, Yifan Cai for CASSANDRA-16968 --- spark-job/pom.xml | 7 +++ .../java/org/apache/cassandra/diff/DiffJob.java | 35 ++++++++++---- .../org/apache/cassandra/diff/JobMetadataDb.java | 2 +- .../org/apache/cassandra/diff/DiffJobTest.java | 55 ++++++++++++++++++++++ 4 files changed, 89 insertions(+), 10 deletions(-) diff --git a/spark-job/pom.xml b/spark-job/pom.xml index c7df5bd..cd1603b 100644 --- a/spark-job/pom.xml +++ b/spark-job/pom.xml @@ -67,5 +67,12 @@ <artifactId>junit</artifactId> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <version>3.5.10</version> + <scope>test</scope> + </dependency> + </dependencies> </project> diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java index d744bff..2632c09 100644 --- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java +++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java @@ -122,9 +122,12 @@ public class DiffJob { ClusterProvider metadataProvider = ClusterProvider.getProvider(configuration.clusterConfig("metadata"), "metadata"); JobMetadataDb.JobLifeCycle job = null; UUID jobId = null; - try (Cluster metadataCluster = metadataProvider.getCluster(); - Session metadataSession = metadataCluster.connect()) { + Cluster metadataCluster = null; + Session metadataSession = null; + try { + metadataCluster = metadataProvider.getCluster(); + metadataSession = metadataCluster.connect(); RetryStrategyProvider retryStrategyProvider = RetryStrategyProvider.create(configuration.retryOptions()); MetadataKeyspaceOptions metadataOptions = configuration.metadataOptions(); JobMetadataDb.Schema.maybeInitialize(metadataSession, metadataOptions, retryStrategyProvider); @@ -197,18 +200,32 @@ public class DiffJob { Differ.shutdown(); JobMetadataDb.ProgressTracker.resetStatements(); } + if (metadataCluster != null) { + metadataCluster.close(); + } + if (metadataSession != null) { + metadataSession.close(); + } + } } - private static Params getJobParams(JobMetadataDb.JobLifeCycle job, JobConfiguration conf, List<KeyspaceTablePair> keyspaceTables) { + @VisibleForTesting + static Params getJobParams(JobMetadataDb.JobLifeCycle job, JobConfiguration conf, List<KeyspaceTablePair> keyspaceTables) { if (conf.jobId().isPresent()) { - return job.getJobParams(conf.jobId().get()); - } else { - return new Params(UUID.randomUUID(), - keyspaceTables, - conf.buckets(), - conf.splits()); + final Params jobParams = job.getJobParams(conf.jobId().get()); + if(jobParams != null) { + // When job_id is passed as a config property for the first time, we will not have metadata associated + // with job_id in metadata table. we should return jobParams from the table only when jobParams is not null + // Otherwise return new jobParams with provided job_id + return jobParams; + } } + final UUID jobId = conf.jobId().isPresent() ? conf.jobId().get() : UUID.randomUUID(); + return new Params(jobId, + keyspaceTables, + conf.buckets(), + conf.splits()); } private static List<Split> getSplits(JobConfiguration config, TokenHelper tokenHelper) { diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java index bef173a..71a802f 100644 --- a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java +++ b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java @@ -369,7 +369,7 @@ public class JobMetadataDb { metadataKeyspace, Schema.RUNNING_JOBS), params.jobId); if (!rs.one().getBool("[applied]")) { - logger.info("Aborting due to inability to mark job as running. " + + logger.info("Could not mark job as running. " + "Did a previous run of job id {} fail non-gracefully?", params.jobId); throw new RuntimeException("Unable to mark job running, aborting"); diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java index 9082970..1bf656d 100644 --- a/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java +++ b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java @@ -20,11 +20,18 @@ package org.apache.cassandra.diff; import java.math.BigInteger; +import java.util.ArrayList; import java.util.List; +import java.util.Optional; +import java.util.UUID; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class DiffJobTest { @@ -39,6 +46,37 @@ public class DiffJobTest splitTestHelper(TokenHelper.forPartitioner("RandomPartitioner")); } + @Test + public void testGetJobParamsWithJobIdProvidedShouldReturnNonNullConFigParams() { + final MockConfig mockConfig = new MockConfig(); + final JobMetadataDb.JobLifeCycle mockJob = mock(JobMetadataDb.JobLifeCycle.class); + final List<KeyspaceTablePair> keyspaceTablePairs = new ArrayList<>(); + final DiffJob.Params params = DiffJob.getJobParams(mockJob, mockConfig, keyspaceTablePairs); + assertNotNull(params); + } + + @Test + public void testGetJobParamsDuringRetryShouldReturnPreviousParams() { + final MockConfig mockConfig = new MockConfig(); + final JobMetadataDb.JobLifeCycle mockJob = mock(JobMetadataDb.JobLifeCycle.class); + final DiffJob.Params mockParams = mock(DiffJob.Params.class); + when(mockJob.getJobParams(any())).thenAnswer(invocationOnMock -> mockParams); + final List<KeyspaceTablePair> keyspaceTablePairs = new ArrayList<>(); + final DiffJob.Params params = DiffJob.getJobParams(mockJob, mockConfig, keyspaceTablePairs); + assertEquals(params, mockParams); + } + + @Test + public void testGetJobParamsWithNoJobId() { + final MockConfig mockConfig = mock(MockConfig.class); + when(mockConfig.jobId()).thenReturn(Optional.empty()); + + final JobMetadataDb.JobLifeCycle mockJob = mock(JobMetadataDb.JobLifeCycle.class); + final List<KeyspaceTablePair> keyspaceTablePairs = new ArrayList<>(); + final DiffJob.Params params = DiffJob.getJobParams(mockJob, mockConfig, keyspaceTablePairs); + assertNotNull(params.jobId); + } + private void splitTestHelper(TokenHelper tokens) { List<DiffJob.Split> splits = DiffJob.calculateSplits(50, 1, tokens); @@ -54,4 +92,21 @@ public class DiffJobTest for (int i = 0; i < splits.size(); i++) assertEquals(i, splits.get(i).splitNumber); } + + private class MockConfig extends AbstractMockJobConfiguration { + @Override + public int splits() { + return 2; + } + + @Override + public int buckets() { + return 2; + } + + @Override + public Optional<UUID> jobId() { + return Optional.of(UUID.randomUUID()); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org