This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bc93f46bfc Add confirmed_position and current_position at cdc job item info. (#29477)
8bc93f46bfc is described below

commit 8bc93f46bfc707ea455aee3cbc7b1c9ff007924b
Author: Xinze Guo <[email protected]>
AuthorDate: Thu Dec 21 13:59:22 2023 +0800

    Add confirmed_position and current_position at cdc job item info. (#29477)
    
    * Add confirmed_position and current_position at cdc job item info.
    
    * Reuse TransmissionJobItemInfo
    
    * Fix NPE caused by ordering CDC inventory by CSN
---
 .../core/job/service/TransmissionJobManager.java   | 14 +++++--
 .../dialect/DialectPipelineSQLBuilder.java         |  9 ++++
 .../ingest/OpenGaussPositionInitializer.java       |  2 +-
 .../wal/decode/OpenGaussLogSequenceNumber.java     |  4 +-
 .../sqlbuilder/OpenGaussPipelineSQLBuilder.java    |  5 +++
 .../ingest/PostgreSQLPositionInitializer.java      |  2 +-
 .../postgresql/ingest/wal/WALPosition.java         |  2 +-
 .../ingest/wal/decode/BaseLogSequenceNumber.java   |  6 +--
 .../wal/decode/PostgreSQLLogSequenceNumber.java    |  4 +-
 .../sqlbuilder/PostgreSQLPipelineSQLBuilder.java   |  5 +++
 .../ingest/wal/WALEventConverterTest.java          |  9 ++--
 .../postgresql/ingest/wal/WALPositionTest.java     |  2 +-
 .../query/ShowStreamingJobStatusExecutor.java      | 28 ++++++++-----
 .../data/pipeline/cdc/api/CDCJobAPI.java           | 49 ++++++++++++++++++++++
 .../pipeline/cdc/core/pojo/CDCJobItemInfo.java}    | 18 ++++----
 .../pipeline/cdc/core/prepare/CDCJobPreparer.java  |  9 +---
 16 files changed, 119 insertions(+), 49 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
index 4772d36ed10..9dcd2cd4676 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobManager.java
@@ -18,14 +18,14 @@
 package org.apache.shardingsphere.data.pipeline.core.job.service;
 
 import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
+import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
-import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 
 import java.util.Collection;
@@ -71,7 +71,13 @@ public final class TransmissionJobManager {
         return result;
     }
     
-    private static int getInventoryFinishedPercentage(final 
TransmissionJobItemProgress jobItemProgress) {
+    /**
+     * Get inventory finished percentage.
+     *
+     * @param jobItemProgress job item progress
+     * @return inventory finished percentage
+     */
+    public static int getInventoryFinishedPercentage(final 
TransmissionJobItemProgress jobItemProgress) {
         if (JobStatus.EXECUTE_INCREMENTAL_TASK == jobItemProgress.getStatus() 
|| JobStatus.FINISHED == jobItemProgress.getStatus()) {
             return 100;
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
index 57897e8908d..cd85a842edc 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/dialect/DialectPipelineSQLBuilder.java
@@ -89,4 +89,13 @@ public interface DialectPipelineSQLBuilder extends 
DatabaseTypedSPI {
      * @throws SQLException SQL exception
      */
     Collection<String> buildCreateTableSQLs(DataSource dataSource, String 
schemaName, String tableName) throws SQLException;
+    
+    /**
+     * Build query current position SQL.
+     *
+     * @return built SQL
+     */
+    default Optional<String> buildQueryCurrentPositionSQL() {
+        return Optional.empty();
+    }
 }
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
index 1e22f62641c..7a043f91136 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussPositionInitializer.java
@@ -56,7 +56,7 @@ public final class OpenGaussPositionInitializer implements 
PositionInitializer {
     
     @Override
     public WALPosition init(final String data) {
-        return new WALPosition(new 
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
+        return new WALPosition(new 
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(data)));
     }
     
     /**
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java
index 581efaaf4f0..893b64f51c4 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/OpenGaussLogSequenceNumber.java
@@ -32,8 +32,8 @@ public final class OpenGaussLogSequenceNumber implements 
BaseLogSequenceNumber {
     private final LogSequenceNumber logSequenceNumber;
     
     @Override
-    public long asLong() {
-        return logSequenceNumber.asLong();
+    public String asString() {
+        return logSequenceNumber.asString();
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
index 9d191b093d9..d204d71fb02 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/sqlbuilder/OpenGaussPipelineSQLBuilder.java
@@ -76,6 +76,11 @@ public final class OpenGaussPipelineSQLBuilder implements 
DialectPipelineSQLBuil
         throw new CreateTableSQLGenerateException(tableName);
     }
     
+    @Override
+    public Optional<String> buildQueryCurrentPositionSQL() {
+        return Optional.of("SELECT * FROM pg_current_xlog_location()");
+    }
+    
     @Override
     public String getDatabaseType() {
         return "openGauss";
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
index 977671bfe7d..973853c7a30 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLPositionInitializer.java
@@ -53,7 +53,7 @@ public final class PostgreSQLPositionInitializer implements 
PositionInitializer
     
     @Override
     public WALPosition init(final String data) {
-        return new WALPosition(new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))));
+        return new WALPosition(new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(data)));
     }
     
     private void createSlotIfNotExist(final Connection connection, final 
String slotName) throws SQLException {
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
index 9da9b2edcea..93400eb853c 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
@@ -33,6 +33,6 @@ public final class WALPosition implements IngestPosition {
     
     @Override
     public String toString() {
-        return String.valueOf(logSequenceNumber.asLong());
+        return logSequenceNumber.asString();
     }
 }
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
index 26e64bda52d..5a92412bc96 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
@@ -23,11 +23,11 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;
 public interface BaseLogSequenceNumber {
     
     /**
-     * Convert log sequence number to long.
+     * Convert log sequence number to String.
      *
-     * @return Long the sequence number of long value
+     * @return the log sequence number as a String
      */
-    long asLong();
+    String asString();
     
     /**
      * Get the binded object.
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
index 59c8dab4995..9d7bf20b257 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
@@ -31,8 +31,8 @@ public final class PostgreSQLLogSequenceNumber implements 
BaseLogSequenceNumber
     private final LogSequenceNumber logSequenceNumber;
     
     @Override
-    public long asLong() {
-        return logSequenceNumber.asLong();
+    public String asString() {
+        return logSequenceNumber.asString();
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
index 3e15bfbba9e..838d6bf244e 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/sqlbuilder/PostgreSQLPipelineSQLBuilder.java
@@ -121,6 +121,11 @@ public final class PostgreSQLPipelineSQLBuilder implements 
DialectPipelineSQLBui
         }
     }
     
+    @Override
+    public Optional<String> buildQueryCurrentPositionSQL() {
+        return Optional.of("SELECT * FROM pg_current_wal_lsn()");
+    }
+    
     @Override
     public String getDatabaseType() {
         return "PostgreSQL";
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index b399d2112d2..65c94324d39 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -65,7 +65,6 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class WALEventConverterTest {
     
@@ -148,8 +147,8 @@ class WALEventConverterTest {
         BeginTXEvent beginTXEvent = new BeginTXEvent(100);
         beginTXEvent.setLogSequenceNumber(new 
PostgreSQLLogSequenceNumber(logSequenceNumber));
         Record record = walEventConverter.convert(beginTXEvent);
-        assertTrue(record instanceof PlaceholderRecord);
-        assertThat(((WALPosition) 
record.getPosition()).getLogSequenceNumber().asLong(), is(21953976L));
+        assertInstanceOf(PlaceholderRecord.class, record);
+        assertThat(((WALPosition) 
record.getPosition()).getLogSequenceNumber().asString(), 
is(logSequenceNumber.asString()));
     }
     
     @Test
@@ -157,8 +156,8 @@ class WALEventConverterTest {
         CommitTXEvent commitTXEvent = new CommitTXEvent(1, 3468L);
         commitTXEvent.setLogSequenceNumber(new 
PostgreSQLLogSequenceNumber(logSequenceNumber));
         Record record = walEventConverter.convert(commitTXEvent);
-        assertTrue(record instanceof PlaceholderRecord);
-        assertThat(((WALPosition) 
record.getPosition()).getLogSequenceNumber().asLong(), is(21953976L));
+        assertInstanceOf(PlaceholderRecord.class, record);
+        assertThat(((WALPosition) 
record.getPosition()).getLogSequenceNumber().asString(), 
is(logSequenceNumber.asString()));
     }
     
     @Test
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPositionTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPositionTest.java
index 10a9bf5d7da..819e43cb30e 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPositionTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPositionTest.java
@@ -28,6 +28,6 @@ class WALPositionTest {
     
     @Test
     void assertToString() {
-        assertThat(new WALPosition(new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))).toString(), 
is("100"));
+        assertThat(new WALPosition(new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))).toString(), 
is("0/64"));
     }
 }
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index d5a713a0153..968a030cc0e 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -17,13 +17,15 @@
 
 package org.apache.shardingsphere.data.pipeline.cdc.distsql.handler.query;
 
+import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
+import org.apache.shardingsphere.data.pipeline.cdc.core.pojo.CDCJobItemInfo;
 import 
org.apache.shardingsphere.data.pipeline.cdc.distsql.statement.ShowStreamingStatusStatement;
-import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
+import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
-import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import 
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
+import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -35,31 +37,35 @@ import java.util.stream.Collectors;
  */
 public final class ShowStreamingJobStatusExecutor implements 
QueryableRALExecutor<ShowStreamingStatusStatement> {
     
+    private final CDCJobAPI jobAPI = (CDCJobAPI) 
TypedSPILoader.getService(TransmissionJobAPI.class, "STREAMING");
+    
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowStreamingStatusStatement sqlStatement) {
-        Collection<TransmissionJobItemInfo> jobItemInfos = new 
TransmissionJobManager(new 
CDCJobType()).getJobItemInfos(sqlStatement.getJobId());
+        Collection<CDCJobItemInfo> jobItemInfos = 
jobAPI.getJobItemInfos(sqlStatement.getJobId());
         long currentTimeMillis = System.currentTimeMillis();
         return jobItemInfos.stream().map(each -> getRow(each, 
currentTimeMillis)).collect(Collectors.toList());
     }
     
-    private LocalDataQueryResultRow getRow(final TransmissionJobItemInfo 
jobItemInfo, final long currentTimeMillis) {
-        TransmissionJobItemProgress jobItemProgress = 
jobItemInfo.getJobItemProgress();
+    private LocalDataQueryResultRow getRow(final CDCJobItemInfo 
cdcJobItemInfo, final long currentTimeMillis) {
+        TransmissionJobItemInfo transmissionJobItemInfo = 
cdcJobItemInfo.getTransmissionJobItemInfo();
+        TransmissionJobItemProgress jobItemProgress = 
transmissionJobItemInfo.getJobItemProgress();
         if (null == jobItemProgress) {
-            return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), 
"", "", "", "", "", "", jobItemInfo.getErrorMessage());
+            return new 
LocalDataQueryResultRow(transmissionJobItemInfo.getShardingItem(), "", "", "", 
"", "", "", "", "", transmissionJobItemInfo.getErrorMessage());
         }
         String incrementalIdleSeconds = "";
         if 
(jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis() > 0) {
-            long latestActiveTimeMillis = 
Math.max(jobItemInfo.getStartTimeMillis(), 
jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
+            long latestActiveTimeMillis = 
Math.max(transmissionJobItemInfo.getStartTimeMillis(), 
jobItemProgress.getIncremental().getIncrementalLatestActiveTimeMillis());
             incrementalIdleSeconds = 
String.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - 
latestActiveTimeMillis));
         }
-        return new LocalDataQueryResultRow(jobItemInfo.getShardingItem(), 
jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
-                jobItemProgress.isActive() ? Boolean.TRUE.toString() : 
Boolean.FALSE.toString(), jobItemProgress.getProcessedRecordsCount(), 
jobItemInfo.getInventoryFinishedPercentage(),
-                incrementalIdleSeconds, jobItemInfo.getErrorMessage());
+        return new 
LocalDataQueryResultRow(transmissionJobItemInfo.getShardingItem(), 
jobItemProgress.getDataSourceName(), jobItemProgress.getStatus(),
+                jobItemProgress.isActive() ? Boolean.TRUE.toString() : 
Boolean.FALSE.toString(), jobItemProgress.getProcessedRecordsCount(), 
transmissionJobItemInfo.getInventoryFinishedPercentage(),
+                incrementalIdleSeconds, cdcJobItemInfo.getConfirmedPosition(), 
cdcJobItemInfo.getCurrentPosition(), transmissionJobItemInfo.getErrorMessage());
     }
     
     @Override
     public Collection<String> getColumnNames() {
-        return Arrays.asList("item", "data_source", "status", "active", 
"processed_records_count", "inventory_finished_percentage", 
"incremental_idle_seconds", "error_message");
+        return Arrays.asList("item", "data_source", "status", "active", 
"processed_records_count", "inventory_finished_percentage", 
"incremental_idle_seconds", "confirmed_position",
+                "current_position", "error_message");
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
index 29d67efe7f8..e3ca82057c2 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/CDCJobAPI.java
@@ -28,6 +28,7 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJob
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.config.YamlCDCJobConfiguration.YamlSinkConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.swapper.YamlCDCJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
+import org.apache.shardingsphere.data.pipeline.cdc.core.pojo.CDCJobItemInfo;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineContextManager;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
@@ -52,11 +53,15 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJob
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
+import 
org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.incremental.IncrementalTaskPositionManager;
 import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
+import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPipelineSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
+import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
@@ -67,13 +72,18 @@ import 
org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
 import 
org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
 
+import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.LocalDateTime;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
@@ -266,6 +276,45 @@ public final class CDCJobAPI implements TransmissionJobAPI 
{
         }
     }
     
+    /**
+     * Get job item infos.
+     *
+     * @param jobId job id
+     * @return job item infos
+     */
+    public List<CDCJobItemInfo> getJobItemInfos(final String jobId) {
+        CDCJobConfiguration jobConfig = new 
PipelineJobConfigurationManager(jobType).getJobConfiguration(jobId);
+        ShardingSphereDatabase database = 
PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(jobConfig.getDatabaseName());
+        Collection<TransmissionJobItemInfo> jobItemInfos = new 
TransmissionJobManager(jobType).getJobItemInfos(jobId);
+        List<CDCJobItemInfo> result = new LinkedList<>();
+        for (TransmissionJobItemInfo each : jobItemInfos) {
+            TransmissionJobItemProgress jobItemProgress = 
each.getJobItemProgress();
+            if (null == jobItemProgress) {
+                result.add(new CDCJobItemInfo(each, "", ""));
+                continue;
+            }
+            result.add(new CDCJobItemInfo(each, 
jobItemProgress.getIncremental().getIncrementalPosition().map(Object::toString).orElse(""),
+                    getCurrentPosition(database, 
jobItemProgress.getDataSourceName())));
+        }
+        return result;
+    }
+    
+    private String getCurrentPosition(final ShardingSphereDatabase database, 
final String dataSourceName) {
+        StorageUnit storageUnit = 
database.getResourceMetaData().getStorageUnits().get(dataSourceName);
+        DialectPipelineSQLBuilder sqlBuilder = 
DatabaseTypedSPILoader.getService(DialectPipelineSQLBuilder.class, 
storageUnit.getStorageType());
+        Optional<String> queryCurrentPositionSQL = 
sqlBuilder.buildQueryCurrentPositionSQL();
+        if (!queryCurrentPositionSQL.isPresent()) {
+            return "";
+        }
+        try (Connection connection = 
storageUnit.getDataSource().getConnection()) {
+            ResultSet resultSet = 
connection.createStatement().executeQuery(queryCurrentPositionSQL.get());
+            resultSet.next();
+            return resultSet.getString(1);
+        } catch (final SQLException ex) {
+            throw new PipelineInternalException(ex);
+        }
+    }
+    
     @Override
     public void commit(final String jobId) throws SQLException {
     }
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/pojo/CDCJobItemInfo.java
similarity index 64%
copy from 
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
copy to 
kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/pojo/CDCJobItemInfo.java
index 9da9b2edcea..cfbc0d0f2b5 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALPosition.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/pojo/CDCJobItemInfo.java
@@ -15,24 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
+package org.apache.shardingsphere.data.pipeline.cdc.core.pojo;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
-import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.BaseLogSequenceNumber;
+import 
org.apache.shardingsphere.data.pipeline.core.pojo.TransmissionJobItemInfo;
 
 /**
- * WAL position.
+ * CDC job item info.
  */
 @RequiredArgsConstructor
 @Getter
-public final class WALPosition implements IngestPosition {
+public class CDCJobItemInfo {
     
-    private final BaseLogSequenceNumber logSequenceNumber;
+    private final TransmissionJobItemInfo transmissionJobItemInfo;
     
-    @Override
-    public String toString() {
-        return String.valueOf(logSequenceNumber.asLong());
-    }
+    private final String confirmedPosition;
+    
+    private final String currentPosition;
 }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
index bb19d367c2a..602a2d6a033 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/prepare/CDCJobPreparer.java
@@ -48,7 +48,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 
 import java.sql.SQLException;
 import java.util.Collection;
@@ -126,9 +125,7 @@ public final class CDCJobPreparer {
             }
             Dumper dumper = new InventoryDumper(each, channel, 
jobItemContext.getSourceDataSource(), jobItemContext.getSourceMetaDataLoader());
             Importer importer = importerUsed.get() ? null
-                    : new CDCImporter(channelProgressPairs, 
importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS, 
jobItemContext.getSink(),
-                            
needSorting(taskConfig.getDumperContext().getCommonContext().getDataSourceConfig().getDatabaseType()),
-                            importerConfig.getRateLimitAlgorithm());
+                    : new CDCImporter(channelProgressPairs, 
importerConfig.getBatchSize(), 100, TimeUnit.MILLISECONDS, 
jobItemContext.getSink(), false, importerConfig.getRateLimitAlgorithm());
             jobItemContext.getInventoryTasks().add(new 
CDCInventoryTask(PipelineTaskUtils.generateInventoryTaskId(each), 
processContext.getInventoryDumperExecuteEngine(),
                     processContext.getInventoryImporterExecuteEngine(), 
dumper, importer, position));
             if (!(position.get() instanceof FinishedPosition)) {
@@ -138,10 +135,6 @@ public final class CDCJobPreparer {
         log.info("initInventoryTasks cost {} ms", System.currentTimeMillis() - 
startTimeMillis);
     }
     
-    private boolean needSorting(final DatabaseType databaseType) {
-        return new 
DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().isSupportGlobalCSN();
-    }
-    
     private void initIncrementalTask(final CDCJobItemContext jobItemContext, 
final AtomicBoolean importerUsed, final List<CDCChannelProgressPair> 
channelProgressPairs) {
         CDCTaskConfiguration taskConfig = jobItemContext.getTaskConfig();
         IncrementalDumperContext dumperContext = taskConfig.getDumperContext();

Reply via email to