This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a9cdcf8b7b2 Add test cases on IncrementalTaskPositionManager (#33409)
a9cdcf8b7b2 is described below
commit a9cdcf8b7b216fda6170eafe095b7ae188ffca3d
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Oct 26 14:14:41 2024 +0800
Add test cases on IncrementalTaskPositionManager (#33409)
---
.../IncrementalTaskPositionManager.java | 18 ++-
.../IncrementalTaskPositionManagerTest.java | 140 +++++++++++++++++++++
2 files changed, 147 insertions(+), 11 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
index 8367dd211bf..32514b9fa4c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManager.java
@@ -82,28 +82,24 @@ public final class IncrementalTaskPositionManager {
final long startTimeMillis = System.currentTimeMillis();
log.info("Cleanup position, database type: {}, pipeline data source
type: {}", databaseType.getType(), pipelineDataSourceConfig.getType());
if (pipelineDataSourceConfig instanceof
ShardingSpherePipelineDataSourceConfiguration) {
- destroyPosition(jobId,
(ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig,
dialectPositionManager);
+ destroyPosition(jobId,
(ShardingSpherePipelineDataSourceConfiguration) pipelineDataSourceConfig);
} else if (pipelineDataSourceConfig instanceof
StandardPipelineDataSourceConfiguration) {
- destroyPosition(jobId, (StandardPipelineDataSourceConfiguration)
pipelineDataSourceConfig, dialectPositionManager);
+ destroyPosition(jobId, (StandardPipelineDataSourceConfiguration)
pipelineDataSourceConfig);
}
log.info("Destroy position cost {} ms.", System.currentTimeMillis() -
startTimeMillis);
}
- private void destroyPosition(final String jobId,
- final
ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig, final
DialectIncrementalPositionManager positionInitializer) throws SQLException {
+ private void destroyPosition(final String jobId, final
ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig) throws
SQLException {
for (DataSourcePoolProperties each : new
YamlDataSourceConfigurationSwapper().getDataSourcePoolPropertiesMap(pipelineDataSourceConfig.getRootConfig()).values())
{
try (PipelineDataSource dataSource = new
PipelineDataSource(DataSourcePoolCreator.create(each), databaseType)) {
- positionInitializer.destroy(dataSource, jobId);
+ dialectPositionManager.destroy(dataSource, jobId);
}
}
}
- private void destroyPosition(final String jobId, final
StandardPipelineDataSourceConfiguration pipelineDataSourceConfig,
- final DialectIncrementalPositionManager
positionInitializer) throws SQLException {
- try (
- PipelineDataSource dataSource = new PipelineDataSource(
-
DataSourcePoolCreator.create((DataSourcePoolProperties)
pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) {
- positionInitializer.destroy(dataSource, jobId);
+ private void destroyPosition(final String jobId, final
StandardPipelineDataSourceConfiguration pipelineDataSourceConfig) throws
SQLException {
+ try (PipelineDataSource dataSource = new
PipelineDataSource(DataSourcePoolCreator.create((DataSourcePoolProperties)
pipelineDataSourceConfig.getDataSourceConfiguration()), databaseType)) {
+ dialectPositionManager.destroy(dataSource, jobId);
}
}
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManagerTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManagerTest.java
new file mode 100644
index 00000000000..3b7385afeeb
--- /dev/null
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/incremental/IncrementalTaskPositionManagerTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.preparer.incremental;
+
+import lombok.SneakyThrows;
+import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIncrementalPositionManager;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.JobItemIncrementalTasksProgress;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
+import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
+import org.apache.shardingsphere.test.fixture.jdbc.MockedDataSource;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.internal.configuration.plugins.Plugins;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class IncrementalTaskPositionManagerTest {
+
+ private final DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "H2");
+
+ @Mock
+ private DialectIncrementalPositionManager dialectPositionManager;
+
+ private IncrementalTaskPositionManager incrementalTaskPositionManager;
+
+ @SneakyThrows(ReflectiveOperationException.class)
+ @BeforeEach
+ void setUp() {
+ incrementalTaskPositionManager = new
IncrementalTaskPositionManager(databaseType);
+
Plugins.getMemberAccessor().set(IncrementalTaskPositionManager.class.getDeclaredField("dialectPositionManager"),
incrementalTaskPositionManager, dialectPositionManager);
+ }
+
+ @Test
+ void assertGetPositionWithInitialProgress() throws SQLException {
+ JobItemIncrementalTasksProgress initialProgress =
mock(JobItemIncrementalTasksProgress.class);
+ IngestPosition position = mock(IngestPosition.class);
+
when(initialProgress.getIncrementalPosition()).thenReturn(Optional.of(position));
+ IncrementalDumperContext dumperContext =
mockIncrementalDumperContext();
+ assertThat(incrementalTaskPositionManager.getPosition(initialProgress,
dumperContext, mock(PipelineDataSourceManager.class)), is(position));
+ }
+
+ @Test
+ void assertGetPositionWithoutIncrementalProgress() throws SQLException {
+ JobItemIncrementalTasksProgress initialProgress =
mock(JobItemIncrementalTasksProgress.class);
+
when(initialProgress.getIncrementalPosition()).thenReturn(Optional.empty());
+ IncrementalDumperContext dumperContext =
mockIncrementalDumperContext();
+ PipelineDataSourceManager dataSourceManager =
mock(PipelineDataSourceManager.class);
+ PipelineDataSource dataSource = mock(PipelineDataSource.class);
+
when(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())).thenReturn(dataSource);
+ IngestPosition position = mock(IngestPosition.class);
+ when(dialectPositionManager.init(dataSource,
dumperContext.getJobId())).thenReturn(position);
+ assertThat(incrementalTaskPositionManager.getPosition(initialProgress,
dumperContext, dataSourceManager), is(position));
+ }
+
+ @Test
+ void assertGetPositionWithoutInitialProgress() throws SQLException {
+ IncrementalDumperContext dumperContext =
mockIncrementalDumperContext();
+ PipelineDataSourceManager dataSourceManager =
mock(PipelineDataSourceManager.class);
+ PipelineDataSource dataSource = mock(PipelineDataSource.class);
+
when(dataSourceManager.getDataSource(dumperContext.getCommonContext().getDataSourceConfig())).thenReturn(dataSource);
+ IngestPosition position = mock(IngestPosition.class);
+ when(dialectPositionManager.init(dataSource,
dumperContext.getJobId())).thenReturn(position);
+ assertThat(incrementalTaskPositionManager.getPosition(null,
dumperContext, dataSourceManager), is(position));
+ }
+
+ private IncrementalDumperContext mockIncrementalDumperContext() {
+ IncrementalDumperContext result = mock(IncrementalDumperContext.class,
RETURNS_DEEP_STUBS);
+ PipelineDataSourceConfiguration dataSourceConfig =
mock(PipelineDataSourceConfiguration.class);
+
when(result.getCommonContext().getDataSourceConfig()).thenReturn(dataSourceConfig);
+ return result;
+ }
+
+ @Test
+ void
assertDestroyPositionWithShardingSpherePipelineDataSourceConfiguration() throws
SQLException {
+ YamlRootConfiguration rootConfig = new YamlRootConfiguration();
+ Map<String, Object> dataSourceProps = new HashMap<>();
+ dataSourceProps.put("dataSourceClassName",
MockedDataSource.class.getName());
+ dataSourceProps.put("url", "jdbc:mock://127.0.0.1/foo_ds");
+ rootConfig.getDataSources().put("foo_ds", dataSourceProps);
+ ShardingSpherePipelineDataSourceConfiguration pipelineDataSourceConfig
= new ShardingSpherePipelineDataSourceConfiguration(rootConfig);
+ incrementalTaskPositionManager.destroyPosition("foo_job",
pipelineDataSourceConfig);
+ verify(dialectPositionManager).destroy(any(), eq("foo_job"));
+ }
+
+ @Test
+ void assertDestroyPositionWithStandardPipelineDataSourceConfiguration()
throws SQLException {
+ Map<String, Object> dataSourceProps = new HashMap<>();
+ dataSourceProps.put("dataSourceClassName",
MockedDataSource.class.getName());
+ dataSourceProps.put("url", "jdbc:mock://127.0.0.1/foo_ds");
+ StandardPipelineDataSourceConfiguration pipelineDataSourceConfig = new
StandardPipelineDataSourceConfiguration(dataSourceProps);
+ incrementalTaskPositionManager.destroyPosition("foo_job",
pipelineDataSourceConfig);
+ verify(dialectPositionManager).destroy(any(), eq("foo_job"));
+ }
+
+ @Test
+ void assertDestroyPositionWithUnknownPipelineDataSourceConfiguration()
throws SQLException {
+ incrementalTaskPositionManager.destroyPosition("foo_job",
mock(PipelineDataSourceConfiguration.class));
+ verify(dialectPositionManager, times(0)).destroy(any(), eq("foo_job"));
+ }
+}