This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8bc93f46bfc Add confirmed_position and current_position at cdc job
item info. (#29477)
8bc93f46bfc is described below
commit 8bc93f46bfc707ea455aee3cbc7b1c9ff007924b
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Dec 21 13:59:22 2023 +0800
Add confirmed_position and current_position at cdc job item info. (#29477)
* Add confirmed_position and current_position at cdc job item info.
* Reuse TransmissionJobItemInfo
* Fix CDC inventory order by CSN cause NPE
---
.../core/job/service/TransmissionJobManager.java | 14 +++++--
.../dialect/DialectPipelineSQLBuilder.java | 9 ++++
.../ingest/OpenGaussPositionInitializer.java | 2 +-
.../wal/decode/OpenGaussLogSequenceNumber.java | 4 +-
.../sqlbuilder/OpenGaussPipelineSQLBuilder.java | 5 +++
.../ingest/PostgreSQLPositionInitializer.java | 2 +-
.../postgresql/ingest/wal/WALPosition.java | 2 +-
.../ingest/wal/decode/BaseLogSequenceNumber.java | 6 +--
.../wal/decode/PostgreSQLLogSequenceNumber.java | 4 +-
.../sqlbuilder/PostgreSQLPipelineSQLBuilder.java | 5 +++
.../ingest/wal/WALEventConverterTest.java | 9 ++--
.../postgresql/ingest/wal/WALPositionTest.java | 2 +-
.../query/ShowStreamingJobStatusExecutor.java | 28 ++++++++-----
.../data/pipeline/cdc/api/CDCJobAPI.java | 49 ++++++++++++++++++++++
.../pipeline/cdc/core/pojo/CDCJobItemInfo.java} | 18 ++++----
.../pipeline/cdc/core/prepare/CDCJobPreparer.java | 9 +---
16 files changed, 119 insertions(+), 49 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index 4772d36ed10..9dcd2cd4676 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -18,14 +18,14 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import java.util.Collection;
@@ -71,7 +71,13 @@ public final class TransmissionJobManager {
return result;
}
- private static int getInventoryFinishedPercentage(final
TransmissionJobItemProgress jobItemProgress) {
+ /**
+ * Get inventory finished percentage.
+ *
+ * @param jobItemProgress job item progress
+ * @return inventory finished percentage
+ */
+ public static int getInventoryFinishedPercentage(final
TransmissionJobItemProgress jobItemProgress) {
if (JobStatus.EXECUTE_INCREMENTAL_TASK == jobItemProgress.getStatus()
|| JobStatus.FINISHED == jobItemProgress.getStatus()) {
return 100;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
index 57897e8908d..cd85a842edc 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
@@ -89,4 +89,13 @@ public interface DialectPipelineSQLBuilder extends
DatabaseTypedSPI {
* @throws SQLException SQL exception
*/
Collection<String> buildCreateTableSQLs(DataSource dataSource, String
schemaName, String tableName) throws SQLException;
+
+ /**
+ * Build query current position SQL.
+ *
+ * @return SQL that queries the current position, or empty when the dialect does not provide one
+ */
+ default Optional<String> buildQueryCurrentPositionSQL() {
+ return Optional.empty();
+ }
}
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index 1e22f62641c..7a043f91136 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -56,7 +56,7 @@ public final class OpenGaussPositionInitializer implements
PositionInitializer {
@Override
public WALPosition init(final String data) {
- return new WALPosition(new
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
+ return new WALPosition(new
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(data)));
}
/**
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java
index 581efaaf4f0..893b64f51c4 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java
@@ -32,8 +32,8 @@ public final class OpenGaussLogSequenceNumber implements
BaseLogSequenceNumber {
private final LogSequenceNumber logSequenceNumber;
@Override
- public long asLong() {
- return logSequenceNumber.asLong();
+ public String asString() {
+ return logSequenceNumber.asString();
}
@Override
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 9d191b093d9..d204d71fb02 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -76,6 +76,11 @@ public final class OpenGaussPipelineSQLBuilder implements
DialectPipelineSQLBuil
throw new CreateTableSQLGenerateException(tableName);
}
+ @Override
+ public Optional<String> buildQueryCurrentPositionSQL() {
+ return Optional.of("SELECT * FROM pg_current_xlog_location()");
+ }
+
@Override
public String getDatabaseType() {
return "openGauss";
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
index 977671bfe7d..973853c7a30 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
@@ -53,7 +53,7 @@ public final class PostgreSQLPositionInitializer implements
PositionInitializer
@Override
public WALPosition init(final String data) {
- return new WALPosition(new
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
+ return new WALPosition(new
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(data)));
}
private void createSlotIfNotExist(final Connection connection, final
String slotName) throws SQLException {
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
index 9da9b2edcea..93400eb853c 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
@@ -33,6 +33,6 @@ public final class WALPosition implements IngestPosition {
@Override
public String toString() {
- return String.valueOf(logSequenceNumber.asLong());
+ return logSequenceNumber.asString();
}
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
index 26e64bda52d..5a92412bc96 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
@@ -23,11 +23,11 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;
public interface BaseLogSequenceNumber {
/**
- * Convert log sequence number to long.
+ * Convert log sequence number to String.
*
- * @return Long the sequence number of long value
+ * @return the log sequence number as a String
*/
- long asLong();
+ String asString();
/**
* Get the binded object.
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
index 59c8dab4995..9d7bf20b257 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
@@ -31,8 +31,8 @@ public final class PostgreSQLLogSequenceNumber implements
BaseLogSequenceNumber
private final LogSequenceNumber logSequenceNumber;
@Override
- public long asLong() {
- return logSequenceNumber.asLong();
+ public String asString() {
+ return logSequenceNumber.asString();
}
@Override
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 3e15bfbba9e..838d6bf244e 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -121,6 +121,11 @@ public final class PostgreSQLPipelineSQLBuilder implements
DialectPipelineSQLBui
}
}
+ @Override
+ public Optional<String> buildQueryCurrentPositionSQL() {
+ return Optional.of("SELECT * FROM pg_current_wal_lsn()");
+ }
+
@Override
public String getDatabaseType() {
return "PostgreSQL";
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index b399d2112d2..65c94324d39 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -65,7 +65,6 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
class WALEventConverterTest {
@@ -148,8 +147,8 @@ class WALEventConverterTest {
BeginTXEvent beginTXEvent = new BeginTXEvent(100);
beginTXEvent.setLogSequenceNumber(new
PostgreSQLLogSequenceNumber(logSequenceNumber));
Record record = walEventConverter.convert(beginTXEvent);
- assertTrue(record instanceof PlaceholderRecord);
- assertThat(((WALPosition)
record.getPosition()).getLogSequenceNumber().asLong(), is(21953976L));
+ assertInstanceOf(PlaceholderRecord.class, record);
+ assertThat(((WALPosition)
record.getPosition()).getLogSequenceNumber().asString(),
is(logSequenceNumber.asString()));
}
@Test
@@ -157,8 +156,8 @@ class WALEventConverterTest {
CommitTXEvent commitTXEvent = new CommitTXEvent(1, 3468L);
commitTXEvent.setLogSequenceNumber(new
PostgreSQLLogSequenceNumber(logSequenceNumber));
Record record = walEventConverter.convert(commitTXEvent);
- assertTrue(record instanceof PlaceholderRecord);
- assertThat(((WALPosition)
record.getPosition()).getLogSequenceNumber().asLong(), is(21953976L));
+ assertInstanceOf(PlaceholderRecord.class, record);
+ assertThat(((WALPosition)
record.getPosition()).getLogSequenceNumber().asString(),
is(logSequenceNumber.asString()));
}
@Test
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPositionTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPositionTest.java
index 10a9bf5d7da..819e43cb30e 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPositionTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPositionTest.java
@@ -28,6 +28,6 @@ class WALPositionTest {
@Test
void assertToString() {
- assertThat(new WALPosition(new
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))).toString(),
is("100"));
+ assertThat(new WALPosition(new
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))).toString(),
is("0/64"));
}
}
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index d5a713a0153..968a030cc0e 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -17,13 +17,15 @@
package org.apache.shardingsphere.data.pipeline.cdc.distsql.handler.query;
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.core.pojo.CDCJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.ShowStreamingStatusStatement;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
+import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import java.util.Arrays;
import java.util.Collection;
@@ -35,31 +37,35 @@ import java.util.stream.Collectors;
*/
public final class ShowStreamingJobStatusExecutor implements
QueryableRALExecutor<ShowStreamingStatusStatement> {
+ private final CDCJobAPI jobAPI = (CDCJobAPI)
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
+
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowStreamingStatusStatement sqlStatement) {
- Collection<TransmissionJobItemInfo> jobItemInfos = new
TransmissionJobManager(new
CDCJobType()).getJobItemInfos(sqlStatement.getJobId());
+ Collection<CDCJobItemInfo> jobItemInfos =
jobAPI.getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> getRow(each,
currentTimeMillis)).collect(Collectors.toList());
}
- private LocalDataQueryResultRow getRow(final TransmissionJobItemInfo
jobItemInfo, final long currentTimeMillis) {
- TransmissionJobItemProgress jobItemProgress =
jobItemInfo.getJobItemProgress();
+ private LocalDataQueryResultRow getRow(final CDCJobItemInfo
cdcJobItemInfo, final long currentTimeMillis) {
+ TransmissionJobItemInfo transmissionJobItemInfo =
cdcJobItemInfo.getTransmissionJobItemInfo();
+ TransmissionJobItemProgress jobItemProgress =
transmissionJobItemInfo.getJobItemProgress();
if (null == jobItemProgress) {
- return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(),
"", "", "", "", "", "", jobItemInfo.getErrorMessage());
+ return new
LocalDataQueryResultRow(transmissionJobItemInfo.getShardingItem(), "", "", "",
"", "", "", "", "", transmissionJobItemInfo.getErrorMessage());
}
String incrementalIdleSeconds = "";
if
(jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
- long latestActiveTimeMillis =
Math.max(jobItemInfo.getStartTimeMillis(),
jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
+ long latestActiveTimeMillis =
Math.max(transmissionJobItemInfo.getStartTimeMillis(),
jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
incrementalIdleSeconds =
String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis -
latestActiveTimeMillis));
}
- return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(),
jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
- jobItemProgress.isActive() ? Boolean.TRUE.toString() :
Boolean.FALSE.toString(), jobItemProgress.getProcessedRecordsCount(),
jobItemInfo.getInventoryFinishedPercentage(),
- incrementalIdleSeconds, jobItemInfo.getErrorMessage());
+ return new
LocalDataQueryResultRow(transmissionJobItemInfo.getShardingItem(),
jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
+ jobItemProgress.isActive() ? Boolean.TRUE.toString() :
Boolean.FALSE.toString(), jobItemProgress.getProcessedRecordsCount(),
transmissionJobItemInfo.getInventoryFinishedPercentage(),
+ incrementalIdleSeconds, cdcJobItemInfo.getConfirmedPosition(),
cdcJobItemInfo.getCurrentPosition(), transmissionJobItemInfo.getErrorMessage());
}
@Override
public Collection<String> getColumnNames() {
- return Arrays.asList("item", "data_source", "status", "active",
"processed_records_count", "inventory_finished_percentage",
"incremental_idle_seconds", "error_message");
+ return Arrays.asList("item", "data_source", "status", "active",
"processed_records_count", "inventory_finished_percentage",
"incremental_idle_seconds", "confirmed_position",
+ "current_position", "error_message");
}
@Override
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 29d67efe7f8..e3ca82057c2 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -28,6 +28,7 @@ import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJob
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
+import org.apache.shardingsphere.data.pipeline.cdc.core.pojo.CDCJobItemInfo;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
@@ -52,11 +53,15 @@ import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
+import
org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
import
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
+import
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -67,13 +72,18 @@ import
org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
import
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
+import java.sql.Connection;
+import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDateTime;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
@@ -266,6 +276,45 @@ public final class CDCJobAPI implements TransmissionJobAPI
{
}
}
+ /**
+ * Get job item infos.
+ *
+ * @param jobId job id
+ * @return job item infos
+ */
+ public List<CDCJobItemInfo> getJobItemInfos(final String jobId) {
+ CDCJobConfiguration jobConfig = new
PipelineJobConfigurationManager(jobType).getJobConfiguration(jobId);
+ ShardingSphereDatabase database =
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(jobConfig.getDatabaseName());
+ Collection<TransmissionJobItemInfo> jobItemInfos = new
TransmissionJobManager(jobType).getJobItemInfos(jobId);
+ List<CDCJobItemInfo> result = new LinkedList<>();
+ for (TransmissionJobItemInfo each : jobItemInfos) {
+ TransmissionJobItemProgress jobItemProgress =
each.getJobItemProgress();
+ if (null == jobItemProgress) {
+ result.add(new CDCJobItemInfo(each, "", ""));
+ continue;
+ }
+ result.add(new CDCJobItemInfo(each,
jobItemProgress.getIncremental().getIncrementalPosition().map(Object::toString).orElse(""),
+ getCurrentPosition(database,
jobItemProgress.getDataSourceName())));
+ }
+ return result;
+ }
+
+ private String getCurrentPosition(final ShardingSphereDatabase database,
final String dataSourceName) {
+ StorageUnit storageUnit =
database.getResourceMetaData().getStorageUnits().get(dataSourceName);
+ DialectPipelineSQLBuilder sqlBuilder =
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class,
storageUnit.getStorageType());
+ Optional<String> queryCurrentPositionSQL =
sqlBuilder.buildQueryCurrentPositionSQL();
+ if (!queryCurrentPositionSQL.isPresent()) {
+ return "";
+ }
+ try (Connection connection =
storageUnit.getDataSource().getConnection()) {
+ ResultSet resultSet =
connection.createStatement().executeQuery(queryCurrentPositionSQL.get());
+ resultSet.next();
+ return resultSet.getString(1);
+ } catch (final SQLException ex) {
+ throw new PipelineInternalException(ex);
+ }
+ }
+
@Override
public void commit(final String jobId) throws SQLException {
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/pojo/CDCJobItemInfo.java
similarity index 64%
copy from
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
copy to
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/pojo/CDCJobItemInfo.java
index 9da9b2edcea..cfbc0d0f2b5 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/pojo/CDCJobItemInfo.java
@@ -15,24 +15,22 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
+package org.apache.shardingsphere.data.pipeline.cdc.core.pojo;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
+import
org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
/**
- * WAL position.
+ * CDC job item info.
*/
@RequiredArgsConstructor
@Getter
-public final class WALPosition implements IngestPosition {
+public class CDCJobItemInfo {
- private final BaseLogSequenceNumber logSequenceNumber;
+ private final TransmissionJobItemInfo transmissionJobItemInfo;
- @Override
- public String toString() {
- return String.valueOf(logSequenceNumber.asLong());
- }
+ private final String confirmedPosition;
+
+ private final String currentPosition;
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index bb19d367c2a..602a2d6a033 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -48,7 +48,6 @@ import
org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
import
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import java.sql.SQLException;
import java.util.Collection;
@@ -126,9 +125,7 @@ public final class CDCJobPreparer {
}
Dumper dumper = new InventoryDumper(each, channel,
jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
Importer importer = importerUsed.get() ? null
- : new CDCImporter(channelProgressPairs,
importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS,
jobItemContext.getSink(),
-
needSorting(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType()),
- importerConfig.getRateLimitAlgorithm());
+ : new CDCImporter(channelProgressPairs,
importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS,
jobItemContext.getSink(), false, importerConfig.getRateLimitAlgorithm());
jobItemContext.getInventoryTasks().add(new
CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each),
processContext.getInventoryDumperExecuteEngine(),
processContext.getInventoryImporterExecuteEngine(),
dumper, importer, position));
if (!(position.get() instanceof FinishedPosition)) {
@@ -138,10 +135,6 @@ public final class CDCJobPreparer {
log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() -
startTimeMillis);
}
- private boolean needSorting(final DatabaseType databaseType) {
- return new
DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().isSupportGlobalCSN();
- }
-
private void initIncrementalTask(final CDCJobItemContext jobItemContext,
final AtomicBoolean importerUsed, final List<CDCChannelProgressPair>
channelProgressPairs) {
CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
IncrementalDumperContext dumperContext = taskConfig.getDumperContext();